Reuse shard model across 3 phases in BalancedShardsAllocator

This commit is contained in:
Yannick Welsch 2016-03-07 17:21:41 +01:00
parent f6ae9ec4f6
commit 64e84dcc76
3 changed files with 228 additions and 274 deletions

View File

@ -597,6 +597,13 @@ public class RoutingNodes implements Iterable<RoutingNode> {
} }
/**
* Returns the number of routing nodes
*/
public int size() {
return nodesToShards.size();
}
public static final class UnassignedShards implements Iterable<ShardRouting> { public static final class UnassignedShards implements Iterable<ShardRouting> {
private final RoutingNodes nodes; private final RoutingNodes nodes;

View File

@ -19,7 +19,6 @@
package org.elasticsearch.cluster.routing.allocation.allocator; package org.elasticsearch.cluster.routing.allocation.allocator;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.IntroSorter; import org.apache.lucene.util.IntroSorter;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -28,9 +27,7 @@ import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.routing.allocation.decider.Decision.Type; import org.elasticsearch.cluster.routing.allocation.decider.Decision.Type;
@ -42,18 +39,14 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.PriorityComparator; import org.elasticsearch.gateway.PriorityComparator;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.IdentityHashMap; import java.util.IdentityHashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.function.Predicate;
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
@ -104,24 +97,14 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
@Override @Override
public boolean allocate(RoutingAllocation allocation) { public boolean allocate(RoutingAllocation allocation) {
if (allocation.routingNodes().size() == 0) {
/* with no nodes this is pointless */
return false;
}
final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold); final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
boolean changed = false; boolean changed = balancer.allocateUnassigned();
if (allocation.routingNodes().unassigned().size() > 0) {
changed |= balancer.allocateUnassigned();
}
changed |= balancer.moveShards(); changed |= balancer.moveShards();
if (allocation.hasPendingAsyncFetch() == false) {
/*
* see https://github.com/elastic/elasticsearch/issues/14387
* if we allow rebalance operations while we are still fetching shard store data
* we might end up with unnecessary rebalance operations which can be super confusion/frustrating
* since once the fetches come back we might just move all the shards back again.
* Therefore we only do a rebalance if we have fetched all information.
*/
changed |= balancer.balance(); changed |= balancer.balance();
} else {
logger.debug("skipping rebalance due to in-flight shard/store fetches");
}
return changed; return changed;
} }
@ -202,8 +185,8 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
} }
private float weight(Balancer balancer, ModelNode node, String index, int numAdditionalShards) { private float weight(Balancer balancer, ModelNode node, String index, int numAdditionalShards) {
final float weightShard = (node.numShards() + numAdditionalShards - balancer.avgShardsPerNode()); final float weightShard = node.numShards() + numAdditionalShards - balancer.avgShardsPerNode();
final float weightIndex = (node.numShards(index) + numAdditionalShards - balancer.avgShardsPerNode(index)); final float weightIndex = node.numShards(index) + numAdditionalShards - balancer.avgShardsPerNode(index);
return theta0 * weightShard + theta1 * weightIndex; return theta0 * weightShard + theta1 * weightIndex;
} }
@ -215,7 +198,6 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
public static class Balancer { public static class Balancer {
private final ESLogger logger; private final ESLogger logger;
private final Map<String, ModelNode> nodes = new HashMap<>(); private final Map<String, ModelNode> nodes = new HashMap<>();
private final HashSet<String> indices = new HashSet<>();
private final RoutingAllocation allocation; private final RoutingAllocation allocation;
private final RoutingNodes routingNodes; private final RoutingNodes routingNodes;
private final WeightFunction weight; private final WeightFunction weight;
@ -224,19 +206,15 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
private final MetaData metaData; private final MetaData metaData;
private final float avgShardsPerNode; private final float avgShardsPerNode;
private final Predicate<ShardRouting> assignedFilter = shard -> shard.assignedToNode();
public Balancer(ESLogger logger, RoutingAllocation allocation, WeightFunction weight, float threshold) { public Balancer(ESLogger logger, RoutingAllocation allocation, WeightFunction weight, float threshold) {
this.logger = logger; this.logger = logger;
this.allocation = allocation; this.allocation = allocation;
this.weight = weight; this.weight = weight;
this.threshold = threshold; this.threshold = threshold;
this.routingNodes = allocation.routingNodes(); this.routingNodes = allocation.routingNodes();
for (RoutingNode node : routingNodes) {
nodes.put(node.nodeId(), new ModelNode(node.nodeId()));
}
metaData = routingNodes.metaData(); metaData = routingNodes.metaData();
avgShardsPerNode = ((float) metaData.totalNumberOfShards()) / nodes.size(); avgShardsPerNode = ((float) metaData.totalNumberOfShards()) / routingNodes.size();
buildModelFromAssigned();
} }
/** /**
@ -270,17 +248,6 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
return new NodeSorter(nodesArray(), weight, this); return new NodeSorter(nodesArray(), weight, this);
} }
private boolean initialize(RoutingNodes routing, RoutingNodes.UnassignedShards unassigned) {
if (logger.isTraceEnabled()) {
logger.trace("Start distributing Shards");
}
for (ObjectCursor<String> index : allocation.routingTable().indicesRouting().keys()) {
indices.add(index.value);
}
buildModelFromAssigned(routing.shards(assignedFilter));
return allocateUnassigned(unassigned);
}
private static float absDelta(float lower, float higher) { private static float absDelta(float lower, float higher) {
assert higher >= lower : higher + " lt " + lower +" but was expected to be gte"; assert higher >= lower : higher + " lt " + lower +" but was expected to be gte";
return Math.abs(higher - lower); return Math.abs(higher - lower);
@ -294,12 +261,36 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
} }
/** /**
* Allocates all possible unassigned shards * Balances the nodes on the cluster model according to the weight function.
* The actual balancing is delegated to {@link #balanceByWeights()}
*
* @return <code>true</code> if the current configuration has been * @return <code>true</code> if the current configuration has been
* changed, otherwise <code>false</code> * changed, otherwise <code>false</code>
*/ */
final boolean allocateUnassigned() { private boolean balance() {
return balance(true); if (logger.isTraceEnabled()) {
logger.trace("Start balancing cluster");
}
if (allocation.hasPendingAsyncFetch()) {
/*
* see https://github.com/elastic/elasticsearch/issues/14387
* if we allow rebalance operations while we are still fetching shard store data
* we might end up with unnecessary rebalance operations which can be super confusion/frustrating
* since once the fetches come back we might just move all the shards back again.
* Therefore we only do a rebalance if we have fetched all information.
*/
logger.debug("skipping rebalance due to in-flight shard/store fetches");
return false;
}
if (allocation.deciders().canRebalance(allocation).type() != Type.YES) {
logger.trace("skipping rebalance as it is disabled");
return false;
}
if (nodes.size() < 2) { /* skip if we only have one node */
logger.trace("skipping rebalance as single node only");
return false;
}
return balanceByWeights();
} }
/** /**
@ -316,28 +307,10 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
* @return <code>true</code> if the current configuration has been * @return <code>true</code> if the current configuration has been
* changed, otherwise <code>false</code> * changed, otherwise <code>false</code>
*/ */
public boolean balance() { private boolean balanceByWeights() {
return balance(false); boolean changed = false;
} final NodeSorter sorter = newNodeSorter();
final AllocationDeciders deciders = allocation.deciders();
private boolean balance(boolean onlyAssign) {
if (this.nodes.isEmpty()) {
/* with no nodes this is pointless */
return false;
}
if (logger.isTraceEnabled()) {
if (onlyAssign) {
logger.trace("Start balancing cluster");
} else {
logger.trace("Start assigning unassigned shards");
}
}
final RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned();
boolean changed = initialize(routingNodes, unassigned);
if (onlyAssign == false && changed == false && allocation.deciders().canRebalance(allocation).type() == Type.YES) {
NodeSorter sorter = newNodeSorter();
if (nodes.size() > 1) { /* skip if we only have one node */
AllocationDeciders deciders = allocation.deciders();
final ModelNode[] modelNodes = sorter.modelNodes; final ModelNode[] modelNodes = sorter.modelNodes;
final float[] weights = sorter.weights; final float[] weights = sorter.weights;
for (String index : buildWeightOrderedIndices(sorter)) { for (String index : buildWeightOrderedIndices(sorter)) {
@ -349,7 +322,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
for (int i = 0; i < modelNodes.length; i++) { for (int i = 0; i < modelNodes.length; i++) {
ModelNode modelNode = modelNodes[i]; ModelNode modelNode = modelNodes[i];
if (modelNode.getIndex(index) != null if (modelNode.getIndex(index) != null
|| deciders.canAllocate(indexMetaData, modelNode.getRoutingNode(routingNodes), allocation).type() != Type.NO) { || deciders.canAllocate(indexMetaData, modelNode.getRoutingNode(), allocation).type() != Type.NO) {
// swap nodes at position i and relevantNodes // swap nodes at position i and relevantNodes
modelNodes[i] = modelNodes[relevantNodes]; modelNodes[i] = modelNodes[relevantNodes];
modelNodes[relevantNodes] = modelNode; modelNodes[relevantNodes] = modelNode;
@ -431,8 +404,6 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
} }
} }
} }
}
}
return changed; return changed;
} }
@ -450,7 +421,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
* to the nodes we relocated them from. * to the nodes we relocated them from.
*/ */
private String[] buildWeightOrderedIndices(NodeSorter sorter) { private String[] buildWeightOrderedIndices(NodeSorter sorter) {
final String[] indices = this.indices.toArray(new String[this.indices.size()]); final String[] indices = allocation.routingTable().indicesRouting().keys().toArray(String.class);
final float[] deltas = new float[indices.length]; final float[] deltas = new float[indices.length];
for (int i = 0; i < deltas.length; i++) { for (int i = 0; i < deltas.length; i++) {
sorter.reset(indices[i]); sorter.reset(indices[i]);
@ -502,20 +473,16 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
* @return <code>true</code> if the allocation has changed, otherwise <code>false</code> * @return <code>true</code> if the allocation has changed, otherwise <code>false</code>
*/ */
public boolean moveShards() { public boolean moveShards() {
if (nodes.isEmpty()) { // Iterate over the started shards interleaving between nodes, and check if they can remain. In the presence of throttling
/* with no nodes this is pointless */
return false;
}
// Create a copy of the started shards interleaving between nodes, and check if they can remain. In the presence of throttling
// shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are // shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are
// offloading the shards. // offloading the shards.
List<ShardRouting> shards = new ArrayList<>(); boolean changed = false;
int index = 0; int index = 0;
boolean found = true; boolean found = true;
final NodeSorter sorter = newNodeSorter();
while (found) { while (found) {
found = false; found = false;
for (RoutingNode routingNode : routingNodes) { for (RoutingNode routingNode : allocation.routingNodes()) {
if (index >= routingNode.size()) { if (index >= routingNode.size()) {
continue; continue;
} }
@ -523,26 +490,26 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
ShardRouting shardRouting = routingNode.get(index); ShardRouting shardRouting = routingNode.get(index);
// we can only move started shards... // we can only move started shards...
if (shardRouting.started()) { if (shardRouting.started()) {
shards.add(shardRouting); final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId());
assert sourceNode != null && sourceNode.containsShard(shardRouting);
Decision decision = allocation.deciders().canRemain(shardRouting, routingNode, allocation);
if (decision.type() == Decision.Type.NO) {
changed |= moveShard(sorter, shardRouting, sourceNode, routingNode);
}
} }
} }
index++; index++;
} }
if (shards.isEmpty()) {
return false; return changed;
} }
final RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned(); /**
boolean changed = initialize(routingNodes, unassigned); * Move started shard to the minimal eligible node with respect to the weight function
if (changed == false) { *
final NodeSorter sorter = newNodeSorter(); * @return <code>true</code> if the shard was moved successfully, otherwise <code>false</code>
final ModelNode[] modelNodes = sorter.modelNodes; */
for (ShardRouting shardRouting : shards) { private boolean moveShard(NodeSorter sorter, ShardRouting shardRouting, ModelNode sourceNode, RoutingNode routingNode) {
final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId());
assert sourceNode != null && sourceNode.containsShard(shardRouting);
final RoutingNode routingNode = sourceNode.getRoutingNode(routingNodes);
Decision decision = allocation.deciders().canRemain(shardRouting, routingNode, allocation);
if (decision.type() == Decision.Type.NO) {
logger.debug("[{}][{}] allocated on [{}], but can no longer be allocated on it, moving...", shardRouting.index(), shardRouting.id(), routingNode.node()); logger.debug("[{}][{}] allocated on [{}], but can no longer be allocated on it, moving...", shardRouting.index(), shardRouting.id(), routingNode.node());
sorter.reset(shardRouting.getIndexName()); sorter.reset(shardRouting.getIndexName());
/* /*
@ -551,36 +518,24 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
* This is not guaranteed to be balanced after this operation we still try best effort to * This is not guaranteed to be balanced after this operation we still try best effort to
* allocate on the minimal eligible node. * allocate on the minimal eligible node.
*/ */
boolean moved = false; for (ModelNode currentNode : sorter.modelNodes) {
for (ModelNode currentNode : modelNodes) { if (currentNode != sourceNode) {
if (currentNode == sourceNode) { RoutingNode target = currentNode.getRoutingNode();
continue;
}
RoutingNode target = currentNode.getRoutingNode(routingNodes);
Decision allocationDecision = allocation.deciders().canAllocate(shardRouting, target, allocation); Decision allocationDecision = allocation.deciders().canAllocate(shardRouting, target, allocation);
Decision rebalanceDecision = allocation.deciders().canRebalance(shardRouting, allocation); Decision rebalanceDecision = allocation.deciders().canRebalance(shardRouting, allocation);
if (allocationDecision.type() == Type.YES && rebalanceDecision.type() == Type.YES) { // TODO maybe we can respect throttling here too? if (allocationDecision.type() == Type.YES && rebalanceDecision.type() == Type.YES) { // TODO maybe we can respect throttling here too?
Decision sourceDecision = sourceNode.removeShard(shardRouting); sourceNode.removeShard(shardRouting);
ShardRouting targetRelocatingShard = routingNodes.relocate(shardRouting, target.nodeId(), allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE)); ShardRouting targetRelocatingShard = routingNodes.relocate(shardRouting, target.nodeId(), allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
// re-add (now relocating shard) to source node currentNode.addShard(targetRelocatingShard);
sourceNode.addShard(shardRouting, sourceDecision);
Decision targetDecision = new Decision.Multi().add(allocationDecision).add(rebalanceDecision);
currentNode.addShard(targetRelocatingShard, targetDecision);
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Moved shard [{}] to node [{}]", shardRouting, routingNode.node()); logger.trace("Moved shard [{}] to node [{}]", shardRouting, routingNode.node());
} }
moved = true; return true;
changed = true; }
break;
} }
} }
if (moved == false) {
logger.debug("[{}][{}] can't move", shardRouting.index(), shardRouting.id()); logger.debug("[{}][{}] can't move", shardRouting.index(), shardRouting.id());
} return false;
}
}
}
return changed;
} }
/** /**
@ -592,27 +547,31 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
* on the target node which we respect during the allocation / balancing * on the target node which we respect during the allocation / balancing
* process. In short, this method recreates the status-quo in the cluster. * process. In short, this method recreates the status-quo in the cluster.
*/ */
private void buildModelFromAssigned(Iterable<ShardRouting> shards) { private void buildModelFromAssigned() {
for (ShardRouting shard : shards) { for (RoutingNode rn : routingNodes) {
assert shard.assignedToNode(); ModelNode node = new ModelNode(rn);
nodes.put(rn.nodeId(), node);
for (ShardRouting shard : rn) {
assert rn.nodeId().equals(shard.currentNodeId());
/* we skip relocating shards here since we expect an initializing shard with the same id coming in */ /* we skip relocating shards here since we expect an initializing shard with the same id coming in */
if (shard.state() == RELOCATING) { if (shard.state() != RELOCATING) {
continue; node.addShard(shard);
}
ModelNode node = nodes.get(shard.currentNodeId());
assert node != null;
node.addShard(shard, Decision.single(Type.YES, "Already allocated on node", node.getNodeId()));
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Assigned shard [{}] to node [{}]", shard, node.getNodeId()); logger.trace("Assigned shard [{}] to node [{}]", shard, node.getNodeId());
} }
} }
} }
}
}
/** /**
* Allocates all given shards on the minimal eligible node for the shards index * Allocates all given shards on the minimal eligible node for the shards index
* with respect to the weight function. All given shards must be unassigned. * with respect to the weight function. All given shards must be unassigned.
* @return <code>true</code> if the current configuration has been
* changed, otherwise <code>false</code>
*/ */
private boolean allocateUnassigned(RoutingNodes.UnassignedShards unassigned) { private boolean allocateUnassigned() {
RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned();
assert !nodes.isEmpty(); assert !nodes.isEmpty();
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Start allocating unassigned shards"); logger.trace("Start allocating unassigned shards");
@ -656,7 +615,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
int secondaryLength = 0; int secondaryLength = 0;
int primaryLength = primary.length; int primaryLength = primary.length;
ArrayUtil.timSort(primary, comparator); ArrayUtil.timSort(primary, comparator);
final Set<ModelNode> throttledNodes = Collections.newSetFromMap(new IdentityHashMap<ModelNode, Boolean>()); final Set<ModelNode> throttledNodes = Collections.newSetFromMap(new IdentityHashMap<>());
do { do {
for (int i = 0; i < primaryLength; i++) { for (int i = 0; i < primaryLength; i++) {
ShardRouting shard = primary[i]; ShardRouting shard = primary[i];
@ -694,7 +653,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
* don't check deciders * don't check deciders
*/ */
if (currentWeight <= minWeight) { if (currentWeight <= minWeight) {
Decision currentDecision = deciders.canAllocate(shard, node.getRoutingNode(routingNodes), allocation); Decision currentDecision = deciders.canAllocate(shard, node.getRoutingNode(), allocation);
NOUPDATE: NOUPDATE:
if (currentDecision.type() == Type.YES || currentDecision.type() == Type.THROTTLE) { if (currentDecision.type() == Type.YES || currentDecision.type() == Type.THROTTLE) {
if (currentWeight == minWeight) { if (currentWeight == minWeight) {
@ -735,7 +694,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
} }
assert decision != null && minNode != null || decision == null && minNode == null; assert decision != null && minNode != null || decision == null && minNode == null;
if (minNode != null) { if (minNode != null) {
minNode.addShard(shard, decision); minNode.addShard(shard);
if (decision.type() == Type.YES) { if (decision.type() == Type.YES) {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Assigned shard [{}] to [{}]", shard, minNode.getNodeId()); logger.trace("Assigned shard [{}] to [{}]", shard, minNode.getNodeId());
@ -744,7 +703,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
changed = true; changed = true;
continue; // don't add to ignoreUnassigned continue; // don't add to ignoreUnassigned
} else { } else {
final RoutingNode node = minNode.getRoutingNode(routingNodes); final RoutingNode node = minNode.getRoutingNode();
if (deciders.canAllocate(node, allocation).type() != Type.YES) { if (deciders.canAllocate(node, allocation).type() != Type.YES) {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Can not allocate on node [{}] remove from round decision [{}]", node, decision.type()); logger.trace("Can not allocate on node [{}] remove from round decision [{}]", node, decision.type());
@ -790,10 +749,10 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
} }
ShardRouting candidate = null; ShardRouting candidate = null;
final AllocationDeciders deciders = allocation.deciders(); final AllocationDeciders deciders = allocation.deciders();
for (ShardRouting shard : index.getAllShards()) { for (ShardRouting shard : index) {
if (shard.started()) { if (shard.started()) {
// skip initializing, unassigned and relocating shards we can't relocate them anyway // skip initializing, unassigned and relocating shards we can't relocate them anyway
Decision allocationDecision = deciders.canAllocate(shard, minNode.getRoutingNode(routingNodes), allocation); Decision allocationDecision = deciders.canAllocate(shard, minNode.getRoutingNode(), allocation);
Decision rebalanceDecision = deciders.canRebalance(shard, allocation); Decision rebalanceDecision = deciders.canRebalance(shard, allocation);
if (((allocationDecision.type() == Type.YES) || (allocationDecision.type() == Type.THROTTLE)) if (((allocationDecision.type() == Type.YES) || (allocationDecision.type() == Type.THROTTLE))
&& ((rebalanceDecision.type() == Type.YES) || (rebalanceDecision.type() == Type.THROTTLE))) { && ((rebalanceDecision.type() == Type.YES) || (rebalanceDecision.type() == Type.THROTTLE))) {
@ -814,24 +773,17 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
} }
if (candidate != null) { if (candidate != null) {
/* allocate on the model even if not throttled */ /* allocate on the model even if not throttled */
maxNode.removeShard(candidate); maxNode.removeShard(candidate);
minNode.addShard(candidate, decision); minNode.addShard(candidate);
if (decision.type() == Type.YES) { /* only allocate on the cluster if we are not throttled */ if (decision.type() == Type.YES) { /* only allocate on the cluster if we are not throttled */
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Relocate shard [{}] from node [{}] to node [{}]", candidate, maxNode.getNodeId(), logger.trace("Relocate shard [{}] from node [{}] to node [{}]", candidate, maxNode.getNodeId(),
minNode.getNodeId()); minNode.getNodeId());
} }
/* now allocate on the cluster - if we are started we need to relocate the shard */ /* now allocate on the cluster */
if (candidate.started()) {
routingNodes.relocate(candidate, minNode.getNodeId(), allocation.clusterInfo().getShardSize(candidate, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE)); routingNodes.relocate(candidate, minNode.getNodeId(), allocation.clusterInfo().getShardSize(candidate, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
} else {
routingNodes.initialize(candidate, minNode.getNodeId(), null, allocation.clusterInfo().getShardSize(candidate, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
}
return true; return true;
} }
} }
} }
@ -845,14 +797,12 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
} }
static class ModelNode implements Iterable<ModelIndex> { static class ModelNode implements Iterable<ModelIndex> {
private final String id;
private final Map<String, ModelIndex> indices = new HashMap<>(); private final Map<String, ModelIndex> indices = new HashMap<>();
private int numShards = 0; private int numShards = 0;
// lazily calculated private final RoutingNode routingNode;
private RoutingNode routingNode;
public ModelNode(String id) { public ModelNode(RoutingNode routingNode) {
this.id = id; this.routingNode = routingNode;
} }
public ModelIndex getIndex(String indexId) { public ModelIndex getIndex(String indexId) {
@ -860,13 +810,10 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
} }
public String getNodeId() { public String getNodeId() {
return id; return routingNode.nodeId();
} }
public RoutingNode getRoutingNode(RoutingNodes routingNodes) { public RoutingNode getRoutingNode() {
if (routingNode == null) {
routingNode = routingNodes.node(id);
}
return routingNode; return routingNode;
} }
@ -887,33 +834,31 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
return -1; return -1;
} }
public void addShard(ShardRouting shard, Decision decision) { public void addShard(ShardRouting shard) {
ModelIndex index = indices.get(shard.getIndexName()); ModelIndex index = indices.get(shard.getIndexName());
if (index == null) { if (index == null) {
index = new ModelIndex(shard.getIndexName()); index = new ModelIndex(shard.getIndexName());
indices.put(index.getIndexId(), index); indices.put(index.getIndexId(), index);
} }
index.addShard(shard, decision); index.addShard(shard);
numShards++; numShards++;
} }
public Decision removeShard(ShardRouting shard) { public void removeShard(ShardRouting shard) {
ModelIndex index = indices.get(shard.getIndexName()); ModelIndex index = indices.get(shard.getIndexName());
Decision removed = null;
if (index != null) { if (index != null) {
removed = index.removeShard(shard); index.removeShard(shard);
if (removed != null && index.numShards() == 0) { if (index.numShards() == 0) {
indices.remove(shard.getIndexName()); indices.remove(shard.getIndexName());
} }
} }
numShards--; numShards--;
return removed;
} }
@Override @Override
public String toString() { public String toString() {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
sb.append("Node(").append(id).append(")"); sb.append("Node(").append(routingNode.nodeId()).append(")");
return sb.toString(); return sb.toString();
} }
@ -929,9 +874,9 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
} }
static final class ModelIndex { static final class ModelIndex implements Iterable<ShardRouting> {
private final String id; private final String id;
private final Map<ShardRouting, Decision> shards = new HashMap<>(); private final Set<ShardRouting> shards = new HashSet<>(4); // expect few shards of same index to be allocated on same node
private int highestPrimary = -1; private int highestPrimary = -1;
public ModelIndex(String id) { public ModelIndex(String id) {
@ -941,7 +886,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
public int highestPrimary() { public int highestPrimary() {
if (highestPrimary == -1) { if (highestPrimary == -1) {
int maxId = -1; int maxId = -1;
for (ShardRouting shard : shards.keySet()) { for (ShardRouting shard : shards) {
if (shard.primary()) { if (shard.primary()) {
maxId = Math.max(maxId, shard.id()); maxId = Math.max(maxId, shard.id());
} }
@ -959,24 +904,25 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
return shards.size(); return shards.size();
} }
public Collection<ShardRouting> getAllShards() { @Override
return shards.keySet(); public Iterator<ShardRouting> iterator() {
return shards.iterator();
} }
public Decision removeShard(ShardRouting shard) { public void removeShard(ShardRouting shard) {
highestPrimary = -1; highestPrimary = -1;
return shards.remove(shard); assert shards.contains(shard) : "Shard not allocated on current node: " + shard;
shards.remove(shard);
} }
public void addShard(ShardRouting shard, Decision decision) { public void addShard(ShardRouting shard) {
highestPrimary = -1; highestPrimary = -1;
assert decision != null; assert !shards.contains(shard) : "Shard already allocated on current node: " + shard;
assert !shards.containsKey(shard) : "Shard already allocated on current node: " + shards.get(shard) + " " + shard; shards.add(shard);
shards.put(shard, decision);
} }
public boolean containsShard(ShardRouting shard) { public boolean containsShard(ShardRouting shard) {
return shards.containsKey(shard); return shards.contains(shard);
} }
} }

View File

@ -25,15 +25,16 @@ import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
* <p> * <p>
* A {@link ShardsAllocator} is the main entry point for shard allocation on nodes in the cluster. * A {@link ShardsAllocator} is the main entry point for shard allocation on nodes in the cluster.
* The allocator makes basic decision where a shard instance will be allocated, if already allocated instances * The allocator makes basic decision where a shard instance will be allocated, if already allocated instances
* need relocate to other nodes due to node failures or due to rebalancing decisions. * need to relocate to other nodes due to node failures or due to rebalancing decisions.
* </p> * </p>
*/ */
public interface ShardsAllocator { public interface ShardsAllocator {
/** /**
* Assign all unassigned shards to nodes * Allocates shards to nodes in the cluster. An implementation of this method should:
* Move started shards that can not be allocated to a node anymore * - assign unassigned shards
* Rebalancing number of shards on all nodes * - relocate shards that cannot stay on a node anymore
* - relocate shards to find a good shard balance in the cluster
* *
* @param allocation current node allocation * @param allocation current node allocation
* @return <code>true</code> if the allocation has changed, otherwise <code>false</code> * @return <code>true</code> if the allocation has changed, otherwise <code>false</code>