Added BalancedShardsAllocator that balances shards based on a weight function.

* Weights are calculated per index and incorporate index-level, global, and primary-related parameters
 * Balance operations are executed based on a gain-maximization strategy that relocates first the shards
   that offer the biggest gain towards the weight function's optimum
 * The WeightFunction allows settings that prefer index-based balance over global balance and vice versa
 * Balance operations can be throttled by raising a threshold, resulting in less aggressive balancing
 * The WeightFunction ships with defaults that achieve evenly distributed indices while maintaining a global balance (a sketch of the calculation follows below)

Closes #2555
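For reference, a minimal sketch of the per-index weight calculation described above, using the default balance factors (index 0.5, shard 0.45, primary 0.05) that the allocator registers as dynamic settings. The class and parameter names below are illustrative only; the actual implementation is the WeightFunction inner class added in this commit.

    // Illustrative sketch, not part of this commit.
    final class ExampleWeightFunction {
        private final float indexBalance = 0.5f;    // cluster.routing.allocation.balance.index
        private final float shardBalance = 0.45f;   // cluster.routing.allocation.balance.shard
        private final float primaryBalance = 0.05f; // cluster.routing.allocation.balance.primary

        /** A higher weight means the node is heavier than average for the given index. */
        float weight(int nodeShardsOfIndex, float avgShardsPerNodeOfIndex,
                     int nodeShards, float avgShardsPerNode,
                     int nodePrimaries, float avgPrimariesPerNode) {
            final float weightIndex = indexBalance * (nodeShardsOfIndex - avgShardsPerNodeOfIndex);
            final float weightShard = shardBalance * (nodeShards - avgShardsPerNode);
            final float weightPrimary = primaryBalance * (nodePrimaries - avgPrimariesPerNode);
            return weightIndex + weightShard + weightPrimary;
        }
    }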
Simon Willnauer 2012-11-26 10:15:09 +01:00
parent d97839b8a8
commit 2eb09e6b1a
12 changed files with 2294 additions and 29 deletions

View File

@@ -133,7 +133,9 @@ public class MetaData implements Iterable<IndexMetaData> {
private final ImmutableMap<String, IndexTemplateMetaData> templates;
private final ImmutableMap<String, Custom> customs;
private final transient int totalNumberOfShards;
private final transient int totalNumberOfShards; // Transient ? not serializable anyway?
private final int numberOfShards;
private final String[] allIndices;
private final ImmutableSet<String> allIndicesSet;
@@ -148,6 +150,7 @@ public class MetaData implements Iterable<IndexMetaData> {
private final ImmutableMap<String, String[]> aliasAndIndexToIndexMap;
MetaData(long version, Settings transientSettings, Settings persistentSettings, ImmutableMap<String, IndexMetaData> indices, ImmutableMap<String, IndexTemplateMetaData> templates, ImmutableMap<String, Custom> customs) {
this.version = version;
this.transientSettings = transientSettings;
@@ -157,10 +160,13 @@ public class MetaData implements Iterable<IndexMetaData> {
this.customs = customs;
this.templates = templates;
int totalNumberOfShards = 0;
int numberOfShards = 0;
for (IndexMetaData indexMetaData : indices.values()) {
totalNumberOfShards += indexMetaData.totalNumberOfShards();
numberOfShards += indexMetaData.numberOfShards();
}
this.totalNumberOfShards = totalNumberOfShards;
this.numberOfShards = numberOfShards;
// build all indices map
List<String> allIndicesLst = Lists.newArrayList();
@@ -690,6 +696,14 @@ public class MetaData implements Iterable<IndexMetaData> {
public int getTotalNumberOfShards() {
return totalNumberOfShards();
}
public int numberOfShards() {
return this.numberOfShards;
}
public int getNumberOfShards() {
return numberOfShards();
}
/**

View File

@@ -351,7 +351,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
});
return updatedState;
} catch (Exception e) {
} catch (Throwable e) {
logger.warn("[{}] failed to create", e, request.index);
listener.onFailure(e);
return currentState;

View File

@@ -0,0 +1,966 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation.allocator;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import org.apache.lucene.util.SorterTemplate;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.routing.allocation.decider.Decision.Type;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.settings.NodeSettingsService;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
/**
* The {@link BalancedShardsAllocator} re-balances the nodes' allocations
* within a cluster based on a {@link WeightFunction}. The cluster's balance is defined by four parameters which can be
* changed in real time via the cluster update settings API:
*
* <ul><li><code>cluster.routing.allocation.balance.shard</code> - The <b>shard balance</b> defines the weight factor
* for shards allocated on a {@link RoutingNode}</li>
* <li><code>cluster.routing.allocation.balance.index</code> - The <b>index balance</b> defines a weight factor for the number
* of {@link ShardRouting}s per index allocated on a specific node</li>
* <li><code>cluster.routing.allocation.balance.primary</code> - the <b>primary balance</b> defines a weight factor for
* the number of primaries of a specific index allocated on a node</li>
* <li><code>cluster.routing.allocation.balance.threshold</code> - A <b>threshold</b> to set the minimal optimization
* value of operations that should be performed</li>
* </ul>
*
* These parameters are combined in a {@link WeightFunction} that allows calculation of node weights which
* are used to re-balance shards based on global as well as per-index factors.
*/
public class BalancedShardsAllocator extends AbstractComponent implements ShardsAllocator {
public static final String SETTING_THRESHOLD = "cluster.routing.allocation.balance.threshold";
public static final String SETTING_INDEX_BALANCE_FACTOR = "cluster.routing.allocation.balance.index";
public static final String SETTING_SHARD_BALANCE_FACTOR = "cluster.routing.allocation.balance.shard";
public static final String SETTING_PRIMARY_BALANCE_FACTOR = "cluster.routing.allocation.balance.primary";
static {
MetaData.addDynamicSettings(
SETTING_INDEX_BALANCE_FACTOR,
SETTING_PRIMARY_BALANCE_FACTOR,
SETTING_SHARD_BALANCE_FACTOR,
SETTING_THRESHOLD
);
}
class ApplySettings implements NodeSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
float indexBalance = settings.getAsFloat(SETTING_INDEX_BALANCE_FACTOR, 0.5f);
float shardBalance = settings.getAsFloat(SETTING_SHARD_BALANCE_FACTOR, 0.45f);
float primaryBalance = settings.getAsFloat(SETTING_PRIMARY_BALANCE_FACTOR, 0.05f);
float threshold = settings.getAsFloat(SETTING_THRESHOLD, 1.0f);
if (threshold <= 0.0f) {
throw new ElasticSearchIllegalArgumentException("threshold must be greater than 0.0f but was: " + threshold);
}
BalancedShardsAllocator.this.threshold = threshold;
BalancedShardsAllocator.this.weightFunction = new WeightFunction(indexBalance, shardBalance, primaryBalance);
}
}
private volatile WeightFunction weightFunction;
private volatile float threshold;
public BalancedShardsAllocator(Settings settings) {
this(settings, new NodeSettingsService(settings));
}
@Inject
public BalancedShardsAllocator(Settings settings, NodeSettingsService nodeSettingsService) {
super(settings);
ApplySettings applySettings = new ApplySettings();
applySettings.onRefreshSettings(settings);
nodeSettingsService.addListener(applySettings);
}
@Override
public void applyStartedShards(StartedRerouteAllocation allocation) { /* ONLY FOR GATEWAYS */ }
@Override
public void applyFailedShards(FailedRerouteAllocation allocation) { /* ONLY FOR GATEWAYS */ }
@Override
public boolean allocateUnassigned(RoutingAllocation allocation) {
return rebalance(allocation);
}
@Override
public boolean rebalance(RoutingAllocation allocation) {
final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
return balancer.balance();
}
@Override
public boolean move(MutableShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
return balancer.move(shardRouting, node);
}
/**
* This class is the primary weight function used to create a balanced distribution of shards over the nodes in the cluster.
* Currently this function has three properties:
* <ul>
* <li><code>index balance</code> - balance property over shards per index</li>
* <li><code>shard balance</code> - balance property over shards per cluster</li>
* <li><code>primary balance</code> - balance property over primaries per cluster</li>
* </ul>
* <p>
* Each of these properties is expressed as a factor that defines its relative importance for the
* weight function. For example, if the weight function should calculate the weights based only on the global (shard) balance, the index and primary balance
* factors can be set to <tt>0.0</tt>, in which case they have no effect on the distribution.
* </p>
* The weight per index is calculated based on the following formula:
* <ul>
* <li>
* <code>weight<sub>index</sub>(node, index) = indexBalance * (node.numShards(index) - avgShardsPerNode(index))</code>
* </li>
* <li>
* <code>weight<sub>node</sub>(node, index) = shardBalance * (node.numShards() - avgShardsPerNode)</code>
* </li>
* <li>
* <code>weight<sub>primary</sub>(node, index) = primaryBalance * (node.numPrimaries() - avgPrimariesPerNode)</code>
* </li>
* </ul>
* <code>weight(node, index) = weight<sub>index</sub>(node, index) + weight<sub>node</sub>(node, index) + weight<sub>primary</sub>(node, index)</code>
*
*/
public static class WeightFunction {
private final float indexBalance;
private final float shardBalance;
private final float primaryBalance;
public WeightFunction(float indexBalance, float shardBalance, float primaryBalance) {
final float sum = indexBalance + shardBalance + primaryBalance;
if (sum <= 0.0f) {
throw new ElasticSearchIllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum);
}
this.indexBalance = indexBalance / sum;
this.shardBalance = shardBalance / sum;
this.primaryBalance = primaryBalance / sum;
}
public float weight(Balancer balancer, ModelNode node, String index) {
final float weightShard = shardBalance * (node.numShards() - balancer.avgShardsPerNode());
final float weightIndex = indexBalance * (node.numShards(index) - balancer.avgShardsPerNode(index));
final float weightPrimary = primaryBalance * (node.numPrimaries() - balancer.avgPrimariesPerNode());
return weightShard + weightIndex + weightPrimary;
}
}
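/*
 * Worked example (editorial illustration, not part of the original source): with the
 * default factors index=0.5, shard=0.45, primary=0.05 (their sum is 1.0, so the
 * normalization above leaves them unchanged), a node holding 4 shards of an index whose
 * per-node average is 2, 10 shards in total against a global average of 8, and 5
 * primaries against an average of 4 gets
 *
 *   weight = 0.5 * (4 - 2) + 0.45 * (10 - 8) + 0.05 * (5 - 4) = 1.95
 *
 * The positive weight marks the node as heavier than average for that index, making it a
 * candidate to shed one of those shards towards a node with a lower weight.
 */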
/**
* A {@link Balancer}
*/
public static class Balancer {
private final ESLogger logger;
private final Map<String, ModelNode> nodes = new HashMap<String, ModelNode>();
private final HashSet<String> indices = new HashSet<String>();
private final RoutingAllocation allocation;
private final WeightFunction weight;
private final float threshold;
private final MetaData metaData;
private final Predicate<MutableShardRouting> assignedFilter = new Predicate<MutableShardRouting>() {
@Override
public boolean apply(MutableShardRouting input) {
return input.assignedToNode();
}
};
public Balancer(ESLogger logger, RoutingAllocation allocation, WeightFunction weight, float threshold) {
this.logger = logger;
this.allocation = allocation;
this.weight = weight;
this.threshold = threshold;
for (RoutingNode node : allocation.routingNodes()) {
nodes.put(node.nodeId(), new ModelNode(node.nodeId()));
}
metaData = allocation.routingNodes().metaData();
}
/**
* Returns an array view on the nodes in the balancer. Nodes should not be removed from this list.
*/
private ModelNode[] nodesArray() {
return nodes.values().toArray(new ModelNode[nodes.size()]);
}
/**
* Returns the average of shards per node for the given index
*/
public float avgShardsPerNode(String index) {
return ((float) metaData.index(index).totalNumberOfShards()) / nodes.size();
}
/**
* Returns the global average of shards per node
*/
public float avgShardsPerNode() {
return ((float) metaData.totalNumberOfShards()) / nodes.size();
}
/**
* Returns the global average of primaries per node
*/
public float avgPrimariesPerNode() {
return ((float) metaData.numberOfShards()) / nodes.size();
}
/**
* Returns the average of primaries per node for the given index
*/
public float avgPrimariesPerNode(String index) {
return ((float) metaData.index(index).numberOfShards()) / nodes.size();
}
/**
* Returns a new {@link NodeSorter} that sorts the nodes based on their
* current weight with respect to the index passed to the sorter. The
* returned sorter is not sorted. Use {@link NodeSorter#reset(String)}
* to sort based on an index.
*/
private NodeSorter newNodeSorter() {
final NodeSorter sorter = new NodeSorter(nodesArray(), weight, this);
return sorter;
}
private boolean initialize(RoutingNodes routing) {
Collection<MutableShardRouting> shards = new ArrayList<MutableShardRouting>();
if (logger.isTraceEnabled()) {
logger.trace("Start distributing Shards");
}
for (IndexRoutingTable index : allocation.routingTable().indicesRouting().values()) {
indices.add(index.index());
for (IndexShardRoutingTable shard : index.getShards().values()) {
shards.addAll(routing.shardsRoutingFor(index.index(), shard.shardId().id()));
}
}
buildModelFromAssigned(Iterables.filter(shards, assignedFilter));
return allocateUnassigned(allocation.routingNodes().unassigned(), allocation.routingNodes().ignoredUnassigned());
}
/**
* Balances the nodes on the cluster model according to the weight
* function. The configured threshold is the minimum delta between the
* weight of the maximum node and the minimum node according to the
* {@link WeightFunction}. This weight is calculated per index to
* distribute shards evenly per index. The balancer tries to relocate
* shards only if the delta exceeds the threshold. In the default case
* the threshold is set to <tt>1.0</tt> to enforce gaining relocations
* only, in other words relocations that move the weight delta closer
* to <tt>0.0</tt>
*
* @return <code>true</code> if the current configuration has been
* changed, otherwise <code>false</code>
*/
public boolean balance() {
if (this.nodes.isEmpty()) {
/* with no nodes this is pointless */
return false;
}
if (logger.isTraceEnabled()) {
logger.trace("Start balancing cluster");
}
boolean changed = initialize(allocation.routingNodes());
NodeSorter sorter = newNodeSorter();
if (nodes.size() > 1) { /* skip if we only have one node */
for (String index : buildWeightOrderedIndices(sorter)) {
sorter.reset(index);
final float[] weights = sorter.weights;
final ModelNode[] modelNodes = sorter.modelNodes;
int lowIdx = 0;
int highIdx = weights.length - 1;
while (true) {
final ModelNode minNode = modelNodes[lowIdx];
final ModelNode maxNode = modelNodes[highIdx];
if (maxNode.numShards(index) > 0) {
float delta = weights[highIdx] - weights[lowIdx];
if (delta <= threshold) {
if (logger.isTraceEnabled()) {
logger.trace("Stop balancing index [{}] min_node [{}] weight: [{}] max_node [{}] weight: [{}] delta: [{}]",
index, maxNode.getNodeId(), weights[highIdx], minNode.getNodeId(), weights[lowIdx], delta);
}
break;
}
if (logger.isTraceEnabled()) {
logger.trace("Balancing from node [{}] weight: [{}] to node [{}] weight: [{}] delta: [{}]",
maxNode.getNodeId(), weights[highIdx], minNode.getNodeId(), weights[lowIdx], delta);
}
/* pass the delta to the relocation function to prevent relocations that only swap the weights of the two nodes.
 * a relocation must bring us closer to the balance; if we only achieve the same delta the relocation is useless */
if (tryRelocateShard(minNode, maxNode, index, delta)) {
/*
* TODO we could be a bit smarter here, we don't need to fully sort necessarily
* we could just find the place to insert linearly but the win might be minor
* compared to the added complexity
*/
weights[lowIdx] = sorter.weight(modelNodes[lowIdx]);
weights[highIdx] = sorter.weight(modelNodes[highIdx]);
sorter.quickSort(0, weights.length - 1);
lowIdx = 0;
highIdx = weights.length - 1;
changed = true;
continue;
}
}
if (lowIdx < highIdx - 1) {
/* we can't move any shard from the min node; let's move on to the next node
 * and see if the threshold still holds. We either don't have any shard of this
 * index on this node or allocation deciders prevent any relocation.*/
lowIdx++;
} else if (lowIdx > 0) {
/* now we go from max to min since obviously we can't move anything to the max node,
 * let's pick the next highest */
lowIdx = 0;
highIdx--;
} else {
/* we are done here, we either can't relocate anymore or we are balanced */
break;
}
}
}
}
return changed;
}
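/*
 * Editorial illustration (not part of the original source): with the default threshold of
 * 1.0, a node pair with per-index weights 1.6 and 0.4 (delta 1.2) is still rebalanced,
 * while weights of 0.9 and 0.1 (delta 0.8) are left alone. Relocating a single shard
 * shifts each node's weight by roughly the sum of the balance factors (about 1.0 with the
 * defaults), so for a smaller delta the relocation would merely swap which node is heavier
 * instead of moving the delta closer to 0.0.
 */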
/**
* This builds an initial index ordering where the indices are returned
* most unbalanced first. We need this in order to prevent over-allocation
* of a single index on newly added nodes when the weight parameters
* for global balance overrule the index balance at an intermediate
* state. For example this can happen if we have 3 nodes and 3 indices
* with 3 shards and 1 replica. At the first stage all three nodes hold
* 2 shards of each index. Now we add another node and the first index
* is balanced by moving three shards from two of the nodes over to the new node since it
* has no shards yet and the global balance for the node is way below
* average. To re-balance we would eventually need to move shards back, likely
* to the nodes we relocated them from.
*/
private String[] buildWeightOrderedIndices(NodeSorter sorter) {
final String[] indices = this.indices.toArray(new String[this.indices.size()]);
final float[] deltas = new float[indices.length];
for (int i = 0; i < deltas.length; i++) {
sorter.reset(indices[i]);
deltas[i] = sorter.delta();
}
new SorterTemplate() {
float pivotWeight;
@Override
protected void swap(int i, int j) {
final String tmpIdx = indices[i];
indices[i] = indices[j];
indices[j] = tmpIdx;
final float tmpDelta = deltas[i];
deltas[i] = deltas[j];
deltas[j] = tmpDelta;
}
@Override
protected int compare(int i, int j) {
return Float.compare(deltas[j], deltas[i]);
}
@Override
protected void setPivot(int i) {
pivotWeight = deltas[i];
}
@Override
protected int comparePivot(int j) {
return Float.compare(deltas[j], pivotWeight);
}
}.quickSort(0, deltas.length - 1);
return indices;
}
/**
* This function executes a move operation moving the given shard from
* the given node to the minimal eligible node with respect to the
* weight function. Iff the shard is moved the shard will be set to
* {@link ShardRoutingState#RELOCATING} and a shadow instance of this
* shard is created with an incremented version in the state
* {@link ShardRoutingState#INITIALIZING}.
*
* @return <code>true</code> iff the shard has successfully been moved.
*/
public boolean move(MutableShardRouting shard, RoutingNode node) {
if (nodes.isEmpty() || !shard.started()) {
/* with no nodes, or a shard that is not started, this is pointless */
return false;
}
if (logger.isTraceEnabled()) {
logger.trace("Try moving shard [{}] from [{}]", shard, node);
}
boolean changed = initialize(allocation.routingNodes());
final ModelNode sourceNode = nodes.get(node.nodeId());
assert sourceNode != null;
final NodeSorter sorter = newNodeSorter();
sorter.reset(shard.getIndex());
final ModelNode[] nodes = sorter.modelNodes;
assert sourceNode.containsShard(shard);
/*
* the sorter holds the minimum-weight node first for the shard's index.
* We now walk through the nodes until we find a node to allocate the shard on.
* This is not guaranteed to be balanced after this operation; we still make a best effort to
* allocate on the minimal eligible node.
*/
for (ModelNode currentNode : nodes) {
if (currentNode.getNodeId().equals(node.nodeId())) {
continue;
}
RoutingNode target = allocation.routingNodes().node(currentNode.getNodeId());
Decision decision = allocation.deciders().canAllocate(shard, target, allocation);
if (decision.type() == Type.YES) { // TODO maybe we can respect throttling here too?
sourceNode.removeShard(shard);
final MutableShardRouting initializingShard = new MutableShardRouting(shard.index(), shard.id(), currentNode.getNodeId(),
shard.currentNodeId(), shard.primary(), INITIALIZING, shard.version() + 1);
currentNode.addShard(initializingShard, decision);
target.add(initializingShard);
shard.relocate(target.nodeId()); // set the node to relocate after we added the initializing shard
if (logger.isTraceEnabled()) {
logger.trace("Moved shard [{}] to node [{}]", shard, currentNode.getNodeId());
}
return true;
}
}
return changed;
}
/**
* Builds the internal model from all shards in the given
* {@link Iterable}. All shards in the {@link Iterable} must be assigned
* to a node. This method will skip shards in the state
* {@link ShardRoutingState#RELOCATING} since each relocating shard has
* a shadow shard in the state {@link ShardRoutingState#INITIALIZING}
* on the target node which we respect during the allocation / balancing
* process. In short, this method recreates the status-quo in the cluster.
*/
private void buildModelFromAssigned(Iterable<MutableShardRouting> shards) {
for (MutableShardRouting shard : shards) {
assert shard.assignedToNode();
/* we skip relocating shards here since we expect an initializing shard with the same id coming in */
if (shard.state() == RELOCATING) {
continue;
}
ModelNode node = nodes.get(shard.currentNodeId());
assert node != null;
node.addShard(shard, Decision.single(Type.YES, "Already allocated on node", node.getNodeId()));
if (logger.isTraceEnabled()) {
logger.trace("Assigned shard [{}] to node [{}]", shard, node.getNodeId());
}
}
}
/**
* Allocates all given shards on the minimal eligible node for the shard's index
* with respect to the weight function. All given shards must be unassigned.
*/
private boolean allocateUnassigned(List<MutableShardRouting> unassigned, List<MutableShardRouting> ignoredUnassigned) {
assert !nodes.isEmpty();
if (logger.isTraceEnabled()) {
logger.trace("Start allocating unassigned shards");
}
if (unassigned.isEmpty()) {
return false;
}
boolean changed = false;
/*
* TODO: We could be smarter here and group the shards by index and then
* use the sorter to save some iterations.
*/
final RoutingNodes routingNodes = allocation.routingNodes();
final AllocationDeciders deciders = allocation.deciders();
final Set<MutableShardRouting> currentRound = new TreeSet<MutableShardRouting>(new Comparator<MutableShardRouting>() {
@Override
public int compare(MutableShardRouting o1,
MutableShardRouting o2) {
final int indexCmp;
if ((indexCmp = o1.index().compareTo(o2.index())) == 0) {
if (o1.getId() - o2.getId() == 0) {
return o1.primary() ? -1 : o2.primary() ? 1 : 0;
}
return o1.getId() - o2.getId();
}
return indexCmp;
}
});
do {
Iterator<MutableShardRouting> iterator = unassigned.iterator();
while(iterator.hasNext()) {
/* we treat every index equally here, one chunk at a time, such that we fill up
 * nodes with all indices at the same time. Only one shard of a replica set at a time.
 * There might be both a primary and a replica of the same shard in the set, but
 * primaries will be started first.*/
if (currentRound.add(iterator.next())) {
iterator.remove();
}
}
boolean iterationChanged = false;
for (MutableShardRouting shard : currentRound) {
assert !shard.assignedToNode();
/* find a node with minimal weight we can allocate on */
float minWeight = Float.POSITIVE_INFINITY;
ModelNode minNode = null;
Decision decision = null;
for (ModelNode node : nodes.values()) {
/*
* The shard we add here is removed again below; it is only added to simulate the
* addition for the weight calculation. We use Decision.ALWAYS to
* not violate the non-null condition.
*/
if (!node.containsShard(shard)) {
node.addShard(shard, Decision.ALWAYS);
float currentWeight = weight.weight(this, node, shard.index());
/*
* Remove the shard from the node again this is only a
* simulation
*/
Decision removed = node.removeShard(shard);
assert removed != null;
/*
* We only check the deciders if the operation
* provides a gain
*/
if (currentWeight <= minWeight) {
Decision currentDecision = deciders.canAllocate(shard, routingNodes.node(node.getNodeId()), allocation);
NOUPDATE:
if (currentDecision.type() == Type.YES || currentDecision.type() == Type.THROTTLE) {
if (currentWeight == minWeight) {
/* we have an equal weight tie breaking:
* 1. if one decision is YES prefer it
* 2. prefer the node that holds the primary for this index with the next id in the ring ie.
* for the 3 shards 2 replica case we try to build up:
* 1 2 0
* 2 0 1
* 0 1 2
* such that if we need to tie-break we prefer the node holding a shard with the minimal id greater
* than the id of the shard we need to assign. This works fine when new indices are created since
* primaries are added first and we only add one shard set at a time in this algorithm.
*/
if (currentDecision.type() == decision.type()) {
final int repId = shard.id();
final int nodeHigh = node.highestPrimary(shard.index());
final int minNodeHigh = minNode.highestPrimary(shard.index());
if ((((nodeHigh > repId && minNodeHigh > repId) || (nodeHigh < repId && minNodeHigh < repId)) && (nodeHigh < minNodeHigh))
|| (nodeHigh > minNodeHigh && nodeHigh > repId && minNodeHigh < repId)) {
minNode = node;
minWeight = currentWeight;
decision = currentDecision;
} else { break NOUPDATE; }
} else if (currentDecision.type() != Type.YES) {
break NOUPDATE;
}
}
minNode = node;
minWeight = currentWeight;
decision = currentDecision;
}
}
}
}
assert decision != null && minNode != null || decision == null && minNode == null;
if (minNode != null) {
iterationChanged = true;
minNode.addShard(shard, decision);
if (decision.type() == Type.YES) {
if (logger.isTraceEnabled()) {
logger.trace("Assigned shard [{}] to [{}]", shard, minNode.getNodeId());
}
routingNodes.node(minNode.getNodeId()).add(shard);
changed = true;
continue; // don't add to ignoreUnassigned
}
} else if (logger.isTraceEnabled()) {
logger.trace("No Node found to assign shard [{}]", shard);
}
ignoredUnassigned.add(shard);
}
if (!iterationChanged && !unassigned.isEmpty()) {
ignoredUnassigned.addAll(unassigned);
unassigned.clear();
return changed;
}
currentRound.clear();
} while(!unassigned.isEmpty());
// everything has either been allocated or moved to ignoredUnassigned by now
return changed;
}
/**
* Tries to find a relocation from the max node to the minimal node for an arbitrary shard of the given index on the
* balance model. Iff this method returns <code>true</code> the relocation has already been executed on the
* simulation model as well as on the cluster.
*/
private boolean tryRelocateShard(ModelNode minNode, ModelNode maxNode, String idx, float minCost) {
final ModelIndex index = maxNode.getIndex(idx);
if (index != null) {
if (logger.isTraceEnabled()) {
logger.trace("Try relocating shard for index index [{}] from node [{}] to node [{}]", idx, maxNode.getNodeId(),
minNode.getNodeId());
}
final RoutingNode node = allocation.routingNodes().node(minNode.getNodeId());
MutableShardRouting candidate = null;
Decision decision = null;
final AllocationDeciders deciders = allocation.deciders();
/* make a copy since we modify this list in the loop */
final ArrayList<MutableShardRouting> shards = new ArrayList<MutableShardRouting>(index.getAllShards());
for (MutableShardRouting shard : shards) {
if (shard.started()) {
// skip initializing, unassigned and relocating shards we can't relocate them anyway
Decision allocationDecision = deciders.canAllocate(shard, node, allocation);
Decision rebalanceDecision = deciders.canRebalance(shard, allocation);
if (((allocationDecision.type() == Type.YES) || (allocationDecision.type() == Type.THROTTLE))
&& ((rebalanceDecision.type() == Type.YES) || (rebalanceDecision.type() == Type.THROTTLE))) {
Decision srcDecision;
if ((srcDecision = maxNode.removeShard(shard)) != null) {
minNode.addShard(shard, srcDecision);
final float delta = weight.weight(this, minNode, idx) - weight.weight(this, maxNode, idx);
if (delta < minCost) {
minCost = delta;
candidate = shard;
decision = new Decision.Multi().add(allocationDecision).add(rebalanceDecision);
}
minNode.removeShard(shard);
maxNode.addShard(shard, srcDecision);
}
}
}
}
if (candidate != null) {
/* allocate on the model even if throttled */
maxNode.removeShard(candidate);
minNode.addShard(candidate, decision);
if (decision.type() == Type.YES) { /* only allocate on the cluster if we are not throttled */
if (logger.isTraceEnabled()) {
logger.trace("Relocate shard [{}] from node [{}] to node [{}]", candidate, maxNode.getNodeId(),
minNode.getNodeId());
}
/* now allocate on the cluster - if we are started we need to relocate the shard */
if (candidate.started()) {
RoutingNode lowRoutingNode = allocation.routingNodes().node(minNode.getNodeId());
lowRoutingNode.add(new MutableShardRouting(candidate.index(), candidate.id(), lowRoutingNode.nodeId(), candidate
.currentNodeId(), candidate.primary(), INITIALIZING, candidate.version() + 1));
candidate.relocate(lowRoutingNode.nodeId());
} else {
assert candidate.unassigned();
allocation.routingNodes().node(minNode.getNodeId()).add(candidate);
}
return true;
}
}
}
if (logger.isTraceEnabled()) {
logger.trace("Couldn't find shard to relocate from node [{}] to node [{}]", maxNode.getNodeId(),
minNode.getNodeId());
}
return false;
}
}
static class ModelNode implements Iterable<ModelIndex> {
private final String id;
private final Map<String, ModelIndex> indices = new HashMap<String, ModelIndex>();
/* cached stats - invalidated on add/remove and lazily calculated */
private int numShards = -1;
private int numPrimaries = -1;
public ModelNode(String id) {
this.id = id;
}
public ModelIndex getIndex(String indexId) {
return indices.get(indexId);
}
public String getNodeId() {
return id;
}
public int numShards() {
if (numShards == -1) {
int sum = 0;
for (ModelIndex index : indices.values()) {
sum += index.numShards();
}
numShards = sum;
}
return numShards;
}
public int numShards(String idx) {
ModelIndex index = indices.get(idx);
return index == null ? 0 : index.numShards();
}
public int numPrimaries(String idx) {
ModelIndex index = indices.get(idx);
return index == null ? 0 : index.numPrimaries();
}
public int numPrimaries() {
if (numPrimaries == -1) {
int sum = 0;
for (ModelIndex index : indices.values()) {
sum += index.numPrimaries();
}
numPrimaries = sum;
}
return numPrimaries;
}
public Collection<MutableShardRouting> shards() {
Collection<MutableShardRouting> result = new ArrayList<MutableShardRouting>();
for (ModelIndex index : indices.values()) {
result.addAll(index.getAllShards());
}
return result;
}
public int highestPrimary(String index) {
ModelIndex idx = indices.get(index);
if (idx != null) {
return idx.highestPrimary();
}
return -1;
}
public void addShard(MutableShardRouting shard, Decision decision) {
numPrimaries = numShards = -1;
ModelIndex index = indices.get(shard.index());
if (index == null) {
index = new ModelIndex(shard.index());
indices.put(index.getIndexId(), index);
}
index.addShard(shard, decision);
}
public Decision removeShard(MutableShardRouting shard) {
numPrimaries = numShards = -1;
ModelIndex index = indices.get(shard.index());
Decision removed = null;
if (index != null) {
removed = index.removeShard(shard);
if (removed != null && index.numShards() == 0) {
indices.remove(shard.index());
}
}
return removed;
}
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("Node(").append(id).append(")");
return sb.toString();
}
@Override
public Iterator<ModelIndex> iterator() {
return indices.values().iterator();
}
public boolean containsShard(MutableShardRouting shard) {
ModelIndex index = getIndex(shard.getIndex());
return index == null ? false : index.containsShard(shard);
}
}
static final class ModelIndex {
private final String id;
private final Map<MutableShardRouting, Decision> shards = new HashMap<MutableShardRouting, Decision>();
private int numPrimaries = -1;
private int highestPrimary = -1;
public ModelIndex(String id) {
this.id = id;
}
public int highestPrimary() {
if (highestPrimary == -1) {
int maxId = -1;
for (MutableShardRouting shard : shards.keySet()) {
if (shard.primary()) {
maxId = Math.max(maxId, shard.id());
}
}
return highestPrimary = maxId;
}
return highestPrimary;
}
public String getIndexId() {
return id;
}
public Decision getDecision(MutableShardRouting shard) {
return shards.get(shard);
}
public int numShards() {
return shards.size();
}
public Collection<MutableShardRouting> getAllShards() {
return shards.keySet();
}
public int numPrimaries() {
if (numPrimaries == -1) {
int num = 0;
for (MutableShardRouting shard : shards.keySet()) {
if (shard.primary()) {
num++;
}
}
return numPrimaries = num;
}
return numPrimaries;
}
public Decision removeShard(MutableShardRouting shard) {
highestPrimary = numPrimaries = -1;
return shards.remove(shard);
}
public void addShard(MutableShardRouting shard, Decision decision) {
highestPrimary = numPrimaries = -1;
assert decision != null;
assert !shards.containsKey(shard) : "Shard already allocated on current node: " + shards.get(shard) + " " + shard;
shards.put(shard, decision);
}
public boolean containsShard(MutableShardRouting shard) {
return shards.containsKey(shard);
}
}
static final class NodeSorter extends SorterTemplate {
final ModelNode[] modelNodes;
/* the nodes weights with respect to the current weight function / index */
final float[] weights;
private final WeightFunction function;
private String index;
private final Balancer balancer;
private float pivotWeight;
public NodeSorter(ModelNode[] modelNodes, WeightFunction function, Balancer balancer) {
this.function = function;
this.balancer = balancer;
this.modelNodes = modelNodes;
weights = new float[modelNodes.length];
}
/**
* Resets the sorter, recalculates the weights per node and sorts the
* nodes by weight, with minimal weight first.
*/
public void reset(String index) {
this.index = index;
for (int i = 0; i < weights.length; i++) {
weights[i] = weight(modelNodes[i]);
}
quickSort(0, modelNodes.length - 1);
}
public float weight(ModelNode node) {
return function.weight(balancer, node, index);
}
@Override
protected void swap(int i, int j) {
final ModelNode tmpNode = modelNodes[i];
modelNodes[i] = modelNodes[j];
modelNodes[j] = tmpNode;
final float tmpWeight = weights[i];
weights[i] = weights[j];
weights[j] = tmpWeight;
}
@Override
protected int compare(int i, int j) {
return Float.compare(weights[i], weights[j]);
}
@Override
protected void setPivot(int i) {
pivotWeight = weights[i];
}
@Override
protected int comparePivot(int j) {
return Float.compare(pivotWeight, weights[j]);
}
public float delta() {
return weights[weights.length-1] - weights[0];
}
}
}
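The four balance settings documented in the class Javadoc above are registered as dynamic cluster settings and can be changed at runtime through the cluster update settings API. They can also be supplied as node settings when the allocator is constructed directly, for example in tests. A minimal sketch, using only the settings keys and the BalancedShardsAllocator(Settings) constructor added in this commit; the concrete factor values are illustrative:

    import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
    import org.elasticsearch.common.settings.ImmutableSettings;
    import org.elasticsearch.common.settings.Settings;

    public class BalancedAllocatorConfigExample {
        public static void main(String[] args) {
            // Emphasize per-index balance over global shard balance and raise the
            // threshold so that only clearly beneficial relocations are executed.
            Settings settings = ImmutableSettings.settingsBuilder()
                    .put(BalancedShardsAllocator.SETTING_INDEX_BALANCE_FACTOR, 0.6f)
                    .put(BalancedShardsAllocator.SETTING_SHARD_BALANCE_FACTOR, 0.35f)
                    .put(BalancedShardsAllocator.SETTING_PRIMARY_BALANCE_FACTOR, 0.05f)
                    .put(BalancedShardsAllocator.SETTING_THRESHOLD, 1.5f)
                    .build();
            BalancedShardsAllocator allocator = new BalancedShardsAllocator(settings);
        }
    }

ShardsAllocatorModule (below) now binds this allocator as the default ShardsAllocator, so in a running node these keys would normally be adjusted via the cluster update settings API rather than by constructing the allocator by hand.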

View File

@@ -48,6 +48,6 @@ public class ShardsAllocatorModule extends AbstractModule {
@Override
protected void configure() {
bind(GatewayAllocator.class).to(gatewayAllocator).asEagerSingleton();
bind(ShardsAllocator.class).to(shardsAllocator == null ? EvenShardsCountAllocator.class : shardsAllocator).asEagerSingleton();
bind(ShardsAllocator.class).to(shardsAllocator == null ? BalancedShardsAllocator.class : shardsAllocator).asEagerSingleton();
}
}

View File

@@ -45,7 +45,7 @@ public class ShardsAllocators extends AbstractComponent implements ShardsAllocat
}
public ShardsAllocators(Settings settings) {
this(settings, new NoneGatewayAllocator(), new EvenShardsCountAllocator(settings));
this(settings, new NoneGatewayAllocator(), new BalancedShardsAllocator(settings));
}
@Inject

View File

@@ -0,0 +1,413 @@
package org.elasticsearch.test.unit.cluster.routing.allocation;
import static org.elasticsearch.cluster.ClusterState.newClusterStateBuilder;
import static org.elasticsearch.cluster.metadata.IndexMetaData.newIndexMetaDataBuilder;
import static org.elasticsearch.cluster.metadata.MetaData.newMetaDataBuilder;
import static org.elasticsearch.cluster.node.DiscoveryNodes.newNodesBuilder;
import static org.elasticsearch.cluster.routing.RoutingBuilders.routingTable;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.unit.cluster.routing.allocation.RoutingAllocationTests.newNode;
import static org.hamcrest.MatcherAssert.assertThat;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.testng.annotations.Test;
public class AddIncrementallyTests {
private final ESLogger logger = Loggers.getLogger(AddIncrementallyTests.class);
@Test
public void testAddNodesAndIndices() {
ImmutableSettings.Builder settings = settingsBuilder();
settings.put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString());
AllocationService service = new AllocationService(settings.build());
ClusterState clusterState = initCluster(service, 1, 3, 3, 1);
assertThat(clusterState.routingNodes().node("node0").shardsWithState(STARTED).size(), Matchers.equalTo(9));
assertThat(clusterState.routingNodes().getUnassigned().size(), Matchers.equalTo(9));
int nodeOffset = 1;
clusterState = addNodes(clusterState, service, 1, nodeOffset++);
assertThat(clusterState.routingNodes().node("node0").shardsWithState(STARTED).size(), Matchers.equalTo(9));
assertThat(clusterState.routingNodes().node("node1").shardsWithState(STARTED).size(), Matchers.equalTo(9));
assertThat(clusterState.routingNodes().getUnassigned().size(), Matchers.equalTo(0));
assertNumIndexShardsPerNode(clusterState, Matchers.equalTo(3));
clusterState = addNodes(clusterState, service, 1, nodeOffset++);
assertNumIndexShardsPerNode(clusterState, Matchers.equalTo(2));
clusterState = addNodes(clusterState, service, 1, nodeOffset++);
assertNumIndexShardsPerNode(clusterState, Matchers.lessThanOrEqualTo(2));
assertAtLeastOneIndexShardPerNode(clusterState);
clusterState = removeNodes(clusterState, service, 1);
assertNumIndexShardsPerNode(clusterState, Matchers.equalTo(2));
clusterState = addIndex(clusterState, service, 3, 2, 3);
assertThat(clusterState.routingNodes().getUnassigned().size(), Matchers.equalTo(2));
assertNumIndexShardsPerNode(clusterState, "test3", Matchers.equalTo(2));
assertNumIndexShardsPerNode(clusterState, Matchers.lessThanOrEqualTo(2));
clusterState = addIndex(clusterState, service, 4, 2, 3);
assertThat(clusterState.routingNodes().getUnassigned().size(), Matchers.equalTo(4));
assertNumIndexShardsPerNode(clusterState, "test4", Matchers.equalTo(2));
assertNumIndexShardsPerNode(clusterState, Matchers.lessThanOrEqualTo(2));
clusterState = addNodes(clusterState, service, 1, nodeOffset++);
assertNumIndexShardsPerNode(clusterState, Matchers.lessThanOrEqualTo(2));
assertThat(clusterState.routingNodes().getUnassigned().size(), Matchers.equalTo(0));
clusterState = removeNodes(clusterState, service, 1);
assertThat(clusterState.routingNodes().getUnassigned().size(), Matchers.equalTo(4));
assertNumIndexShardsPerNode(clusterState, Matchers.lessThanOrEqualTo(2));
clusterState = addNodes(clusterState, service, 1, nodeOffset++);
assertNumIndexShardsPerNode(clusterState, Matchers.lessThanOrEqualTo(2));
assertThat(clusterState.routingNodes().getUnassigned().size(), Matchers.equalTo(0));
logger.debug("ClusterState: {}", clusterState.getRoutingNodes().prettyPrint());
}
@Test
public void testMinimalRelocations() {
ImmutableSettings.Builder settings = settingsBuilder();
settings.put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString())
.put("cluster.routing.allocation.node_concurrent_recoveries", 2);
AllocationService service = new AllocationService(settings.build());
ClusterState clusterState = initCluster(service, 1, 3, 3, 1);
assertThat(clusterState.routingNodes().node("node0").shardsWithState(STARTED).size(), Matchers.equalTo(9));
assertThat(clusterState.routingNodes().getUnassigned().size(), Matchers.equalTo(9));
int nodeOffset = 1;
clusterState = addNodes(clusterState, service, 1, nodeOffset++);
assertThat(clusterState.routingNodes().node("node0").shardsWithState(STARTED).size(), Matchers.equalTo(9));
assertThat(clusterState.routingNodes().node("node1").shardsWithState(STARTED).size(), Matchers.equalTo(9));
assertThat(clusterState.routingNodes().getUnassigned().size(), Matchers.equalTo(0));
assertNumIndexShardsPerNode(clusterState, Matchers.equalTo(3));
logger.info("now, start one more node, check that rebalancing will happen because we set it to always");
DiscoveryNodes.Builder nodes = newNodesBuilder().putAll(clusterState.nodes());
nodes.put(newNode("node2"));
clusterState = newClusterStateBuilder().state(clusterState).nodes(nodes.build()).build();
RoutingTable routingTable = service.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
RoutingNodes routingNodes = clusterState.routingNodes();
assertThat(clusterState.routingNodes().node("node2").shardsWithState(INITIALIZING).size(), Matchers.equalTo(2));
assertThat(clusterState.routingNodes().node("node0").shardsWithState(INITIALIZING).size(), Matchers.equalTo(0));
assertThat(clusterState.routingNodes().node("node1").shardsWithState(INITIALIZING).size(), Matchers.equalTo(0));
RoutingTable prev = routingTable;
routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
assertThat(prev, Matchers.not(Matchers.sameInstance(routingTable)));
assertThat(clusterState.routingNodes().node("node2").shardsWithState(STARTED).size(), Matchers.equalTo(2));
assertThat(clusterState.routingNodes().node("node2").shardsWithState(INITIALIZING).size(), Matchers.equalTo(2));
assertThat(clusterState.routingNodes().node("node0").shardsWithState(INITIALIZING).size(), Matchers.equalTo(0));
assertThat(clusterState.routingNodes().node("node1").shardsWithState(INITIALIZING).size(), Matchers.equalTo(0));
assertThat(prev, Matchers.not(Matchers.sameInstance(routingTable)));
prev = routingTable;
routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
assertThat(clusterState.routingNodes().node("node2").shardsWithState(STARTED).size(), Matchers.equalTo(4));
assertThat(clusterState.routingNodes().node("node2").shardsWithState(INITIALIZING).size(), Matchers.equalTo(2));
assertThat(clusterState.routingNodes().node("node0").shardsWithState(INITIALIZING).size(), Matchers.equalTo(0));
assertThat(clusterState.routingNodes().node("node1").shardsWithState(INITIALIZING).size(), Matchers.equalTo(0));
assertThat(prev, Matchers.not(Matchers.sameInstance(routingTable)));
prev = routingTable;
routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
assertThat(clusterState.routingNodes().node("node2").shardsWithState(STARTED).size(), Matchers.equalTo(6));
assertThat(clusterState.routingNodes().node("node2").shardsWithState(INITIALIZING).size(), Matchers.equalTo(0));
assertThat(clusterState.routingNodes().node("node0").shardsWithState(INITIALIZING).size(), Matchers.equalTo(0));
assertThat(clusterState.routingNodes().node("node1").shardsWithState(INITIALIZING).size(), Matchers.equalTo(0));
assertThat(prev, Matchers.not(Matchers.sameInstance(routingTable)));
prev = routingTable;
routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
assertThat(prev, Matchers.sameInstance(routingTable));
assertNumIndexShardsPerNode(clusterState, Matchers.equalTo(2));
logger.debug("ClusterState: {}", clusterState.getRoutingNodes().prettyPrint());
}
@Test
public void testMinimalRelocationsNoLimit() {
ImmutableSettings.Builder settings = settingsBuilder();
settings.put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString())
.put("cluster.routing.allocation.node_concurrent_recoveries", 100)
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 100);
AllocationService service = new AllocationService(settings.build());
ClusterState clusterState = initCluster(service, 1, 3, 3, 1);
assertThat(clusterState.routingNodes().node("node0").shardsWithState(STARTED).size(), Matchers.equalTo(9));
assertThat(clusterState.routingNodes().getUnassigned().size(), Matchers.equalTo(9));
int nodeOffset = 1;
clusterState = addNodes(clusterState, service, 1, nodeOffset++);
assertThat(clusterState.routingNodes().node("node0").shardsWithState(STARTED).size(), Matchers.equalTo(9));
assertThat(clusterState.routingNodes().node("node1").shardsWithState(STARTED).size(), Matchers.equalTo(9));
assertThat(clusterState.routingNodes().getUnassigned().size(), Matchers.equalTo(0));
assertNumIndexShardsPerNode(clusterState, Matchers.equalTo(3));
logger.info("now, start one more node, check that rebalancing will happen because we set it to always");
DiscoveryNodes.Builder nodes = newNodesBuilder().putAll(clusterState.nodes());
nodes.put(newNode("node2"));
clusterState = newClusterStateBuilder().state(clusterState).nodes(nodes.build()).build();
RoutingTable routingTable = service.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
RoutingNodes routingNodes = clusterState.routingNodes();
assertThat(clusterState.routingNodes().node("node2").shardsWithState(INITIALIZING).size(), Matchers.equalTo(2));
assertThat(clusterState.routingNodes().node("node0").shardsWithState(INITIALIZING).size(), Matchers.equalTo(0));
assertThat(clusterState.routingNodes().node("node1").shardsWithState(INITIALIZING).size(), Matchers.equalTo(0));
RoutingTable prev = routingTable;
routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
assertThat(prev, Matchers.not(Matchers.sameInstance(routingTable)));
assertThat(clusterState.routingNodes().node("node2").shardsWithState(STARTED).size(), Matchers.equalTo(2));
assertThat(clusterState.routingNodes().node("node2").shardsWithState(INITIALIZING).size(), Matchers.equalTo(2));
assertThat(clusterState.routingNodes().node("node0").shardsWithState(INITIALIZING).size(), Matchers.equalTo(0));
assertThat(clusterState.routingNodes().node("node1").shardsWithState(INITIALIZING).size(), Matchers.equalTo(0));
assertThat(prev, Matchers.not(Matchers.sameInstance(routingTable)));
prev = routingTable;
routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
assertThat(clusterState.routingNodes().node("node2").shardsWithState(STARTED).size(), Matchers.equalTo(4));
assertThat(clusterState.routingNodes().node("node2").shardsWithState(INITIALIZING).size(), Matchers.equalTo(2));
assertThat(clusterState.routingNodes().node("node0").shardsWithState(INITIALIZING).size(), Matchers.equalTo(0));
assertThat(clusterState.routingNodes().node("node1").shardsWithState(INITIALIZING).size(), Matchers.equalTo(0));
assertThat(prev, Matchers.not(Matchers.sameInstance(routingTable)));
prev = routingTable;
routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
assertThat(clusterState.routingNodes().node("node2").shardsWithState(STARTED).size(), Matchers.equalTo(6));
assertThat(clusterState.routingNodes().node("node2").shardsWithState(INITIALIZING).size(), Matchers.equalTo(0));
assertThat(clusterState.routingNodes().node("node0").shardsWithState(INITIALIZING).size(), Matchers.equalTo(0));
assertThat(clusterState.routingNodes().node("node1").shardsWithState(INITIALIZING).size(), Matchers.equalTo(0));
assertThat(prev, Matchers.not(Matchers.sameInstance(routingTable)));
prev = routingTable;
routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
assertThat(prev, Matchers.sameInstance(routingTable));
assertNumIndexShardsPerNode(clusterState, Matchers.equalTo(2));
logger.debug("ClusterState: {}", clusterState.getRoutingNodes().prettyPrint());
}
private void assertNumIndexShardsPerNode(ClusterState state, Matcher<Integer> matcher) {
for (String index : state.routingTable().indicesRouting().keySet()) {
assertNumIndexShardsPerNode(state, index, matcher);
}
}
private void assertNumIndexShardsPerNode(ClusterState state, String index, Matcher<Integer> matcher) {
for (RoutingNode node : state.routingNodes()) {
assertThat(node.shardsWithState(index, STARTED).size(), matcher);
}
}
private void assertAtLeastOneIndexShardPerNode(ClusterState state) {
for (String index : state.routingTable().indicesRouting().keySet()) {
for (RoutingNode node : state.routingNodes()) {
assertThat(node.shardsWithState(index, STARTED).size(), Matchers.greaterThanOrEqualTo(1));
}
}
}
private ClusterState addNodes(ClusterState clusterState, AllocationService service, int numNodes, int nodeOffset) {
logger.info("now, start [{}] more node, check that rebalancing will happen because we set it to always", numNodes);
DiscoveryNodes.Builder nodes = newNodesBuilder().putAll(clusterState.nodes());
for (int i = 0; i < numNodes; i++) {
nodes.put(newNode("node" + (i + nodeOffset)));
}
clusterState = newClusterStateBuilder().state(clusterState).nodes(nodes.build()).build();
RoutingTable routingTable = service.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
RoutingNodes routingNodes = clusterState.routingNodes();
// move initializing to started
RoutingTable prev = routingTable;
while (true) {
logger.debug("ClusterState: {}", clusterState.getRoutingNodes().prettyPrint());
routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
if (routingTable == prev)
break;
prev = routingTable;
}
return clusterState;
}
private ClusterState initCluster(AllocationService service, int numberOfNodes, int numberOfIndices, int numberOfShards,
int numberOfReplicas) {
MetaData.Builder metaDataBuilder = newMetaDataBuilder();
RoutingTable.Builder routingTableBuilder = routingTable();
for (int i = 0; i < numberOfIndices; i++) {
IndexMetaData.Builder index = newIndexMetaDataBuilder("test" + i).numberOfShards(numberOfShards).numberOfReplicas(
numberOfReplicas);
metaDataBuilder = metaDataBuilder.put(index);
}
MetaData metaData = metaDataBuilder.build();
for (IndexMetaData index : metaData.indices().values()) {
routingTableBuilder.addAsNew(index);
}
RoutingTable routingTable = routingTableBuilder.build();
logger.info("start " + numberOfNodes + " nodes");
DiscoveryNodes.Builder nodes = newNodesBuilder();
for (int i = 0; i < numberOfNodes; i++) {
nodes.put(newNode("node" + i));
}
ClusterState clusterState = newClusterStateBuilder().nodes(nodes).metaData(metaData).routingTable(routingTable).build();
routingTable = service.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
RoutingNodes routingNodes = clusterState.routingNodes();
logger.info("restart all the primary shards, replicas will start initializing");
routingNodes = clusterState.routingNodes();
routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
logger.info("start the replica shards");
routingNodes = clusterState.routingNodes();
routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
logger.info("complete rebalancing");
RoutingTable prev = routingTable;
while (true) {
routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
if (routingTable == prev)
break;
prev = routingTable;
}
return clusterState;
}
private ClusterState addIndex(ClusterState clusterState, AllocationService service, int indexOrdinal, int numberOfShards,
int numberOfReplicas) {
MetaData.Builder metaDataBuilder = newMetaDataBuilder().metaData(clusterState.getMetaData());
RoutingTable.Builder routingTableBuilder = routingTable().routingTable(clusterState.routingTable());
IndexMetaData.Builder index = newIndexMetaDataBuilder("test" + indexOrdinal).numberOfShards(numberOfShards).numberOfReplicas(
numberOfReplicas);
metaDataBuilder = metaDataBuilder.put(index);
routingTableBuilder.addAsNew(index.build());
MetaData metaData = metaDataBuilder.build();
RoutingTable routingTable = routingTableBuilder.build();
clusterState = newClusterStateBuilder().state(clusterState).metaData(metaData).routingTable(routingTable).build();
routingTable = service.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
RoutingNodes routingNodes = clusterState.routingNodes();
logger.info("restart all the primary shards, replicas will start initializing");
routingNodes = clusterState.routingNodes();
routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
logger.info("start the replica shards");
routingNodes = clusterState.routingNodes();
routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
logger.info("complete rebalancing");
RoutingTable prev = routingTable;
while (true) {
routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
if (routingTable == prev)
break;
prev = routingTable;
}
return clusterState;
}
private ClusterState removeNodes(ClusterState clusterState, AllocationService service, int numNodes) {
logger.info("Removing [{}] nodes", numNodes);
DiscoveryNodes.Builder nodes = newNodesBuilder().putAll(clusterState.nodes());
for (DiscoveryNode node : clusterState.nodes()) {
nodes.remove(node.id());
numNodes--;
if (numNodes <= 0) {
break;
}
}
clusterState = newClusterStateBuilder().state(clusterState).nodes(nodes.build()).build();
RoutingNodes routingNodes = clusterState.routingNodes();
logger.info("start all the primary shards, replicas will start initializing");
RoutingTable routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
logger.info("start the replica shards");
routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
logger.info("rebalancing");
routingTable = service.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
logger.info("complete rebalancing");
RoutingTable prev = routingTable;
while(true) {
routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
if (routingTable == prev)
break;
prev = routingTable;
}
return clusterState;
}
}
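The while (true) loops in these helpers all repeat the same convergence idiom: keep marking the INITIALIZING shards as started until the AllocationService hands back the identical RoutingTable instance, meaning the allocation is stable. A minimal sketch of how that idiom could be factored out, reusing the same static imports as the test above (the helper name applyStartedShardsUntilNoChange is made up for illustration, not part of the commit):

// Illustrative only: repeatedly start the INITIALIZING shards until the routing
// table no longer changes, i.e. the allocator has nothing left to move.
private ClusterState applyStartedShardsUntilNoChange(ClusterState clusterState, AllocationService service) {
    RoutingTable prev = clusterState.routingTable();
    while (true) {
        RoutingNodes routingNodes = clusterState.routingNodes();
        RoutingTable routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
        clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
        if (routingTable == prev) {
            return clusterState;
        }
        prev = routingTable;
    }
}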

View File

@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.common.logging.ESLogger;
@@ -55,7 +56,7 @@ public class AwarenessAllocationTests {
.put("cluster.routing.allocation.awareness.attributes", "rack_id")
.build());
logger.info("Building initial routing table");
logger.info("Building initial routing table for 'moveShardOnceNewNodeWithAttributeAdded1'");
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(1))
@@ -124,7 +125,7 @@ public class AwarenessAllocationTests {
.put("cluster.routing.allocation.awareness.attributes", "rack_id")
.build());
logger.info("Building initial routing table");
logger.info("Building initial routing table for 'moveShardOnceNewNodeWithAttributeAdded2'");
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(1))
@@ -194,9 +195,12 @@ public class AwarenessAllocationTests {
.put("cluster.routing.allocation.allow_rebalance", "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.put("cluster.routing.allocation.awareness.attributes", "rack_id")
.put("cluster.routing.allocation.balance.index", 0.0f)
.put("cluster.routing.allocation.balance.replica", 1.0f)
.put("cluster.routing.allocation.balance.primary", 0.0f)
.build());
logger.info("Building initial routing table");
logger.info("Building initial routing table for 'moveShardOnceNewNodeWithAttributeAdded3'");
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test").numberOfShards(5).numberOfReplicas(1))
@@ -215,6 +219,20 @@ public class AwarenessAllocationTests {
).build();
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
for (ShardRouting shard : clusterState.routingNodes().shardsWithState(INITIALIZING)) {
logger.info(shard.toString());
}
for (ShardRouting shard : clusterState.routingNodes().shardsWithState(STARTED)) {
logger.info(shard.toString());
}
for (ShardRouting shard : clusterState.routingNodes().shardsWithState(RELOCATING)) {
logger.info(shard.toString());
}
for (ShardRouting shard : clusterState.routingNodes().shardsWithState(UNASSIGNED)) {
logger.info(shard.toString());
}
assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(5));
logger.info("--> start the shards (primaries)");
@@ -280,7 +298,7 @@ public class AwarenessAllocationTests {
.put("cluster.routing.allocation.awareness.attributes", "rack_id")
.build());
logger.info("Building initial routing table");
logger.info("Building initial routing table for 'moveShardOnceNewNodeWithAttributeAdded4'");
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test1").numberOfShards(5).numberOfReplicas(1))
@@ -364,7 +382,7 @@ public class AwarenessAllocationTests {
.put("cluster.routing.allocation.awareness.attributes", "rack_id")
.build());
logger.info("Building initial routing table");
logger.info("Building initial routing table for 'moveShardOnceNewNodeWithAttributeAdded5'");
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(2))
@@ -443,7 +461,7 @@ public class AwarenessAllocationTests {
.put("cluster.routing.allocation.awareness.attributes", "rack_id")
.build());
logger.info("Building initial routing table");
logger.info("Building initial routing table for 'moveShardOnceNewNodeWithAttributeAdded6'");
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(3))
@@ -525,7 +543,7 @@ public class AwarenessAllocationTests {
.put("cluster.routing.allocation.awareness.attributes", "rack_id")
.build());
logger.info("Building initial routing table");
logger.info("Building initial routing table for 'fullAwareness1'");
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(1))
@@ -593,7 +611,7 @@ public class AwarenessAllocationTests {
.put("cluster.routing.allocation.awareness.attributes", "rack_id")
.build());
logger.info("Building initial routing table");
logger.info("Building initial routing table for 'fullAwareness2'");
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(1))
@@ -662,9 +680,12 @@ public class AwarenessAllocationTests {
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.put("cluster.routing.allocation.awareness.force.rack_id.values", "1,2")
.put("cluster.routing.allocation.awareness.attributes", "rack_id")
.put("cluster.routing.allocation.balance.index", 0.0f)
.put("cluster.routing.allocation.balance.replica", 1.0f)
.put("cluster.routing.allocation.balance.primary", 0.0f)
.build());
logger.info("Building initial routing table");
logger.info("Building initial routing table for 'fullAwareness3'");
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test1").numberOfShards(5).numberOfReplicas(1))
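The three cluster.routing.allocation.balance.* keys added in these hunks are the weight-function factors of the new allocator. BalanceConfigurationTests below sets the same factors through constants on BalancedShardsAllocator; assuming those constants resolve to the key strings used here, the two styles are interchangeable — a small sketch:

// Sketch: the string keys used in the awareness tests ...
ImmutableSettings.Builder viaKeys = ImmutableSettings.settingsBuilder()
        .put("cluster.routing.allocation.balance.index", 0.0f)    // per-index balance factor
        .put("cluster.routing.allocation.balance.replica", 1.0f)  // overall shard-count balance factor
        .put("cluster.routing.allocation.balance.primary", 0.0f); // primary-shard balance factor

// ... and, assuming the SETTING_* constants map to those keys, the equivalent form used in BalanceConfigurationTests.
ImmutableSettings.Builder viaConstants = ImmutableSettings.settingsBuilder()
        .put(BalancedShardsAllocator.SETTING_INDEX_BALANCE_FACTOR, 0.0f)
        .put(BalancedShardsAllocator.SETTING_SHARD_BALANCE_FACTOR, 1.0f)
        .put(BalancedShardsAllocator.SETTING_PRIMARY_BALANCE_FACTOR, 0.0f);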

View File

@@ -0,0 +1,311 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.unit.cluster.routing.allocation;
import static org.elasticsearch.cluster.ClusterState.newClusterStateBuilder;
import static org.elasticsearch.cluster.metadata.IndexMetaData.newIndexMetaDataBuilder;
import static org.elasticsearch.cluster.metadata.MetaData.newMetaDataBuilder;
import static org.elasticsearch.cluster.node.DiscoveryNodes.newNodesBuilder;
import static org.elasticsearch.cluster.routing.RoutingBuilders.routingTable;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.unit.cluster.routing.allocation.RoutingAllocationTests.newNode;
import static org.hamcrest.MatcherAssert.assertThat;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.hamcrest.Matchers;
import org.testng.annotations.Test;
public class BalanceConfigurationTests {
private final ESLogger logger = Loggers.getLogger(BalanceConfigurationTests.class);
// TODO maybe we can randomize these numbers somehow
final int numberOfNodes = 25;
final int numberOfIndices = 12;
final int numberOfShards = 2;
final int numberOfReplicas = 2;
@Test
public void testIndexBalance() {
/* Tests balance over indices only */
final float indexBalance = 1.0f;
final float replicaBalance = 0.0f;
final float primaryBalance = 0.0f;
final float balanceTreshold = 1.0f;
ImmutableSettings.Builder settings = settingsBuilder();
settings.put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString());
settings.put(BalancedShardsAllocator.SETTING_INDEX_BALANCE_FACTOR, indexBalance);
settings.put(BalancedShardsAllocator.SETTING_SHARD_BALANCE_FACTOR, replicaBalance);
settings.put(BalancedShardsAllocator.SETTING_PRIMARY_BALANCE_FACTOR, primaryBalance);
settings.put(BalancedShardsAllocator.SETTING_THRESHOLD, balanceTreshold);
AllocationService strategy = new AllocationService(settings.build());
ClusterState clusterState = initCluster(strategy);
assertIndexBalance(logger, clusterState.getRoutingNodes(), numberOfNodes, numberOfIndices, numberOfReplicas, numberOfShards, balanceTreshold);
clusterState = addNode(clusterState, strategy);
assertIndexBalance(logger, clusterState.getRoutingNodes(), numberOfNodes+1, numberOfIndices, numberOfReplicas, numberOfShards, balanceTreshold);
clusterState = removeNodes(clusterState, strategy);
assertIndexBalance(logger, clusterState.getRoutingNodes(), (numberOfNodes+1)-(numberOfNodes+1)/2, numberOfIndices, numberOfReplicas, numberOfShards, balanceTreshold);
}
@Test
public void testReplicaBalance() {
/* Tests balance over replicas only */
final float indexBalance = 0.0f;
final float replicaBalance = 1.0f;
final float primaryBalance = 0.0f;
final float balanceTreshold = 1.0f;
ImmutableSettings.Builder settings = settingsBuilder();
settings.put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString());
settings.put(BalancedShardsAllocator.SETTING_INDEX_BALANCE_FACTOR, indexBalance);
settings.put(BalancedShardsAllocator.SETTING_SHARD_BALANCE_FACTOR, replicaBalance);
settings.put(BalancedShardsAllocator.SETTING_PRIMARY_BALANCE_FACTOR, primaryBalance);
settings.put(BalancedShardsAllocator.SETTING_THRESHOLD, balanceTreshold);
AllocationService strategy = new AllocationService(settings.build());
ClusterState clusterState = initCluster(strategy);
assertReplicaBalance(logger, clusterState.getRoutingNodes(), numberOfNodes, numberOfIndices, numberOfReplicas, numberOfShards, balanceTreshold);
clusterState = addNode(clusterState, strategy);
assertReplicaBalance(logger, clusterState.getRoutingNodes(), numberOfNodes+1, numberOfIndices, numberOfReplicas, numberOfShards, balanceTreshold);
clusterState = removeNodes(clusterState, strategy);
assertReplicaBalance(logger, clusterState.getRoutingNodes(), (numberOfNodes+1)-(numberOfNodes+1)/2, numberOfIndices, numberOfReplicas, numberOfShards, balanceTreshold);
}
@Test
public void testPrimaryBalance() {
/* Tests balance over primaries only */
final float indexBalance = 0.0f;
final float replicaBalance = 0.0f;
final float primaryBalance = 1.0f;
final float balanceTreshold = 1.0f;
ImmutableSettings.Builder settings = settingsBuilder();
settings.put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString());
settings.put(BalancedShardsAllocator.SETTING_INDEX_BALANCE_FACTOR, indexBalance);
settings.put(BalancedShardsAllocator.SETTING_SHARD_BALANCE_FACTOR, replicaBalance);
settings.put(BalancedShardsAllocator.SETTING_PRIMARY_BALANCE_FACTOR, primaryBalance);
settings.put(BalancedShardsAllocator.SETTING_THRESHOLD, balanceTreshold);
AllocationService strategy = new AllocationService(settings.build());
ClusterState clusterstate = initCluster(strategy);
assertPrimaryBalance(logger, clusterstate.getRoutingNodes(), numberOfNodes, numberOfIndices, numberOfReplicas, numberOfShards, balanceTreshold);
clusterstate = addNode(clusterstate, strategy);
assertPrimaryBalance(logger, clusterstate.getRoutingNodes(), numberOfNodes+1, numberOfIndices, numberOfReplicas, numberOfShards, balanceTreshold);
clusterstate = removeNodes(clusterstate, strategy);
assertPrimaryBalance(logger, clusterstate.getRoutingNodes(), numberOfNodes+1-(numberOfNodes+1)/2, numberOfIndices, numberOfReplicas, numberOfShards, balanceTreshold);
}
private ClusterState initCluster(AllocationService strategy) {
MetaData.Builder metaDataBuilder = newMetaDataBuilder();
RoutingTable.Builder routingTableBuilder = routingTable();
for (int i = 0; i < numberOfIndices; i++) {
IndexMetaData.Builder index = newIndexMetaDataBuilder("test"+i).numberOfShards(numberOfShards).numberOfReplicas(numberOfReplicas);
metaDataBuilder = metaDataBuilder.put(index);
}
MetaData metaData = metaDataBuilder.build();
for (IndexMetaData index : metaData.indices().values()) {
routingTableBuilder.addAsNew(index);
}
RoutingTable routingTable = routingTableBuilder.build();
logger.info("start "+numberOfNodes+" nodes");
DiscoveryNodes.Builder nodes = newNodesBuilder();
for (int i = 0; i < numberOfNodes; i++) {
nodes.put(newNode("node"+i));
}
ClusterState clusterState = newClusterStateBuilder().nodes(nodes).metaData(metaData).routingTable(routingTable).build();
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
RoutingNodes routingNodes = clusterState.routingNodes();
logger.info("restart all the primary shards, replicas will start initializing");
routingNodes = clusterState.routingNodes();
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
logger.info("start the replica shards");
routingNodes = clusterState.routingNodes();
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
logger.info("complete rebalancing");
RoutingTable prev = routingTable;
while(true) {
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
if (routingTable == prev)
break;
prev = routingTable;
}
return clusterState;
}
private ClusterState addNode(ClusterState clusterState, AllocationService strategy) {
logger.info("now, start 1 more node, check that rebalancing will happen because we set it to always");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes())
.put(newNode("node"+numberOfNodes)))
.build();
RoutingTable routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
RoutingNodes routingNodes = clusterState.routingNodes();
// move initializing to started
RoutingTable prev = routingTable;
while(true) {
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
if (routingTable == prev)
break;
prev = routingTable;
}
return clusterState;
}
private ClusterState removeNodes(ClusterState clusterState, AllocationService strategy) {
logger.info("Removing half the nodes ("+(numberOfNodes+1)/2+")");
DiscoveryNodes.Builder nodes = newNodesBuilder().putAll(clusterState.nodes());
for(int i=(numberOfNodes+1)/2; i<=numberOfNodes; i++){
nodes.remove("node"+i);
}
clusterState = newClusterStateBuilder().state(clusterState).nodes(nodes.build()).build();
RoutingNodes routingNodes = clusterState.routingNodes();
logger.info("start all the primary shards, replicas will start initializing");
RoutingTable routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
logger.info("start the replica shards");
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
logger.info("rebalancing");
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
logger.info("complete rebalancing");
RoutingTable prev = routingTable;
while(true) {
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
if (routingTable == prev)
break;
prev = routingTable;
}
return clusterState;
}
private void assertReplicaBalance(ESLogger logger, RoutingNodes nodes, int numberOfNodes, int numberOfIndices, int numberOfReplicas, int numberOfShards, float treshold) {
final int numShards = numberOfIndices * numberOfShards * (numberOfReplicas+1);
final float avgNumShards = (float)(numShards) / (float)(numberOfNodes);
final int minAvgNumberOfShards = Math.round(Math.round(Math.floor(avgNumShards-treshold)));
final int maxAvgNumberOfShards = Math.round(Math.round(Math.ceil(avgNumShards+treshold)));
for (RoutingNode node : nodes) {
// logger.info(node.nodeId() + ": " + node.shardsWithState(INITIALIZING, STARTED).size() + " shards ("+minAvgNumberOfShards+" to "+maxAvgNumberOfShards+")");
assertThat(node.shardsWithState(STARTED).size(), Matchers.greaterThanOrEqualTo(minAvgNumberOfShards));
assertThat(node.shardsWithState(STARTED).size(), Matchers.lessThanOrEqualTo(maxAvgNumberOfShards));
}
}
private void assertIndexBalance(ESLogger logger, RoutingNodes nodes, int numberOfNodes, int numberOfIndices, int numberOfReplicas, int numberOfShards, float treshold) {
final int numShards = numberOfShards * (numberOfReplicas+1);
final float avgNumShards = (float)(numShards) / (float)(numberOfNodes);
final int minAvgNumberOfShards = Math.round(Math.round(Math.floor(avgNumShards-treshold)));
final int maxAvgNumberOfShards = Math.round(Math.round(Math.ceil(avgNumShards+treshold)));
for(String index : nodes.getRoutingTable().indicesRouting().keySet()) {
for (RoutingNode node : nodes) {
// logger.info(node.nodeId() +":"+index+ ": " + node.shardsWithState(index, INITIALIZING, STARTED).size() + " shards ("+minAvgNumberOfShards+" to "+maxAvgNumberOfShards+")");
assertThat(node.shardsWithState(index, STARTED).size(), Matchers.greaterThanOrEqualTo(minAvgNumberOfShards));
assertThat(node.shardsWithState(index, STARTED).size(), Matchers.lessThanOrEqualTo(maxAvgNumberOfShards));
}
}
}
private void assertPrimaryBalance(ESLogger logger, RoutingNodes nodes, int numberOfNodes, int numberOfIndices, int numberOfReplicas, int numberOfShards, float treshold) {
final int numShards = numberOfShards;
final float avgNumShards = (float)(numShards) / (float)(numberOfNodes);
final int minAvgNumberOfShards = Math.round(Math.round(Math.floor(avgNumShards-treshold)));
final int maxAvgNumberOfShards = Math.round(Math.round(Math.ceil(avgNumShards+treshold)));
for(String index : nodes.getRoutingTable().indicesRouting().keySet()) {
for (RoutingNode node : nodes) {
int primaries = 0;
for(ShardRouting shard : node.shardsWithState(index, STARTED)) {
primaries += shard.primary()?1:0;
}
// logger.info(node.nodeId() + ": " + primaries + " primaries ("+minAvgNumberOfShards+" to "+maxAvgNumberOfShards+")");
assertThat(primaries, Matchers.greaterThanOrEqualTo(minAvgNumberOfShards));
assertThat(primaries, Matchers.lessThanOrEqualTo(maxAvgNumberOfShards));
}
}
}
}
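Plugging the constants used above (25 nodes, 12 indices, 2 shards, 2 replicas, threshold 1.0) into the floor/ceil bounds computed by the assert* helpers gives the ranges the initial cluster is checked against — a quick illustrative calculation, not part of the commit:

// Replica balance: 12 indices * 2 shards * (2 replicas + 1) = 72 shard copies over 25 nodes.
float avgShardsPerNode = (12 * 2 * (2 + 1)) / 25f;           // 2.88
int minShards = (int) Math.floor(avgShardsPerNode - 1.0f);   // 1
int maxShards = (int) Math.ceil(avgShardsPerNode + 1.0f);    // 4 -> each node must hold between 1 and 4 started shards

// Index balance: 2 shards * (2 + 1) = 6 copies per index, avg 0.24 per node -> allowed range [-1, 2] per node and index.
// Primary balance: 2 primaries per index, avg 0.08 per node -> allowed range [-1, 2] primaries per node and index.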

View File

@@ -88,7 +88,7 @@ public class ClusterRebalanceRoutingTests {
for (int i = 0; i < routingTable.index("test1").shards().size(); i++) {
assertThat(routingTable.index("test1").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test1").shard(i).primaryShard().state(), equalTo(STARTED));
// assertThat(routingTable.index("test1").shard(i).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test1").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING));
}

View File

@@ -0,0 +1,543 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.unit.cluster.routing.allocation;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.testng.annotations.Test;
import static org.elasticsearch.cluster.ClusterState.newClusterStateBuilder;
import static org.elasticsearch.cluster.metadata.IndexMetaData.newIndexMetaDataBuilder;
import static org.elasticsearch.cluster.metadata.MetaData.newMetaDataBuilder;
import static org.elasticsearch.cluster.node.DiscoveryNodes.newNodesBuilder;
import static org.elasticsearch.cluster.routing.RoutingBuilders.routingTable;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.unit.cluster.routing.allocation.RoutingAllocationTests.newNode;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
/**
*
*/
public class IndexBalanceTests {
private final ESLogger logger = Loggers.getLogger(IndexBalanceTests.class);
@Test
public void testBalanceAllNodesStarted() {
AllocationService strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1).build());
logger.info("Building initial routing table");
MetaData metaData = newMetaDataBuilder().put(newIndexMetaDataBuilder("test").numberOfShards(3).numberOfReplicas(1))
.put(newIndexMetaDataBuilder("test1").numberOfShards(3).numberOfReplicas(1)).build();
RoutingTable routingTable = routingTable().addAsNew(metaData.index("test")).addAsNew(metaData.index("test1")).build();
ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build();
assertThat(routingTable.index("test").shards().size(), equalTo(3));
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).shards().get(0).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(i).shards().get(1).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(i).shards().get(0).currentNodeId(), nullValue());
assertThat(routingTable.index("test").shard(i).shards().get(1).currentNodeId(), nullValue());
}
assertThat(routingTable.index("test1").shards().size(), equalTo(3));
for (int i = 0; i < routingTable.index("test1").shards().size(); i++) {
assertThat(routingTable.index("test1").shard(i).size(), equalTo(2));
assertThat(routingTable.index("test1").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test1").shard(i).shards().get(0).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test1").shard(i).shards().get(1).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test1").shard(i).shards().get(0).currentNodeId(), nullValue());
assertThat(routingTable.index("test1").shard(i).shards().get(1).currentNodeId(), nullValue());
}
logger.info("Adding three node and performing rerouting");
clusterState = newClusterStateBuilder().state(clusterState)
.nodes(newNodesBuilder().put(newNode("node1")).put(newNode("node2")).put(newNode("node3"))).build();
RoutingTable prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(3));
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).currentNodeId(), nullValue());
}
logger.info("Another round of rebalancing");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes())).build();
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable == routingTable, equalTo(true));
RoutingNodes routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(3));
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1));
// backup shards are initializing as well, we make sure that they
// recover from primary *started* shards in the
// IndicesClusterStateService
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING));
}
logger.info("Reroute, nothing should change");
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
assertThat(prevRoutingTable == routingTable, equalTo(true));
logger.info("Start the more shards");
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(3));
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(STARTED));
}
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test1").shards().size(), equalTo(3));
for (int i = 0; i < routingTable.index("test1").shards().size(); i++) {
assertThat(routingTable.index("test1").shard(i).size(), equalTo(2));
assertThat(routingTable.index("test1").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test1").shard(i).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test1").shard(i).replicaShards().size(), equalTo(1));
assertThat(routingTable.index("test1").shard(i).replicaShards().get(0).state(), equalTo(STARTED));
}
assertThat(routingNodes.node("node1").numberOfShardsWithState(STARTED), equalTo(4));
assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED), equalTo(4));
assertThat(routingNodes.node("node3").numberOfShardsWithState(STARTED), equalTo(4));
assertThat(routingNodes.node("node1").shardsWithState("test", STARTED).size(), equalTo(2));
assertThat(routingNodes.node("node2").shardsWithState("test", STARTED).size(), equalTo(2));
assertThat(routingNodes.node("node3").shardsWithState("test", STARTED).size(), equalTo(2));
assertThat(routingNodes.node("node1").shardsWithState("test1", STARTED).size(), equalTo(2));
assertThat(routingNodes.node("node2").shardsWithState("test1", STARTED).size(), equalTo(2));
assertThat(routingNodes.node("node3").shardsWithState("test1", STARTED).size(), equalTo(2));
}
@Test
public void testBalanceIncrementallyStartNodes() {
AllocationService strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1).build());
logger.info("Building initial routing table");
MetaData metaData = newMetaDataBuilder().put(newIndexMetaDataBuilder("test").numberOfShards(3).numberOfReplicas(1))
.put(newIndexMetaDataBuilder("test1").numberOfShards(3).numberOfReplicas(1)).build();
RoutingTable routingTable = routingTable().addAsNew(metaData.index("test")).addAsNew(metaData.index("test1")).build();
ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build();
assertThat(routingTable.index("test").shards().size(), equalTo(3));
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).shards().get(0).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(i).shards().get(1).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(i).shards().get(0).currentNodeId(), nullValue());
assertThat(routingTable.index("test").shard(i).shards().get(1).currentNodeId(), nullValue());
}
assertThat(routingTable.index("test1").shards().size(), equalTo(3));
for (int i = 0; i < routingTable.index("test1").shards().size(); i++) {
assertThat(routingTable.index("test1").shard(i).size(), equalTo(2));
assertThat(routingTable.index("test1").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test1").shard(i).shards().get(0).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test1").shard(i).shards().get(1).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test1").shard(i).shards().get(0).currentNodeId(), nullValue());
assertThat(routingTable.index("test1").shard(i).shards().get(1).currentNodeId(), nullValue());
}
logger.info("Adding one node and performing rerouting");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1"))).build();
RoutingTable prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(3));
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).currentNodeId(), nullValue());
}
logger.info("Add another node and perform rerouting, nothing will happen since primary not started");
clusterState = newClusterStateBuilder().state(clusterState)
.nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node2"))).build();
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable == routingTable, equalTo(true));
logger.info("Start the primary shard");
RoutingNodes routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(3));
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1));
// backup shards are initializing as well, we make sure that they
// recover from primary *started* shards in the
// IndicesClusterStateService
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING));
}
logger.info("Reroute, nothing should change");
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
assertThat(prevRoutingTable == routingTable, equalTo(true));
logger.info("Start the backup shard");
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(3));
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(STARTED));
}
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test1").shards().size(), equalTo(3));
for (int i = 0; i < routingTable.index("test1").shards().size(); i++) {
assertThat(routingTable.index("test1").shard(i).size(), equalTo(2));
assertThat(routingTable.index("test1").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test1").shard(i).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test1").shard(i).replicaShards().size(), equalTo(1));
assertThat(routingTable.index("test1").shard(i).replicaShards().get(0).state(), equalTo(STARTED));
}
logger.info("Add another node and perform rerouting, nothing will happen since primary not started");
clusterState = newClusterStateBuilder().state(clusterState)
.nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node3"))).build();
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
logger.info("Reroute, nothing should change");
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
assertThat(prevRoutingTable == routingTable, equalTo(true));
logger.info("Start the backup shard");
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(3));
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test1").shards().size(), equalTo(3));
assertThat(routingNodes.node("node1").numberOfShardsWithState(STARTED), equalTo(4));
assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED), equalTo(4));
assertThat(routingNodes.node("node3").numberOfShardsWithState(STARTED), equalTo(4));
assertThat(routingNodes.node("node1").shardsWithState("test", STARTED).size(), equalTo(2));
assertThat(routingNodes.node("node2").shardsWithState("test", STARTED).size(), equalTo(2));
assertThat(routingNodes.node("node3").shardsWithState("test", STARTED).size(), equalTo(2));
assertThat(routingNodes.node("node1").shardsWithState("test1", STARTED).size(), equalTo(2));
assertThat(routingNodes.node("node2").shardsWithState("test1", STARTED).size(), equalTo(2));
assertThat(routingNodes.node("node3").shardsWithState("test1", STARTED).size(), equalTo(2));
}
@Test
public void testBalanceAllNodesStartedAddIndex() {
AllocationService strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1).build());
logger.info("Building initial routing table");
MetaData metaData = newMetaDataBuilder().put(newIndexMetaDataBuilder("test").numberOfShards(3).numberOfReplicas(1)).build();
RoutingTable routingTable = routingTable().addAsNew(metaData.index("test")).build();
ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build();
assertThat(routingTable.index("test").shards().size(), equalTo(3));
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).shards().get(0).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(i).shards().get(1).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(i).shards().get(0).currentNodeId(), nullValue());
assertThat(routingTable.index("test").shard(i).shards().get(1).currentNodeId(), nullValue());
}
logger.info("Adding three node and performing rerouting");
clusterState = newClusterStateBuilder().state(clusterState)
.nodes(newNodesBuilder().put(newNode("node1")).put(newNode("node2")).put(newNode("node3"))).build();
RoutingTable prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(3));
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).currentNodeId(), nullValue());
}
logger.info("Another round of rebalancing");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes())).build();
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable == routingTable, equalTo(true));
RoutingNodes routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(3));
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1));
// backup shards are initializing as well, we make sure that they
// recover from primary *started* shards in the
// IndicesClusterStateService
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING));
}
logger.info("Reroute, nothing should change");
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
assertThat(prevRoutingTable == routingTable, equalTo(true));
logger.info("Start the more shards");
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(3));
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1));
}
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingNodes.node("node1").numberOfShardsWithState(STARTED), equalTo(2));
assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED), equalTo(2));
assertThat(routingNodes.node("node3").numberOfShardsWithState(STARTED), equalTo(2));
assertThat(routingNodes.node("node1").shardsWithState("test", STARTED).size(), equalTo(2));
assertThat(routingNodes.node("node2").shardsWithState("test", STARTED).size(), equalTo(2));
assertThat(routingNodes.node("node3").shardsWithState("test", STARTED).size(), equalTo(2));
logger.info("Add new index 3 shards 1 replica");
prevRoutingTable = routingTable;
metaData = newMetaDataBuilder().metaData(metaData)
.put(newIndexMetaDataBuilder("test1").settings(ImmutableSettings.settingsBuilder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 3)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
))
.build();
routingTable = routingTable().routingTable(routingTable)
.addAsNew(metaData.index("test1"))
.build();
clusterState = newClusterStateBuilder().state(clusterState).metaData(metaData).routingTable(routingTable).build();
assertThat(routingTable.index("test1").shards().size(), equalTo(3));
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test1").shards().size(), equalTo(3));
for (int i = 0; i < routingTable.index("test1").shards().size(); i++) {
assertThat(routingTable.index("test1").shard(i).size(), equalTo(2));
assertThat(routingTable.index("test1").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test1").shard(i).primaryShard().state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test1").shard(i).replicaShards().size(), equalTo(1));
assertThat(routingTable.index("test1").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test1").shard(i).replicaShards().get(0).currentNodeId(), nullValue());
}
logger.info("Another round of rebalancing");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes())).build();
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable == routingTable, equalTo(true));
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test1").shards().size(), equalTo(3));
for (int i = 0; i < routingTable.index("test1").shards().size(); i++) {
assertThat(routingTable.index("test1").shard(i).size(), equalTo(2));
assertThat(routingTable.index("test1").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test1").shard(i).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test1").shard(i).replicaShards().size(), equalTo(1));
// backup shards are initializing as well, we make sure that they
// recover from primary *started* shards in the
// IndicesClusterStateService
assertThat(routingTable.index("test1").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING));
}
logger.info("Reroute, nothing should change");
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
assertThat(prevRoutingTable == routingTable, equalTo(true));
logger.info("Start the more shards");
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test1").shards().size(), equalTo(3));
for (int i = 0; i < routingTable.index("test1").shards().size(); i++) {
assertThat(routingTable.index("test1").shard(i).size(), equalTo(2));
assertThat(routingTable.index("test1").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test1").shard(i).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test1").shard(i).replicaShards().size(), equalTo(1));
}
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingNodes.node("node1").numberOfShardsWithState(STARTED), equalTo(4));
assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED), equalTo(4));
assertThat(routingNodes.node("node3").numberOfShardsWithState(STARTED), equalTo(4));
assertThat(routingNodes.node("node1").shardsWithState("test1", STARTED).size(), equalTo(2));
assertThat(routingNodes.node("node2").shardsWithState("test1", STARTED).size(), equalTo(2));
assertThat(routingNodes.node("node3").shardsWithState("test1", STARTED).size(), equalTo(2));
}
}

View File

@@ -100,6 +100,9 @@ public class ShardsLimitAllocationTests {
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 10)
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.put("cluster.routing.allocation.balance.index", 0.0f)
.put("cluster.routing.allocation.balance.replica", 1.0f)
.put("cluster.routing.allocation.balance.primary", 0.0f)
.build());
logger.info("Building initial routing table");
@@ -177,23 +180,14 @@ public class ShardsLimitAllocationTests {
assertThat(clusterState.readOnlyRoutingNodes().node("node1").numberOfShardsWithState(STARTED), equalTo(3));
assertThat(clusterState.readOnlyRoutingNodes().node("node1").numberOfShardsWithState(RELOCATING), equalTo(2));
logger.info("start the moving shards, a shard from test1 should move back to node1");
routingNodes = clusterState.routingNodes();
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(clusterState.readOnlyRoutingNodes().node("node1").numberOfShardsWithState(STARTED), equalTo(3));
assertThat(clusterState.readOnlyRoutingNodes().node("node1").numberOfShardsWithState(RELOCATING), equalTo(0));
assertThat(clusterState.readOnlyRoutingNodes().node("node1").numberOfShardsWithState(INITIALIZING), equalTo(2));
assertThat(clusterState.readOnlyRoutingNodes().node("node2").numberOfShardsWithState(STARTED), equalTo(5));
assertThat(clusterState.readOnlyRoutingNodes().node("node2").numberOfShardsWithState(RELOCATING), equalTo(2));
logger.info("finalize movement, another shard will move");
assertThat(clusterState.readOnlyRoutingNodes().node("node2").numberOfShardsWithState(STARTED), equalTo(3));
// the first move will destroy the balance and the balancer will move 2 shards from node2 to node1 right after
// moving the shards to node2, since we consider INITIALIZING shards during rebalance
routingNodes = clusterState.routingNodes();
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
// now we are done compared to EvenShardCountAllocator since the Balancer is not solely based on the average
assertThat(clusterState.readOnlyRoutingNodes().node("node1").numberOfShardsWithState(STARTED), equalTo(5));
assertThat(clusterState.readOnlyRoutingNodes().node("node2").numberOfShardsWithState(STARTED), equalTo(5));
}
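The comment about INITIALIZING shards is what separates this outcome from the old EvenShardCountAllocator: while rebalancing, the balancer attributes shards that are still initializing on a node to that node's load, so it settles at 5 and 5 instead of chasing the running average. Expressed as a rough per-node count (illustrative only, not the allocator's actual implementation):

// A RoutingNode's load as the balancer effectively sees it during rebalancing:
// started shards plus shards currently initializing on that node.
int nodeLoad = node.shardsWithState(INITIALIZING, STARTED).size();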

View File

@@ -53,6 +53,9 @@ public class TenShardsOneReplicaRoutingTests {
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.put("cluster.routing.allocation.balance.index", 0.0f)
.put("cluster.routing.allocation.balance.replica", 1.0f)
.put("cluster.routing.allocation.balance.primary", 0.0f)
.build());
logger.info("Building initial routing table");
@@ -173,8 +176,8 @@ public class TenShardsOneReplicaRoutingTests {
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(10));
assertThat(routingNodes.node("node1").numberOfShardsWithState(STARTED), equalTo(8));
assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED), equalTo(6));
assertThat(routingNodes.node("node1").numberOfShardsWithState(STARTED), equalTo(7));
assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED), equalTo(7));
assertThat(routingNodes.node("node3").numberOfShardsWithState(STARTED), equalTo(6));
}
}
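The adjusted assertions follow directly from the shard arithmetic for this test's ten-shard, one-replica index on three nodes; a short check, assuming the replica-only weighting simply evens out per-node shard counts:

// 10 primaries + 10 replicas = 20 shard copies spread over 3 nodes.
int copies = 10 * (1 + 1);               // 20
float avgPerNode = copies / 3f;          // ~6.67
int high = (int) Math.ceil(avgPerNode);  // 7
int low = (int) Math.floor(avgPerNode);  // 6 -> the balancer settles on 7 / 7 / 6 rather than the previous 8 / 6 / 6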