optimize reroute

- optimize initialization of building the all the assigned shards state
- optimize iteration in throttling allocation decider
This commit is contained in:
Shay Banon 2013-07-06 14:12:52 -07:00
parent cc1173b58f
commit 759a13f1de
3 changed files with 64 additions and 51 deletions

View File

@ -19,6 +19,7 @@
package org.elasticsearch.cluster.routing; package org.elasticsearch.cluster.routing;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import gnu.trove.map.hash.TObjectIntHashMap; import gnu.trove.map.hash.TObjectIntHashMap;
@ -247,6 +248,20 @@ public class RoutingNodes implements Iterable<RoutingNode> {
return count; return count;
} }
public List<MutableShardRouting> shards(Predicate<MutableShardRouting> predicate) {
List<MutableShardRouting> shards = newArrayList();
for (RoutingNode routingNode : this) {
List<MutableShardRouting> nodeShards = routingNode.shards();
for (int i = 0; i < nodeShards.size(); i++) {
MutableShardRouting shardRouting = nodeShards.get(i);
if (predicate.apply(shardRouting)) {
shards.add(shardRouting);
}
}
}
return shards;
}
public List<MutableShardRouting> shardsWithState(ShardRoutingState... state) { public List<MutableShardRouting> shardsWithState(ShardRoutingState... state) {
List<MutableShardRouting> shards = newArrayList(); List<MutableShardRouting> shards = newArrayList();
for (RoutingNode routingNode : this) { for (RoutingNode routingNode : this) {

View File

@ -20,11 +20,13 @@
package org.elasticsearch.cluster.routing.allocation.allocator; package org.elasticsearch.cluster.routing.allocation.allocator;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import org.apache.lucene.util.SorterTemplate; import org.apache.lucene.util.SorterTemplate;
import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.*; import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
@ -66,9 +68,9 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
public static final String SETTING_INDEX_BALANCE_FACTOR = "cluster.routing.allocation.balance.index"; public static final String SETTING_INDEX_BALANCE_FACTOR = "cluster.routing.allocation.balance.index";
public static final String SETTING_SHARD_BALANCE_FACTOR = "cluster.routing.allocation.balance.shard"; public static final String SETTING_SHARD_BALANCE_FACTOR = "cluster.routing.allocation.balance.shard";
public static final String SETTING_PRIMARY_BALANCE_FACTOR = "cluster.routing.allocation.balance.primary"; public static final String SETTING_PRIMARY_BALANCE_FACTOR = "cluster.routing.allocation.balance.primary";
private static final float DEFAULT_INDEX_BALANCE_FACTOR = 0.5f; private static final float DEFAULT_INDEX_BALANCE_FACTOR = 0.5f;
private static final float DEFAULT_SHARD_BALANCE_FACTOR = 0.45f; private static final float DEFAULT_SHARD_BALANCE_FACTOR = 0.45f;
private static final float DEFAULT_PRIMARY_BALANCE_FACTOR = 0.05f; private static final float DEFAULT_PRIMARY_BALANCE_FACTOR = 0.05f;
class ApplySettings implements NodeSettingsService.Listener { class ApplySettings implements NodeSettingsService.Listener {
@ -89,7 +91,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
private volatile WeightFunction weightFunction = new WeightFunction(DEFAULT_INDEX_BALANCE_FACTOR, DEFAULT_SHARD_BALANCE_FACTOR, DEFAULT_PRIMARY_BALANCE_FACTOR); private volatile WeightFunction weightFunction = new WeightFunction(DEFAULT_INDEX_BALANCE_FACTOR, DEFAULT_SHARD_BALANCE_FACTOR, DEFAULT_PRIMARY_BALANCE_FACTOR);
private volatile float threshold = 1.0f; private volatile float threshold = 1.0f;
public BalancedShardsAllocator(Settings settings) { public BalancedShardsAllocator(Settings settings) {
this(settings, new NodeSettingsService(settings)); this(settings, new NodeSettingsService(settings));
@ -125,28 +127,28 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold); final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
return balancer.move(shardRouting, node); return balancer.move(shardRouting, node);
} }
/** /**
* Returns the currently configured delta threshold * Returns the currently configured delta threshold
*/ */
public float getThreshold() { public float getThreshold() {
return threshold; return threshold;
} }
/** /**
* Returns the index related weight factor. * Returns the index related weight factor.
*/ */
public float getIndexBalance() { public float getIndexBalance() {
return weightFunction.indexBalance; return weightFunction.indexBalance;
} }
/** /**
* Returns the primary related weight factor. * Returns the primary related weight factor.
*/ */
public float getPrimaryBalance() { public float getPrimaryBalance() {
return weightFunction.primaryBalance; return weightFunction.primaryBalance;
} }
/** /**
* Returns the shard related weight factor. * Returns the shard related weight factor.
*/ */
@ -194,23 +196,23 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
if (sum <= 0.0f) { if (sum <= 0.0f) {
throw new ElasticSearchIllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum); throw new ElasticSearchIllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum);
} }
final float[] defaultTheta = new float[] { shardBalance / sum, indexBalance / sum, primaryBalance / sum }; final float[] defaultTheta = new float[]{shardBalance / sum, indexBalance / sum, primaryBalance / sum};
for(Operation operation : Operation.values()) { for (Operation operation : Operation.values()) {
switch(operation) { switch (operation) {
case THRESHOLD_CHECK: case THRESHOLD_CHECK:
sum = indexBalance + shardBalance; sum = indexBalance + shardBalance;
if (sum <= 0.0f) { if (sum <= 0.0f) {
thetaMap.put(operation, defaultTheta); thetaMap.put(operation, defaultTheta);
} }
thetaMap.put(operation, new float[] { shardBalance / sum, indexBalance / sum, 0}); thetaMap.put(operation, new float[]{shardBalance / sum, indexBalance / sum, 0});
break; break;
case BALANCE: case BALANCE:
case ALLOCATE: case ALLOCATE:
case MOVE: case MOVE:
thetaMap.put(operation, defaultTheta); thetaMap.put(operation, defaultTheta);
break; break;
default: default:
assert false; assert false;
} }
} }
this.indexBalance = indexBalance; this.indexBalance = indexBalance;
@ -224,32 +226,32 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
final float weightPrimary = (node.numPrimaries() - balancer.avgPrimariesPerNode()); final float weightPrimary = (node.numPrimaries() - balancer.avgPrimariesPerNode());
final float[] theta = thetaMap.get(operation); final float[] theta = thetaMap.get(operation);
assert theta != null; assert theta != null;
return theta[0] * weightShard + theta[1] * weightIndex + theta[2] * weightPrimary; return theta[0] * weightShard + theta[1] * weightIndex + theta[2] * weightPrimary;
} }
} }
/** /**
* An enum that donates the actual operation the {@link WeightFunction} is * An enum that donates the actual operation the {@link WeightFunction} is
* applied to. * applied to.
*/ */
public static enum Operation { public static enum Operation {
/** /**
* Provided during balance operations. * Provided during balance operations.
*/ */
BALANCE, BALANCE,
/** /**
* Provided during initial allocation operation for unassigned shards. * Provided during initial allocation operation for unassigned shards.
*/ */
ALLOCATE, ALLOCATE,
/** /**
* Provided during move operation. * Provided during move operation.
*/ */
MOVE, MOVE,
/** /**
* Provided when the weight delta is checked against the configured threshold. * Provided when the weight delta is checked against the configured threshold.
* This can be used to ignore tie-breaking weight factors that should not * This can be used to ignore tie-breaking weight factors that should not
* solely trigger a relocation unless the delta is above the threshold. * solely trigger a relocation unless the delta is above the threshold.
*/ */
THRESHOLD_CHECK THRESHOLD_CHECK
} }
@ -267,7 +269,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
private final float threshold; private final float threshold;
private final MetaData metaData; private final MetaData metaData;
private final Predicate<MutableShardRouting> assignedFilter = new Predicate<MutableShardRouting>() { private final Predicate<MutableShardRouting> assignedFilter = new Predicate<MutableShardRouting>() {
@Override @Override
public boolean apply(MutableShardRouting input) { public boolean apply(MutableShardRouting input) {
@ -325,27 +327,20 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
/** /**
* Returns a new {@link NodeSorter} that sorts the nodes based on their * Returns a new {@link NodeSorter} that sorts the nodes based on their
* current weight with respect to the index passed to the sorter. The * current weight with respect to the index passed to the sorter. The
* returned sorter is not sorted. Use {@link NodeSorter#reset(String)} * returned sorter is not sorted. Use {@link NodeSorter#reset(org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.Operation, String)}
* to sort based on an index. * to sort based on an index.
*/ */
private NodeSorter newNodeSorter() { private NodeSorter newNodeSorter() {
final NodeSorter sorter = new NodeSorter(nodesArray(), weight, this); return new NodeSorter(nodesArray(), weight, this);
return sorter;
} }
private boolean initialize(RoutingNodes routing) { private boolean initialize(RoutingNodes routing) {
Collection<MutableShardRouting> shards = new ArrayList<MutableShardRouting>();
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Start distributing Shards"); logger.trace("Start distributing Shards");
} }
for (IndexRoutingTable index : allocation.routingTable().indicesRouting().values()) { indices.addAll(allocation.routingTable().indicesRouting().keySet());
indices.add(index.index()); buildModelFromAssigned(routing.shards(assignedFilter));
for (IndexShardRoutingTable shard : index.getShards().values()) {
shards.addAll(routing.shardsRoutingFor(index.index(), shard.shardId().id()));
}
}
buildModelFromAssigned(Iterables.filter(shards, assignedFilter));
return allocateUnassigned(allocation.routingNodes().unassigned(), allocation.routingNodes().ignoredUnassigned()); return allocateUnassigned(allocation.routingNodes().unassigned(), allocation.routingNodes().ignoredUnassigned());
} }
@ -376,7 +371,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
NodeSorter sorter = newNodeSorter(); NodeSorter sorter = newNodeSorter();
if (nodes.size() > 1) { /* skip if we only have one node */ if (nodes.size() > 1) { /* skip if we only have one node */
for (String index : buildWeightOrderedIndidces(Operation.BALANCE, sorter)) { for (String index : buildWeightOrderedIndidces(Operation.BALANCE, sorter)) {
sorter.reset(Operation.BALANCE,index); sorter.reset(Operation.BALANCE, index);
final float[] weights = sorter.weights; final float[] weights = sorter.weights;
final ModelNode[] modelNodes = sorter.modelNodes; final ModelNode[] modelNodes = sorter.modelNodes;
int lowIdx = 0; int lowIdx = 0;
@ -389,7 +384,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
delta = delta <= threshold ? delta : sorter.weight(Operation.THRESHOLD_CHECK, maxNode) - sorter.weight(Operation.THRESHOLD_CHECK, minNode); delta = delta <= threshold ? delta : sorter.weight(Operation.THRESHOLD_CHECK, maxNode) - sorter.weight(Operation.THRESHOLD_CHECK, minNode);
if (delta <= threshold) { if (delta <= threshold) {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Stop balancing index [{}] min_node [{}] weight: [{}] max_node [{}] weight: [{}] delta: [{}]", logger.trace("Stop balancing index [{}] min_node [{}] weight: [{}] max_node [{}] weight: [{}] delta: [{}]",
index, maxNode.getNodeId(), weights[highIdx], minNode.getNodeId(), weights[lowIdx], delta); index, maxNode.getNodeId(), weights[highIdx], minNode.getNodeId(), weights[lowIdx], delta);
} }
@ -622,15 +617,15 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
ModelNode minNode = null; ModelNode minNode = null;
Decision decision = null; Decision decision = null;
for (ModelNode node : nodes.values()) { for (ModelNode node : nodes.values()) {
/* /*
* The shard we add is removed below to simulate the * The shard we add is removed below to simulate the
* addition for weight calculation we use Decision.ALWAYS to * addition for weight calculation we use Decision.ALWAYS to
* not violate the not null condition. * not violate the not null condition.
*/ */
if (!node.containsShard(shard)) { if (!node.containsShard(shard)) {
node.addShard(shard, Decision.ALWAYS); node.addShard(shard, Decision.ALWAYS);
float currentWeight = weight.weight(Operation.ALLOCATE, this, node, shard.index()); float currentWeight = weight.weight(Operation.ALLOCATE, this, node, shard.index());
/* /*
* Remove the shard from the node again this is only a * Remove the shard from the node again this is only a
* simulation * simulation
*/ */

View File

@ -75,9 +75,12 @@ public class ThrottlingAllocationDecider extends AllocationDecider {
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
if (shardRouting.primary()) { if (shardRouting.primary()) {
boolean primaryUnassigned = false; boolean primaryUnassigned = false;
for (MutableShardRouting shard : allocation.routingNodes().unassigned()) { List<MutableShardRouting> unassigned = allocation.routingNodes().unassigned();
for (int i1 = 0; i1 < unassigned.size(); i1++) {
MutableShardRouting shard = unassigned.get(i1);
if (shard.shardId().equals(shardRouting.shardId())) { if (shard.shardId().equals(shardRouting.shardId())) {
primaryUnassigned = true; primaryUnassigned = true;
break;
} }
} }
if (primaryUnassigned) { if (primaryUnassigned) {