diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 8cdc9b44b3f..9f34bd139d5 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -22,6 +22,7 @@ package org.elasticsearch.cluster.routing.allocation.allocator; import com.carrotsearch.hppc.cursors.ObjectCursor; import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.IntroSorter; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNodes; @@ -336,12 +337,33 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards if (onlyAssign == false && changed == false && allocation.deciders().canRebalance(allocation).type() == Type.YES) { NodeSorter sorter = newNodeSorter(); if (nodes.size() > 1) { /* skip if we only have one node */ + AllocationDeciders deciders = allocation.deciders(); + final ModelNode[] modelNodes = sorter.modelNodes; + final float[] weights = sorter.weights; for (String index : buildWeightOrderedIndices(sorter)) { - sorter.reset(index); - final float[] weights = sorter.weights; - final ModelNode[] modelNodes = sorter.modelNodes; + IndexMetaData indexMetaData = metaData.index(index); + + // find nodes that have a shard of this index or where shards of this index are allowed to stay + // move these nodes to the front of modelNodes so that we can only balance based on these nodes + int relevantNodes = 0; + for (int i = 0; i < modelNodes.length; i++) { + ModelNode modelNode = modelNodes[i]; + if (modelNode.getIndex(index) != null + || deciders.canAllocate(indexMetaData, routingNodes.node(modelNode.getNodeId()), allocation).type() != Type.NO) { + // swap nodes at position i and relevantNodes + modelNodes[i] = modelNodes[relevantNodes]; + modelNodes[relevantNodes] = modelNode; + relevantNodes++; + } + } + + if (relevantNodes < 2) { + continue; + } + + sorter.reset(index, 0, relevantNodes); int lowIdx = 0; - int highIdx = weights.length - 1; + int highIdx = relevantNodes - 1; while (true) { final ModelNode minNode = modelNodes[lowIdx]; final ModelNode maxNode = modelNodes[highIdx]; @@ -384,9 +406,9 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards */ weights[lowIdx] = sorter.weight(modelNodes[lowIdx]); weights[highIdx] = sorter.weight(modelNodes[highIdx]); - sorter.sort(0, weights.length); + sorter.sort(0, relevantNodes); lowIdx = 0; - highIdx = weights.length - 1; + highIdx = relevantNodes - 1; changed = true; continue; } @@ -961,12 +983,16 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards * Resets the sorter, recalculates the weights per node and sorts the * nodes by weight, with minimal weight first. */ - public void reset(String index) { + public void reset(String index, int from, int to) { this.index = index; - for (int i = 0; i < weights.length; i++) { + for (int i = from; i < to; i++) { weights[i] = weight(modelNodes[i]); } - sort(0, modelNodes.length); + sort(from, to); + } + + public void reset(String index) { + reset(index, 0, modelNodes.length); } public float weight(ModelNode node) { diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java index a6204485d7d..3bd4069ac73 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster.routing.allocation.decider; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; @@ -73,6 +74,14 @@ public abstract class AllocationDecider extends AbstractComponent { return Decision.ALWAYS; } + /** + * Returns a {@link Decision} whether the given shard routing can be allocated at all at this state of the + * {@link RoutingAllocation}. The default is {@link Decision#ALWAYS}. + */ + public Decision canAllocate(IndexMetaData indexMetaData, RoutingNode node, RoutingAllocation allocation) { + return Decision.ALWAYS; + } + /** * Returns a {@link Decision} whether the given node can allow any allocation at all at this state of the * {@link RoutingAllocation}. The default is {@link Decision#ALWAYS}. diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java index f57c48e8a75..059748c3f62 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster.routing.allocation.decider; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; @@ -120,6 +121,25 @@ public class AllocationDeciders extends AllocationDecider { return ret; } + @Override + public Decision canAllocate(IndexMetaData indexMetaData, RoutingNode node, RoutingAllocation allocation) { + Decision.Multi ret = new Decision.Multi(); + for (AllocationDecider allocationDecider : allocations) { + Decision decision = allocationDecider.canAllocate(indexMetaData, node, allocation); + // short track if a NO is returned. + if (decision == Decision.NO) { + if (!allocation.debugDecision()) { + return decision; + } else { + ret.add(decision); + } + } else if (decision != Decision.ALWAYS) { + ret.add(decision); + } + } + return ret; + } + @Override public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocation) { Decision.Multi ret = new Decision.Multi(); diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java index 4c451e7fffa..eb9fe10e965 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java @@ -88,29 +88,37 @@ public class FilterAllocationDecider extends AllocationDecider { return shouldFilter(shardRouting, node, allocation); } + @Override + public Decision canAllocate(IndexMetaData indexMetaData, RoutingNode node, RoutingAllocation allocation) { + return shouldFilter(indexMetaData, node, allocation); + } + @Override public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { return shouldFilter(shardRouting, node, allocation); } private Decision shouldFilter(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { - if (clusterRequireFilters != null) { - if (!clusterRequireFilters.match(node.node())) { - return allocation.decision(Decision.NO, NAME, "node does not match global required filters [%s]", clusterRequireFilters); - } - } - if (clusterIncludeFilters != null) { - if (!clusterIncludeFilters.match(node.node())) { - return allocation.decision(Decision.NO, NAME, "node does not match global include filters [%s]", clusterIncludeFilters); - } - } - if (clusterExcludeFilters != null) { - if (clusterExcludeFilters.match(node.node())) { - return allocation.decision(Decision.NO, NAME, "node matches global exclude filters [%s]", clusterExcludeFilters); - } - } + Decision decision = shouldClusterFilter(node, allocation); + if (decision != null) return decision; - IndexMetaData indexMd = allocation.routingNodes().metaData().index(shardRouting.index()); + decision = shouldIndexFilter(allocation.routingNodes().metaData().index(shardRouting.index()), node, allocation); + if (decision != null) return decision; + + return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require filters"); + } + + private Decision shouldFilter(IndexMetaData indexMd, RoutingNode node, RoutingAllocation allocation) { + Decision decision = shouldClusterFilter(node, allocation); + if (decision != null) return decision; + + decision = shouldIndexFilter(indexMd, node, allocation); + if (decision != null) return decision; + + return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require filters"); + } + + private Decision shouldIndexFilter(IndexMetaData indexMd, RoutingNode node, RoutingAllocation allocation) { if (indexMd.requireFilters() != null) { if (!indexMd.requireFilters().match(node.node())) { return allocation.decision(Decision.NO, NAME, "node does not match index required filters [%s]", indexMd.requireFilters()); @@ -126,8 +134,26 @@ public class FilterAllocationDecider extends AllocationDecider { return allocation.decision(Decision.NO, NAME, "node matches index exclude filters [%s]", indexMd.excludeFilters()); } } + return null; + } - return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require filters"); + private Decision shouldClusterFilter(RoutingNode node, RoutingAllocation allocation) { + if (clusterRequireFilters != null) { + if (!clusterRequireFilters.match(node.node())) { + return allocation.decision(Decision.NO, NAME, "node does not match global required filters [%s]", clusterRequireFilters); + } + } + if (clusterIncludeFilters != null) { + if (!clusterIncludeFilters.match(node.node())) { + return allocation.decision(Decision.NO, NAME, "node does not match global include filters [%s]", clusterIncludeFilters); + } + } + if (clusterExcludeFilters != null) { + if (clusterExcludeFilters.match(node.node())) { + return allocation.decision(Decision.NO, NAME, "node matches global exclude filters [%s]", clusterExcludeFilters); + } + } + return null; } private void setClusterRequireFilters(Settings settings) {