For rebalancing an index, only consider nodes that currently have a shard of that index or where the index can be allocated

This allows to prune a large number of nodes in case of hot/warm setup
This commit is contained in:
Yannick Welsch 2015-12-28 11:07:41 +01:00
parent 207dfc457d
commit 1536d7fe37
4 changed files with 107 additions and 26 deletions

View File

@ -22,6 +22,7 @@ package org.elasticsearch.cluster.routing.allocation.allocator;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.IntroSorter;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
@ -336,12 +337,33 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
if (onlyAssign == false && changed == false && allocation.deciders().canRebalance(allocation).type() == Type.YES) {
NodeSorter sorter = newNodeSorter();
if (nodes.size() > 1) { /* skip if we only have one node */
for (String index : buildWeightOrderedIndices(sorter)) {
sorter.reset(index);
final float[] weights = sorter.weights;
AllocationDeciders deciders = allocation.deciders();
final ModelNode[] modelNodes = sorter.modelNodes;
final float[] weights = sorter.weights;
for (String index : buildWeightOrderedIndices(sorter)) {
IndexMetaData indexMetaData = metaData.index(index);
// find nodes that have a shard of this index or where shards of this index are allowed to stay
// move these nodes to the front of modelNodes so that we can only balance based on these nodes
int relevantNodes = 0;
for (int i = 0; i < modelNodes.length; i++) {
ModelNode modelNode = modelNodes[i];
if (modelNode.getIndex(index) != null
|| deciders.canAllocate(indexMetaData, routingNodes.node(modelNode.getNodeId()), allocation).type() != Type.NO) {
// swap nodes at position i and relevantNodes
modelNodes[i] = modelNodes[relevantNodes];
modelNodes[relevantNodes] = modelNode;
relevantNodes++;
}
}
if (relevantNodes < 2) {
continue;
}
sorter.reset(index, 0, relevantNodes);
int lowIdx = 0;
int highIdx = weights.length - 1;
int highIdx = relevantNodes - 1;
while (true) {
final ModelNode minNode = modelNodes[lowIdx];
final ModelNode maxNode = modelNodes[highIdx];
@ -384,9 +406,9 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
*/
weights[lowIdx] = sorter.weight(modelNodes[lowIdx]);
weights[highIdx] = sorter.weight(modelNodes[highIdx]);
sorter.sort(0, weights.length);
sorter.sort(0, relevantNodes);
lowIdx = 0;
highIdx = weights.length - 1;
highIdx = relevantNodes - 1;
changed = true;
continue;
}
@ -961,12 +983,16 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
* Resets the sorter, recalculates the weights per node and sorts the
* nodes by weight, with minimal weight first.
*/
public void reset(String index) {
public void reset(String index, int from, int to) {
this.index = index;
for (int i = 0; i < weights.length; i++) {
for (int i = from; i < to; i++) {
weights[i] = weight(modelNodes[i]);
}
sort(0, modelNodes.length);
sort(from, to);
}
public void reset(String index) {
reset(index, 0, modelNodes.length);
}
public float weight(ModelNode node) {

View File

@ -19,6 +19,7 @@
package org.elasticsearch.cluster.routing.allocation.decider;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
@ -73,6 +74,14 @@ public abstract class AllocationDecider extends AbstractComponent {
return Decision.ALWAYS;
}
/**
* Returns a {@link Decision} whether the given shard routing can be allocated at all at this state of the
* {@link RoutingAllocation}. The default is {@link Decision#ALWAYS}.
*/
public Decision canAllocate(IndexMetaData indexMetaData, RoutingNode node, RoutingAllocation allocation) {
return Decision.ALWAYS;
}
/**
* Returns a {@link Decision} whether the given node can allow any allocation at all at this state of the
* {@link RoutingAllocation}. The default is {@link Decision#ALWAYS}.

View File

@ -19,6 +19,7 @@
package org.elasticsearch.cluster.routing.allocation.decider;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
@ -120,6 +121,25 @@ public class AllocationDeciders extends AllocationDecider {
return ret;
}
@Override
public Decision canAllocate(IndexMetaData indexMetaData, RoutingNode node, RoutingAllocation allocation) {
Decision.Multi ret = new Decision.Multi();
for (AllocationDecider allocationDecider : allocations) {
Decision decision = allocationDecider.canAllocate(indexMetaData, node, allocation);
// short track if a NO is returned.
if (decision == Decision.NO) {
if (!allocation.debugDecision()) {
return decision;
} else {
ret.add(decision);
}
} else if (decision != Decision.ALWAYS) {
ret.add(decision);
}
}
return ret;
}
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocation) {
Decision.Multi ret = new Decision.Multi();

View File

@ -88,29 +88,37 @@ public class FilterAllocationDecider extends AllocationDecider {
return shouldFilter(shardRouting, node, allocation);
}
@Override
public Decision canAllocate(IndexMetaData indexMetaData, RoutingNode node, RoutingAllocation allocation) {
return shouldFilter(indexMetaData, node, allocation);
}
@Override
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return shouldFilter(shardRouting, node, allocation);
}
private Decision shouldFilter(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
if (clusterRequireFilters != null) {
if (!clusterRequireFilters.match(node.node())) {
return allocation.decision(Decision.NO, NAME, "node does not match global required filters [%s]", clusterRequireFilters);
}
}
if (clusterIncludeFilters != null) {
if (!clusterIncludeFilters.match(node.node())) {
return allocation.decision(Decision.NO, NAME, "node does not match global include filters [%s]", clusterIncludeFilters);
}
}
if (clusterExcludeFilters != null) {
if (clusterExcludeFilters.match(node.node())) {
return allocation.decision(Decision.NO, NAME, "node matches global exclude filters [%s]", clusterExcludeFilters);
}
Decision decision = shouldClusterFilter(node, allocation);
if (decision != null) return decision;
decision = shouldIndexFilter(allocation.routingNodes().metaData().index(shardRouting.index()), node, allocation);
if (decision != null) return decision;
return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require filters");
}
IndexMetaData indexMd = allocation.routingNodes().metaData().index(shardRouting.index());
private Decision shouldFilter(IndexMetaData indexMd, RoutingNode node, RoutingAllocation allocation) {
Decision decision = shouldClusterFilter(node, allocation);
if (decision != null) return decision;
decision = shouldIndexFilter(indexMd, node, allocation);
if (decision != null) return decision;
return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require filters");
}
private Decision shouldIndexFilter(IndexMetaData indexMd, RoutingNode node, RoutingAllocation allocation) {
if (indexMd.requireFilters() != null) {
if (!indexMd.requireFilters().match(node.node())) {
return allocation.decision(Decision.NO, NAME, "node does not match index required filters [%s]", indexMd.requireFilters());
@ -126,8 +134,26 @@ public class FilterAllocationDecider extends AllocationDecider {
return allocation.decision(Decision.NO, NAME, "node matches index exclude filters [%s]", indexMd.excludeFilters());
}
}
return null;
}
return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require filters");
private Decision shouldClusterFilter(RoutingNode node, RoutingAllocation allocation) {
if (clusterRequireFilters != null) {
if (!clusterRequireFilters.match(node.node())) {
return allocation.decision(Decision.NO, NAME, "node does not match global required filters [%s]", clusterRequireFilters);
}
}
if (clusterIncludeFilters != null) {
if (!clusterIncludeFilters.match(node.node())) {
return allocation.decision(Decision.NO, NAME, "node does not match global include filters [%s]", clusterIncludeFilters);
}
}
if (clusterExcludeFilters != null) {
if (clusterExcludeFilters.match(node.node())) {
return allocation.decision(Decision.NO, NAME, "node matches global exclude filters [%s]", clusterExcludeFilters);
}
}
return null;
}
private void setClusterRequireFilters(Settings settings) {