diff --git a/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java index 60f858b8f5d..0a78a6a33fd 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java +++ b/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster.routing; +import com.google.common.base.Predicate; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import gnu.trove.map.hash.TObjectIntHashMap; @@ -247,6 +248,20 @@ public class RoutingNodes implements Iterable { return count; } + public List shards(Predicate predicate) { + List shards = newArrayList(); + for (RoutingNode routingNode : this) { + List nodeShards = routingNode.shards(); + for (int i = 0; i < nodeShards.size(); i++) { + MutableShardRouting shardRouting = nodeShards.get(i); + if (predicate.apply(shardRouting)) { + shards.add(shardRouting); + } + } + } + return shards; + } + public List shardsWithState(ShardRoutingState... state) { List shards = newArrayList(); for (RoutingNode routingNode : this) { diff --git a/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 0e99e30b1a2..15253b6d8b3 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -20,11 +20,13 @@ package org.elasticsearch.cluster.routing.allocation.allocator; import com.google.common.base.Predicate; -import com.google.common.collect.Iterables; import org.apache.lucene.util.SorterTemplate; import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.routing.*; +import org.elasticsearch.cluster.routing.MutableShardRouting; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; @@ -66,9 +68,9 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards public static final String SETTING_INDEX_BALANCE_FACTOR = "cluster.routing.allocation.balance.index"; public static final String SETTING_SHARD_BALANCE_FACTOR = "cluster.routing.allocation.balance.shard"; public static final String SETTING_PRIMARY_BALANCE_FACTOR = "cluster.routing.allocation.balance.primary"; - + private static final float DEFAULT_INDEX_BALANCE_FACTOR = 0.5f; - private static final float DEFAULT_SHARD_BALANCE_FACTOR = 0.45f; + private static final float DEFAULT_SHARD_BALANCE_FACTOR = 0.45f; private static final float DEFAULT_PRIMARY_BALANCE_FACTOR = 0.05f; class ApplySettings implements NodeSettingsService.Listener { @@ -89,7 +91,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards private volatile WeightFunction weightFunction = new WeightFunction(DEFAULT_INDEX_BALANCE_FACTOR, DEFAULT_SHARD_BALANCE_FACTOR, DEFAULT_PRIMARY_BALANCE_FACTOR); private volatile float threshold = 1.0f; - + public BalancedShardsAllocator(Settings settings) { this(settings, new NodeSettingsService(settings)); @@ -125,28 +127,28 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold); return balancer.move(shardRouting, node); } - + /** * Returns the currently configured delta threshold */ public float getThreshold() { return threshold; } - + /** * Returns the index related weight factor. */ public float getIndexBalance() { return weightFunction.indexBalance; } - + /** * Returns the primary related weight factor. */ public float getPrimaryBalance() { return weightFunction.primaryBalance; } - + /** * Returns the shard related weight factor. */ @@ -194,23 +196,23 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards if (sum <= 0.0f) { throw new ElasticSearchIllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum); } - final float[] defaultTheta = new float[] { shardBalance / sum, indexBalance / sum, primaryBalance / sum }; - for(Operation operation : Operation.values()) { - switch(operation) { - case THRESHOLD_CHECK: - sum = indexBalance + shardBalance; - if (sum <= 0.0f) { - thetaMap.put(operation, defaultTheta); - } - thetaMap.put(operation, new float[] { shardBalance / sum, indexBalance / sum, 0}); - break; - case BALANCE: - case ALLOCATE: - case MOVE: - thetaMap.put(operation, defaultTheta); - break; - default: - assert false; + final float[] defaultTheta = new float[]{shardBalance / sum, indexBalance / sum, primaryBalance / sum}; + for (Operation operation : Operation.values()) { + switch (operation) { + case THRESHOLD_CHECK: + sum = indexBalance + shardBalance; + if (sum <= 0.0f) { + thetaMap.put(operation, defaultTheta); + } + thetaMap.put(operation, new float[]{shardBalance / sum, indexBalance / sum, 0}); + break; + case BALANCE: + case ALLOCATE: + case MOVE: + thetaMap.put(operation, defaultTheta); + break; + default: + assert false; } } this.indexBalance = indexBalance; @@ -224,32 +226,32 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards final float weightPrimary = (node.numPrimaries() - balancer.avgPrimariesPerNode()); final float[] theta = thetaMap.get(operation); assert theta != null; - return theta[0] * weightShard + theta[1] * weightIndex + theta[2] * weightPrimary; + return theta[0] * weightShard + theta[1] * weightIndex + theta[2] * weightPrimary; } } - + /** * An enum that donates the actual operation the {@link WeightFunction} is * applied to. */ public static enum Operation { /** - * Provided during balance operations. + * Provided during balance operations. */ BALANCE, /** - * Provided during initial allocation operation for unassigned shards. + * Provided during initial allocation operation for unassigned shards. */ ALLOCATE, /** * Provided during move operation. */ - MOVE, + MOVE, /** * Provided when the weight delta is checked against the configured threshold. - * This can be used to ignore tie-breaking weight factors that should not - * solely trigger a relocation unless the delta is above the threshold. + * This can be used to ignore tie-breaking weight factors that should not + * solely trigger a relocation unless the delta is above the threshold. */ THRESHOLD_CHECK } @@ -267,7 +269,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards private final float threshold; private final MetaData metaData; - + private final Predicate assignedFilter = new Predicate() { @Override public boolean apply(MutableShardRouting input) { @@ -325,27 +327,20 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards /** * Returns a new {@link NodeSorter} that sorts the nodes based on their * current weight with respect to the index passed to the sorter. The - * returned sorter is not sorted. Use {@link NodeSorter#reset(String)} + * returned sorter is not sorted. Use {@link NodeSorter#reset(org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.Operation, String)} * to sort based on an index. */ private NodeSorter newNodeSorter() { - final NodeSorter sorter = new NodeSorter(nodesArray(), weight, this); - return sorter; + return new NodeSorter(nodesArray(), weight, this); } private boolean initialize(RoutingNodes routing) { - Collection shards = new ArrayList(); if (logger.isTraceEnabled()) { logger.trace("Start distributing Shards"); } - for (IndexRoutingTable index : allocation.routingTable().indicesRouting().values()) { - indices.add(index.index()); - for (IndexShardRoutingTable shard : index.getShards().values()) { - shards.addAll(routing.shardsRoutingFor(index.index(), shard.shardId().id())); - } - } - buildModelFromAssigned(Iterables.filter(shards, assignedFilter)); + indices.addAll(allocation.routingTable().indicesRouting().keySet()); + buildModelFromAssigned(routing.shards(assignedFilter)); return allocateUnassigned(allocation.routingNodes().unassigned(), allocation.routingNodes().ignoredUnassigned()); } @@ -376,7 +371,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards NodeSorter sorter = newNodeSorter(); if (nodes.size() > 1) { /* skip if we only have one node */ for (String index : buildWeightOrderedIndidces(Operation.BALANCE, sorter)) { - sorter.reset(Operation.BALANCE,index); + sorter.reset(Operation.BALANCE, index); final float[] weights = sorter.weights; final ModelNode[] modelNodes = sorter.modelNodes; int lowIdx = 0; @@ -389,7 +384,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards delta = delta <= threshold ? delta : sorter.weight(Operation.THRESHOLD_CHECK, maxNode) - sorter.weight(Operation.THRESHOLD_CHECK, minNode); if (delta <= threshold) { if (logger.isTraceEnabled()) { - + logger.trace("Stop balancing index [{}] min_node [{}] weight: [{}] max_node [{}] weight: [{}] delta: [{}]", index, maxNode.getNodeId(), weights[highIdx], minNode.getNodeId(), weights[lowIdx], delta); } @@ -622,15 +617,15 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards ModelNode minNode = null; Decision decision = null; for (ModelNode node : nodes.values()) { - /* - * The shard we add is removed below to simulate the + /* + * The shard we add is removed below to simulate the * addition for weight calculation we use Decision.ALWAYS to * not violate the not null condition. */ if (!node.containsShard(shard)) { node.addShard(shard, Decision.ALWAYS); float currentWeight = weight.weight(Operation.ALLOCATE, this, node, shard.index()); - /* + /* * Remove the shard from the node again this is only a * simulation */ diff --git a/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java b/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java index 61699b35e14..5c011d6e232 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java +++ b/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java @@ -75,9 +75,12 @@ public class ThrottlingAllocationDecider extends AllocationDecider { public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { if (shardRouting.primary()) { boolean primaryUnassigned = false; - for (MutableShardRouting shard : allocation.routingNodes().unassigned()) { + List unassigned = allocation.routingNodes().unassigned(); + for (int i1 = 0; i1 < unassigned.size(); i1++) { + MutableShardRouting shard = unassigned.get(i1); if (shard.shardId().equals(shardRouting.shardId())) { primaryUnassigned = true; + break; } } if (primaryUnassigned) {