diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java index 0e00bfc0692..f4b6ea1b931 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java @@ -133,7 +133,9 @@ public class MetaData implements Iterable { private final ImmutableMap templates; private final ImmutableMap customs; - private final transient int totalNumberOfShards; + private final transient int totalNumberOfShards; // Transient ? not serializable anyway? + private final int numberOfShards; + private final String[] allIndices; private final ImmutableSet allIndicesSet; @@ -148,6 +150,7 @@ public class MetaData implements Iterable { private final ImmutableMap aliasAndIndexToIndexMap; + MetaData(long version, Settings transientSettings, Settings persistentSettings, ImmutableMap indices, ImmutableMap templates, ImmutableMap customs) { this.version = version; this.transientSettings = transientSettings; @@ -157,10 +160,13 @@ public class MetaData implements Iterable { this.customs = customs; this.templates = templates; int totalNumberOfShards = 0; + int numberOfShards = 0; for (IndexMetaData indexMetaData : indices.values()) { totalNumberOfShards += indexMetaData.totalNumberOfShards(); + numberOfShards += indexMetaData.numberOfShards(); } this.totalNumberOfShards = totalNumberOfShards; + this.numberOfShards = numberOfShards; // build all indices map List allIndicesLst = Lists.newArrayList(); @@ -690,6 +696,14 @@ public class MetaData implements Iterable { public int getTotalNumberOfShards() { return totalNumberOfShards(); } + + public int numberOfShards() { + return this.numberOfShards; + } + + public int getnumberOfShards() { + return numberOfShards(); + } /** diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java index 69627b344ce..032b82f95db 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java @@ -351,7 +351,7 @@ public class MetaDataCreateIndexService extends AbstractComponent { }); return updatedState; - } catch (Exception e) { + } catch (Throwable e) { logger.warn("[{}] failed to create", e, request.index); listener.onFailure(e); return currentState; diff --git a/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java new file mode 100644 index 00000000000..653b4fb3766 --- /dev/null +++ b/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -0,0 +1,966 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing.allocation.allocator; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.lucene.util.SorterTemplate; +import org.elasticsearch.ElasticSearchIllegalArgumentException; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.MutableShardRouting; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; +import org.elasticsearch.cluster.routing.allocation.decider.Decision; +import org.elasticsearch.cluster.routing.allocation.decider.Decision.Type; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.node.settings.NodeSettingsService; + +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; + +import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; +import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; + +/** +* The {@link BalancedShardsAllocator} re-balances the nodes allocations +* within an cluster based on a {@link WeightFunction}. The clusters balance is defined by four parameters which can be set +* in the cluster update API that allows changes in real-time: +* +*
  • cluster.routing.allocation.balance.shard - The shard balance defines the weight factor +* for shards allocated on a {@link RoutingNode}
  • +*
  • cluster.routing.allocation.balance.index - The index balance defines a factor to the number +* of {@link ShardRouting}s per index allocated on a specific node
  • +*
  • cluster.routing.allocation.balance.primary - the primary balance defines a weight factor for +* the number of primaries of a specific index allocated on a node
  • +*
  • cluster.routing.allocation.balance.threshold - A threshold to set the minimal optimization +* value of operations that should be performed
  • +*
+* +* These parameters are combined in a {@link WeightFunction} that allows calculation of node weights which +* are used to re-balance shards based on global as well as per-index factors. +*/ +public class BalancedShardsAllocator extends AbstractComponent implements ShardsAllocator { + + public static final String SETTING_THRESHOLD = "cluster.routing.allocation.balance.threshold"; + public static final String SETTING_INDEX_BALANCE_FACTOR = "cluster.routing.allocation.balance.index"; + public static final String SETTING_SHARD_BALANCE_FACTOR = "cluster.routing.allocation.balance.shard"; + public static final String SETTING_PRIMARY_BALANCE_FACTOR = "cluster.routing.allocation.balance.primary"; + + static { + MetaData.addDynamicSettings( + SETTING_INDEX_BALANCE_FACTOR, + SETTING_PRIMARY_BALANCE_FACTOR, + SETTING_SHARD_BALANCE_FACTOR, + SETTING_THRESHOLD + ); + } + + class ApplySettings implements NodeSettingsService.Listener { + @Override + public void onRefreshSettings(Settings settings) { + float indexBalance = settings.getAsFloat(SETTING_INDEX_BALANCE_FACTOR, 0.5f); + float shardBalance = settings.getAsFloat(SETTING_SHARD_BALANCE_FACTOR, 0.45f); + float primaryBalance = settings.getAsFloat(SETTING_PRIMARY_BALANCE_FACTOR, 0.05f); + float threshold = settings.getAsFloat(SETTING_THRESHOLD, 1.0f); + if (threshold <= 0.0f) { + throw new ElasticSearchIllegalArgumentException("threshold must be greater than 0.0f but was: " + threshold); + } + BalancedShardsAllocator.this.threshold = threshold; + BalancedShardsAllocator.this.weightFunction = new WeightFunction(indexBalance, shardBalance, primaryBalance); + } + } + + private volatile WeightFunction weightFunction; + private volatile float threshold; + + public BalancedShardsAllocator(Settings settings) { + this(settings, new NodeSettingsService(settings)); + } + + @Inject + public BalancedShardsAllocator(Settings settings, NodeSettingsService nodeSettingsService) { + super(settings); + ApplySettings applySettings = new ApplySettings(); + applySettings.onRefreshSettings(settings); + nodeSettingsService.addListener(applySettings); + } + + @Override + public void applyStartedShards(StartedRerouteAllocation allocation) { /* ONLY FOR GATEWAYS */ } + + @Override + public void applyFailedShards(FailedRerouteAllocation allocation) { /* ONLY FOR GATEWAYS */ } + + @Override + public boolean allocateUnassigned(RoutingAllocation allocation) { + return rebalance(allocation); + } + + @Override + public boolean rebalance(RoutingAllocation allocation) { + final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold); + return balancer.balance(); + } + + @Override + public boolean move(MutableShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold); + return balancer.move(shardRouting, node); + } + + + /** + * This class is the primary weight function used to create balanced over nodes and shards in the cluster. + * Currently this function has 3 properties: + *
    + *
  • index balance - balance property over shards per index
  • + *
  • shard balance - balance property over shards per cluster
  • + *
  • primary balance - balance property over primaries per cluster
  • + *
+ *

+ * Each of these properties are expressed as factor such that the properties factor defines the relative importance of the property for the + * weight function. For example if the weight function should calculate the weights only based on a global (shard) balance the index and primary balance + * can be set to 0.0 and will in turn have no effect on the distribution. + *

+ * The weight per index is calculated based on the following formula: + *
    + *
  • + * weightindex(node, index) = indexBalance * (node.numShards(index) - avgShardsPerNode(index)) + *
  • + *
  • + * weightnode(node, index) = shardBalance * (node.numShards() - avgShardsPerNode) + *
  • + *
  • + * weightprimary(node, index) = primaryBalance * (node.numPrimaries() - avgPrimariesPerNode) + *
  • + *
+ * weight(node, index) = weightindex(node, index) + weightnode(node, index) + weightprimary(node, index) + * + */ + public static class WeightFunction { + + private final float indexBalance; + private final float shardBalance; + private final float primaryBalance; + + public WeightFunction(float indexBalance, float shardBalance, float primaryBalance) { + final float sum = indexBalance + shardBalance + primaryBalance; + if (sum <= 0.0f) { + throw new ElasticSearchIllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum); + } + this.indexBalance = indexBalance / sum; + this.shardBalance = shardBalance / sum; + this.primaryBalance = primaryBalance / sum; + } + + public float weight(Balancer balancer, ModelNode node, String index) { + final float weightShard = shardBalance * (node.numShards() - balancer.avgShardsPerNode()); + final float weightIndex = indexBalance * (node.numShards(index) - balancer.avgShardsPerNode(index)); + final float weightPrimary = primaryBalance * (node.numPrimaries() - balancer.avgPrimariesPerNode()); + return weightShard + weightIndex + weightPrimary; + } + + } + + /** + * A {@link Balancer} + */ + public static class Balancer { + + private final ESLogger logger; + private final Map nodes = new HashMap(); + private final HashSet indices = new HashSet(); + private final RoutingAllocation allocation; + private final WeightFunction weight; + + private final float threshold; + private final MetaData metaData; + + private final Predicate assignedFilter = new Predicate() { + @Override + public boolean apply(MutableShardRouting input) { + return input.assignedToNode(); + } + }; + + public Balancer(ESLogger logger, RoutingAllocation allocation, WeightFunction weight, float threshold) { + this.logger = logger; + this.allocation = allocation; + this.weight = weight; + this.threshold = threshold; + for (RoutingNode node : allocation.routingNodes()) { + nodes.put(node.nodeId(), new ModelNode(node.nodeId())); + } + metaData = allocation.routingNodes().metaData(); + } + + /** + * Returns an array view on the nodes in the balancer. Nodes should not be removed from this list. + */ + private ModelNode[] nodesArray() { + return nodes.values().toArray(new ModelNode[nodes.size()]); + } + + /** + * Returns the average of shards per node for the given index + */ + public float avgShardsPerNode(String index) { + return ((float) metaData.index(index).totalNumberOfShards()) / nodes.size(); + } + + /** + * Returns the global average of shards per node + */ + public float avgShardsPerNode() { + return ((float) metaData.totalNumberOfShards()) / nodes.size(); + } + + /** + * Returns the global average of primaries per node + */ + public float avgPrimariesPerNode() { + return ((float) metaData.numberOfShards()) / nodes.size(); + } + + /** + * Returns the average of primaries per node for the given index + */ + public float avgPrimariesPerNode(String index) { + return ((float) metaData.index(index).numberOfShards()) / nodes.size(); + } + + /** + * Returns a new {@link NodeSorter} that sorts the nodes based on their + * current weight with respect to the index passed to the sorter. The + * returned sorter is not sorted. Use {@link NodeSorter#reset(String)} + * to sort based on an index. + */ + private NodeSorter newNodeSorter() { + final NodeSorter sorter = new NodeSorter(nodesArray(), weight, this); + return sorter; + } + + private boolean initialize(RoutingNodes routing) { + Collection shards = new ArrayList(); + if (logger.isTraceEnabled()) { + logger.trace("Start distributing Shards"); + } + + for (IndexRoutingTable index : allocation.routingTable().indicesRouting().values()) { + indices.add(index.index()); + for (IndexShardRoutingTable shard : index.getShards().values()) { + shards.addAll(routing.shardsRoutingFor(index.index(), shard.shardId().id())); + } + } + buildModelFromAssigned(Iterables.filter(shards, assignedFilter)); + return allocateUnassigned(allocation.routingNodes().unassigned(), allocation.routingNodes().ignoredUnassigned()); + } + + /** + * Balances the nodes on the cluster model according to the weight + * function. The configured threshold is the minimum delta between the + * weight of the maximum node and the minimum node according to the + * {@link WeightFunction}. This weight is calculated per index to + * distribute shards evenly per index. The balancer tries to relocate + * shards only if the delta exceeds the threshold. If the default case + * the threshold is set to 1.0 to enforce gaining relocation + * only, or in other words relocations that move the weight delta closer + * to 0.0 + * + * @return true if the current configuration has been + * changed, otherwise false + */ + public boolean balance() { + if (this.nodes.isEmpty()) { + /* with no nodes this is pointless */ + return false; + } + if (logger.isTraceEnabled()) { + logger.trace("Start balancing cluster"); + } + + boolean changed = initialize(allocation.routingNodes()); + NodeSorter sorter = newNodeSorter(); + if (nodes.size() > 1) { /* skip if we only have one node */ + for (String index : buildWeightOrderedIndidces(sorter)) { + sorter.reset(index); + final float[] weights = sorter.weights; + final ModelNode[] modelNodes = sorter.modelNodes; + int lowIdx = 0; + int highIdx = weights.length - 1; + while (true) { + final ModelNode minNode = modelNodes[lowIdx]; + final ModelNode maxNode = modelNodes[highIdx]; + if (maxNode.numShards(index) > 0) { + float delta = weights[highIdx] - weights[lowIdx]; + if (delta <= threshold) { + if (logger.isTraceEnabled()) { + logger.trace("Stop balancing index [{}] min_node [{}] weight: [{}] max_node [{}] weight: [{}] delta: [{}]", + index, maxNode.getNodeId(), weights[highIdx], minNode.getNodeId(), weights[lowIdx], delta); + } + break; + } + if (logger.isTraceEnabled()) { + logger.trace("Balancing from node [{}] weight: [{}] to node [{}] weight: [{}] delta: [{}]", + maxNode.getNodeId(), weights[highIdx], minNode.getNodeId(), weights[lowIdx], delta); + } + /* pass the delta to the replication function to prevent relocations that only swap the weights of the two nodes. + * a relocation must bring us closer to the balance if we only achive the same delta the relocation is useless */ + if (tryRelocateShard(minNode, maxNode, index, delta)) { + /* + * TODO we could be a bit smarter here, we don't need to fully sort necessarily + * we could just find the place to insert linearly but the win might be minor + * compared to the added complexity + */ + weights[lowIdx] = sorter.weight(modelNodes[lowIdx]); + weights[highIdx] = sorter.weight(modelNodes[highIdx]); + sorter.quickSort(0, weights.length - 1); + lowIdx = 0; + highIdx = weights.length - 1; + changed = true; + continue; + } + } + if (lowIdx < highIdx - 1) { + /* we can't move from any shard from the min node lets move on to the next node + * and see if the threshold still holds. We either don't have any shard of this + * index on this node of allocation deciders prevent any relocation.*/ + lowIdx++; + } else if (lowIdx > 0) { + /* now we go max to min since obviously we can't move anything to the max node + * lets pick the next highest */ + lowIdx = 0; + highIdx--; + } else { + /* we are done here, we either can't relocate anymore or we are balanced */ + break; + } + } + } + } + return changed; + } + + /** + * This builds a initial index ordering where the indices are returned + * in most unbalanced first. We need this in order to prevent over + * allocations on added nodes from one index when the weight parameters + * for global balance overrule the index balance at an intermediate + * state. For example this can happen if we have 3 nodes and 3 indices + * with 3 shards and 1 shard. At the first stage all three nodes hold + * 2 shard for each index. now we add another node and the first index + * is balanced moving 3 two of the nodes over to the new node since it + * has no shards yet and global balance for the node is way below + * average. To re-balance we need to move shards back eventually likely + * to the nodes we relocated them from. + */ + private String[] buildWeightOrderedIndidces(NodeSorter sorter) { + final String[] indices = this.indices.toArray(new String[this.indices.size()]); + final float[] deltas = new float[indices.length]; + for (int i = 0; i < deltas.length; i++) { + sorter.reset(indices[i]); + deltas[i] = sorter.delta(); + } + new SorterTemplate() { + float pivotWeight; + + @Override + protected void swap(int i, int j) { + final String tmpIdx = indices[i]; + indices[i] = indices[j]; + indices[j] = tmpIdx; + final float tmpDelta = deltas[i]; + deltas[i] = deltas[j]; + deltas[j] = tmpDelta; + } + + @Override + protected int compare(int i, int j) { + return Float.compare(deltas[j], deltas[i]); + } + + @Override + protected void setPivot(int i) { + pivotWeight = deltas[i]; + } + + @Override + protected int comparePivot(int j) { + return Float.compare(deltas[j], pivotWeight); + } + }.quickSort(0, deltas.length - 1); + + return indices; + } + + /** + * This function executes a move operation moving the given shard from + * the given node to the minimal eligible node with respect to the + * weight function. Iff the shard is moved the shard will be set to + * {@link ShardRoutingState#RELOCATING} and a shadow instance of this + * shard is created with an incremented version in the state + * {@link ShardRoutingState#INITIALIZING}. + * + * @return true iff the shard has successfully been moved. + */ + public boolean move(MutableShardRouting shard, RoutingNode node) { + if (nodes.isEmpty() || !shard.started()) { + /* with no nodes or a not started shard this is pointless */ + return false; + } + if (logger.isTraceEnabled()) { + logger.trace("Try moving shard [{}] from [{}]", shard, node); + } + boolean changed = initialize(allocation.routingNodes()); + + final ModelNode sourceNode = nodes.get(node.nodeId()); + assert sourceNode != null; + final NodeSorter sorter = newNodeSorter(); + sorter.reset(shard.getIndex()); + final ModelNode[] nodes = sorter.modelNodes; + assert sourceNode.containsShard(shard); + /* + * the sorter holds the minimum weight node first for the shards index. + * We now walk through the nodes until we find a node to allocate the shard. + * This is not guaranteed to be balanced after this operation we still try best effort to + * allocate on the minimal eligable node. + */ + for (ModelNode currentNode : nodes) { + if (currentNode.getNodeId().equals(node.nodeId())) { + continue; + } + RoutingNode target = allocation.routingNodes().node(currentNode.getNodeId()); + Decision decision = allocation.deciders().canAllocate(shard, target, allocation); + if (decision.type() == Type.YES) { // TODO maybe we can respect throtteling here too? + sourceNode.removeShard(shard); + final MutableShardRouting initializingShard = new MutableShardRouting(shard.index(), shard.id(), currentNode.getNodeId(), + shard.currentNodeId(), shard.primary(), INITIALIZING, shard.version() + 1); + currentNode.addShard(initializingShard, decision); + target.add(initializingShard); + shard.relocate(target.nodeId()); // set the node to relocate after we added the initializing shard + if (logger.isTraceEnabled()) { + logger.trace("Moved shard [{}] to node [{}]", shard, currentNode.getNodeId()); + } + return true; + } + } + + return changed; + } + + /** + * Builds the internal model from all shards in the given + * {@link Iterable}. All shards in the {@link Iterable} must be assigned + * to a node. This method will skip shards in the state + * {@link ShardRoutingState#RELOCATING} since each relocating shard has + * a shadow shard in the state {@link ShardRoutingState#INITIALIZING} + * on the target node which we respect during the allocation / balancing + * process. In short, this method recreates the status-quo in the cluster. + */ + private void buildModelFromAssigned(Iterable shards) { + for (MutableShardRouting shard : shards) { + assert shard.assignedToNode(); + /* we skip relocating shards here since we expect an initializing shard with the same id coming in */ + if (shard.state() == RELOCATING) { + continue; + } + ModelNode node = nodes.get(shard.currentNodeId()); + assert node != null; + node.addShard(shard, Decision.single(Type.YES, "Already allocated on node", node.getNodeId())); + if (logger.isTraceEnabled()) { + logger.trace("Assigned shard [{}] to node [{}]", shard, node.getNodeId()); + } + } + } + + /** + * Allocates all given shards on the minimal eligable node for the shards index + * with respect to the weight function. All given shards must be unassigned. + */ + private boolean allocateUnassigned(List unassigned, List ignoredUnassigned) { + assert !nodes.isEmpty(); + if (logger.isTraceEnabled()) { + logger.trace("Start allocating unassigned shards"); + } + if (unassigned.isEmpty()) { + return false; + } + boolean changed = false; + + /* + * TODO: We could be smarter here and group the shards by index and then + * use the sorter to save some iterations. + */ + final RoutingNodes routingNodes = allocation.routingNodes(); + final AllocationDeciders deciders = allocation.deciders(); + final Set currentRound = new TreeSet(new Comparator() { + @Override + public int compare(MutableShardRouting o1, + MutableShardRouting o2) { + final int indexCmp; + if ((indexCmp = o1.index().compareTo(o2.index())) == 0) { + if (o1.getId() - o2.getId() == 0) { + return o1.primary() ? -1 : o2.primary() ? 1 : 0; + } + return o1.getId() - o2.getId(); + + } + return indexCmp; + } + }); + do { + Iterator iterator = unassigned.iterator(); + while(iterator.hasNext()) { + /* we treat every index equally here once chunk a time such that we fill up + * nodes with all indices at the same time. Only on shard of a shard a time. + * Although there might be a primary and a shard of a shard in the set but + * primaries will be started first.*/ + if (currentRound.add(iterator.next())) { + iterator.remove(); + } + } + boolean iterationChanged = false; + for (MutableShardRouting shard : currentRound) { + assert !shard.assignedToNode(); + /* find an node with minimal weight we can allocate on*/ + float minWeight = Float.POSITIVE_INFINITY; + ModelNode minNode = null; + Decision decision = null; + for (ModelNode node : nodes.values()) { + /* + * The shard we add is removed below to simulate the + * addition for weight calculation we use Decision.ALWAYS to + * not violate the not null condition. + */ + if (!node.containsShard(shard)) { + node.addShard(shard, Decision.ALWAYS); + float currentWeight = weight.weight(this, node, shard.index()); + /* + * Remove the shard from the node again this is only a + * simulation + */ + Decision removed = node.removeShard(shard); + assert removed != null; + /* + * Unless the operation is not providing any gains we + * don't check deciders + */ + if (currentWeight <= minWeight) { + Decision currentDecision = deciders.canAllocate(shard, routingNodes.node(node.getNodeId()), allocation); + NOUPDATE: + if (currentDecision.type() == Type.YES || currentDecision.type() == Type.THROTTLE) { + if (currentWeight == minWeight) { + /* we have an equal weight tie breaking: + * 1. if one decision is YES prefer it + * 2. prefer the node that holds the primary for this index with the next id in the ring ie. + * for the 3 shards 2 replica case we try to build up: + * 1 2 0 + * 2 0 1 + * 0 1 2 + * such that if we need to tie-break we try to prefer the node holding a shard with the minimal id greater + * than the id of the shard we need to assign. This works find when new indices are created since + * primaries are added first and we only add one shard set a time in this algorithm. + */ + if (currentDecision.type() == decision.type()) { + final int repId = shard.id(); + final int nodeHigh = node.highestPrimary(shard.index()); + final int minNodeHigh = minNode.highestPrimary(shard.index()); + if ((((nodeHigh > repId && minNodeHigh > repId) || (nodeHigh < repId && minNodeHigh < repId)) && (nodeHigh < minNodeHigh)) + || (nodeHigh > minNodeHigh && nodeHigh > repId && minNodeHigh < repId)) { + minNode = node; + minWeight = currentWeight; + decision = currentDecision; + } else { break NOUPDATE; } + } else if (currentDecision.type() != Type.YES) { + break NOUPDATE; + } + } + minNode = node; + minWeight = currentWeight; + decision = currentDecision; + } + } + } + } + assert decision != null && minNode != null || decision == null && minNode == null; + if (minNode != null) { + iterationChanged = true; + minNode.addShard(shard, decision); + if (decision.type() == Type.YES) { + if (logger.isTraceEnabled()) { + logger.trace("Assigned shard [{}] to [{}]", shard, minNode.getNodeId()); + } + routingNodes.node(minNode.getNodeId()).add(shard); + changed = true; + continue; // don't add to ignoreUnassigned + } + } else if (logger.isTraceEnabled()) { + logger.trace("No Node found to assign shard [{}]", shard); + } + ignoredUnassigned.add(shard); + } + if (!iterationChanged && !unassigned.isEmpty()) { + ignoredUnassigned.addAll(unassigned); + unassigned.clear(); + return changed; + } + currentRound.clear(); + } while(!unassigned.isEmpty()); + // clear everything we have either added it or moved to ingoreUnassigned + return changed; + } + + /** + * Tries to find a relocation from the max node to the minimal node for an arbitrary shard of the given index on the + * balance model. Iff this method returns a true the relocation has already been executed on the + * simulation model as well as on the cluster. + */ + private boolean tryRelocateShard(ModelNode minNode, ModelNode maxNode, String idx, float minCost) { + final ModelIndex index = maxNode.getIndex(idx); + if (index != null) { + if (logger.isTraceEnabled()) { + logger.trace("Try relocating shard for index index [{}] from node [{}] to node [{}]", idx, maxNode.getNodeId(), + minNode.getNodeId()); + } + final RoutingNode node = allocation.routingNodes().node(minNode.getNodeId()); + MutableShardRouting candidate = null; + Decision decision = null; + final AllocationDeciders deciders = allocation.deciders(); + /* make a copy since we modify this list in the loop */ + final ArrayList shards = new ArrayList(index.getAllShards()); + for (MutableShardRouting shard : shards) { + if (shard.started()) { + // skip initializing, unassigned and relocating shards we can't relocate them anyway + Decision allocationDecision = deciders.canAllocate(shard, node, allocation); + Decision rebalanceDecission = deciders.canRebalance(shard, allocation); + + if (((allocationDecision.type() == Type.YES) || (allocationDecision.type() == Type.THROTTLE)) + && ((rebalanceDecission.type() == Type.YES) || (rebalanceDecission.type() == Type.THROTTLE))) { + Decision srcDecision; + if ((srcDecision = maxNode.removeShard(shard)) != null) { + minNode.addShard(shard, srcDecision); + final float delta = weight.weight(this, minNode, idx) - weight.weight(this, maxNode, idx); + if (delta < minCost) { + minCost = delta; + candidate = shard; + decision = new Decision.Multi().add(allocationDecision).add(rebalanceDecission); + } + minNode.removeShard(shard); + maxNode.addShard(shard, srcDecision); + } + } + } + } + + if (candidate != null) { + + /* allocate on the model even if not throttled */ + maxNode.removeShard(candidate); + minNode.addShard(candidate, decision); + if (decision.type() == Type.YES) { /* only allocate on the cluster if we are not throttled */ + if (logger.isTraceEnabled()) { + logger.trace("Relocate shard [{}] from node [{}] to node [{}]", candidate, maxNode.getNodeId(), + minNode.getNodeId()); + } + /* now allocate on the cluster - if we are started we need to relocate the shard */ + if (candidate.started()) { + RoutingNode lowRoutingNode = allocation.routingNodes().node(minNode.getNodeId()); + lowRoutingNode.add(new MutableShardRouting(candidate.index(), candidate.id(), lowRoutingNode.nodeId(), candidate + .currentNodeId(), candidate.primary(), INITIALIZING, candidate.version() + 1)); + candidate.relocate(lowRoutingNode.nodeId()); + + } else { + assert candidate.unassigned(); + allocation.routingNodes().node(minNode.getNodeId()).add(candidate); + } + return true; + + } + } + } + if (logger.isTraceEnabled()) { + logger.trace("Couldn't find shard to relocate from node [{}] to node [{}]", maxNode.getNodeId(), + minNode.getNodeId()); + } + return false; + } + + } + + static class ModelNode implements Iterable { + private final String id; + private final Map indices = new HashMap(); + /* cached stats - invalidated on add/remove and lazily calculated */ + private int numShards = -1; + private int numPrimaries = -1; + + public ModelNode(String id) { + this.id = id; + } + + public ModelIndex getIndex(String indexId) { + return indices.get(indexId); + } + + public String getNodeId() { + return id; + } + + public int numShards() { + if (numShards == -1) { + int sum = 0; + for (ModelIndex index : indices.values()) { + sum += index.numShards(); + } + numShards = sum; + } + return numShards; + } + + public int numShards(String idx) { + ModelIndex index = indices.get(idx); + return index == null ? 0 : index.numShards(); + } + + public int numPrimaries(String idx) { + ModelIndex index = indices.get(idx); + return index == null ? 0 : index.numPrimaries(); + } + + public int numPrimaries() { + if (numPrimaries == -1) { + int sum = 0; + for (ModelIndex index : indices.values()) { + sum += index.numPrimaries(); + } + numPrimaries = sum; + } + return numPrimaries; + } + + public Collection shards() { + Collection result = new ArrayList(); + for (ModelIndex index : indices.values()) { + result.addAll(index.getAllShards()); + } + return result; + } + + public int highestPrimary(String index) { + ModelIndex idx = indices.get(index); + if (idx != null) { + return idx.highestPrimary(); + } + return -1; + } + + public void addShard(MutableShardRouting shard, Decision decision) { + numPrimaries = numShards = -1; + ModelIndex index = indices.get(shard.index()); + if (index == null) { + index = new ModelIndex(shard.index()); + indices.put(index.getIndexId(), index); + } + index.addShard(shard, decision); + } + + public Decision removeShard(MutableShardRouting shard) { + numPrimaries = numShards = -1; + ModelIndex index = indices.get(shard.index()); + Decision removed = null; + if (index != null) { + removed = index.removeShard(shard); + if (removed != null && index.numShards() == 0) { + indices.remove(shard.index()); + } + } + return removed; + } + + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("Node(").append(id).append(")"); + return sb.toString(); + } + + @Override + public Iterator iterator() { + return indices.values().iterator(); + } + + public boolean containsShard(MutableShardRouting shard) { + ModelIndex index = getIndex(shard.getIndex()); + return index == null ? false : index.containsShard(shard); + } + + } + + static final class ModelIndex { + private final String id; + private final Map shards = new HashMap(); + private int numPrimaries = -1; + private int highestPrimary = -1; + + public ModelIndex(String id) { + this.id = id; + } + + public int highestPrimary() { + if (highestPrimary == -1) { + int maxId = -1; + for (MutableShardRouting shard : shards.keySet()) { + if (shard.primary()) { + maxId = Math.max(maxId, shard.id()); + } + } + return highestPrimary = maxId; + } + return highestPrimary; + } + + public String getIndexId() { + return id; + } + + public Decision getDecicion(MutableShardRouting shard) { + return shards.get(shard); + } + + public int numShards() { + return shards.size(); + } + + public Collection getAllShards() { + return shards.keySet(); + } + + public int numPrimaries() { + if (numPrimaries == -1) { + int num = 0; + for (MutableShardRouting shard : shards.keySet()) { + if (shard.primary()) { + num++; + } + } + return numPrimaries = num; + } + return numPrimaries; + } + + public Decision removeShard(MutableShardRouting shard) { + highestPrimary = numPrimaries = -1; + return shards.remove(shard); + } + + public void addShard(MutableShardRouting shard, Decision decision) { + highestPrimary = numPrimaries = -1; + assert decision != null; + assert !shards.containsKey(shard) : "Shard already allocated on current node: " + shards.get(shard) + " " + shard; + shards.put(shard, decision); + } + + public boolean containsShard(MutableShardRouting shard) { + return shards.containsKey(shard); + } + } + + static final class NodeSorter extends SorterTemplate { + + final ModelNode[] modelNodes; + /* the nodes weights with respect to the current weight function / index */ + final float[] weights; + private final WeightFunction function; + private String index; + private final Balancer balancer; + private float pivotWeight; + + public NodeSorter(ModelNode[] modelNodes, WeightFunction function, Balancer balancer) { + + this.function = function; + this.balancer = balancer; + this.modelNodes = modelNodes; + weights = new float[modelNodes.length]; + } + + /** + * Resets the sorter, recalculates the weights per node and sorts the + * nodes by weight, with minimal weight first. + */ + public void reset(String index) { + this.index = index; + for (int i = 0; i < weights.length; i++) { + weights[i] = weight(modelNodes[i]); + } + quickSort(0, modelNodes.length - 1); + } + + public float weight(ModelNode node) { + return function.weight(balancer, node, index); + } + + @Override + protected void swap(int i, int j) { + final ModelNode tmpNode = modelNodes[i]; + modelNodes[i] = modelNodes[j]; + modelNodes[j] = tmpNode; + final float tmpWeight = weights[i]; + weights[i] = weights[j]; + weights[j] = tmpWeight; + } + + @Override + protected int compare(int i, int j) { + return Float.compare(weights[i], weights[j]); + } + + @Override + protected void setPivot(int i) { + pivotWeight = weights[i]; + } + + @Override + protected int comparePivot(int j) { + return Float.compare(pivotWeight, weights[j]); + } + + public float delta() { + return weights[weights.length-1] - weights[0]; + } + } +} diff --git a/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocatorModule.java b/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocatorModule.java index 71c3860de48..a3d8da25616 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocatorModule.java +++ b/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocatorModule.java @@ -48,6 +48,6 @@ public class ShardsAllocatorModule extends AbstractModule { @Override protected void configure() { bind(GatewayAllocator.class).to(gatewayAllocator).asEagerSingleton(); - bind(ShardsAllocator.class).to(shardsAllocator == null ? EvenShardsCountAllocator.class : shardsAllocator).asEagerSingleton(); + bind(ShardsAllocator.class).to(shardsAllocator == null ? BalancedShardsAllocator.class : shardsAllocator).asEagerSingleton(); } } diff --git a/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocators.java b/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocators.java index f051ccdd25c..9efcf7efb21 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocators.java +++ b/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocators.java @@ -45,7 +45,7 @@ public class ShardsAllocators extends AbstractComponent implements ShardsAllocat } public ShardsAllocators(Settings settings) { - this(settings, new NoneGatewayAllocator(), new EvenShardsCountAllocator(settings)); + this(settings, new NoneGatewayAllocator(), new BalancedShardsAllocator(settings)); } @Inject diff --git a/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/AddIncrementallyTests.java b/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/AddIncrementallyTests.java new file mode 100644 index 00000000000..881fc8e1c64 --- /dev/null +++ b/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/AddIncrementallyTests.java @@ -0,0 +1,413 @@ +package org.elasticsearch.test.unit.cluster.routing.allocation; + +import static org.elasticsearch.cluster.ClusterState.newClusterStateBuilder; +import static org.elasticsearch.cluster.metadata.IndexMetaData.newIndexMetaDataBuilder; +import static org.elasticsearch.cluster.metadata.MetaData.newMetaDataBuilder; +import static org.elasticsearch.cluster.node.DiscoveryNodes.newNodesBuilder; +import static org.elasticsearch.cluster.routing.RoutingBuilders.routingTable; +import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; +import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; +import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; +import static org.elasticsearch.test.unit.cluster.routing.allocation.RoutingAllocationTests.newNode; +import static org.hamcrest.MatcherAssert.assertThat; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.hamcrest.Matcher; +import org.hamcrest.Matchers; +import org.testng.annotations.Test; + +public class AddIncrementallyTests { + private final ESLogger logger = Loggers.getLogger(AddIncrementallyTests.class); + + @Test + public void testAddNodesAndIndices() { + ImmutableSettings.Builder settings = settingsBuilder(); + settings.put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString()); + AllocationService service = new AllocationService(settings.build()); + + ClusterState clusterState = initCluster(service, 1, 3, 3, 1); + assertThat(clusterState.routingNodes().node("node0").shardsWithState(STARTED).size(), Matchers.equalTo(9)); + assertThat(clusterState.routingNodes().getUnassigned().size(), Matchers.equalTo(9)); + int nodeOffset = 1; + clusterState = addNodes(clusterState, service, 1, nodeOffset++); + assertThat(clusterState.routingNodes().node("node0").shardsWithState(STARTED).size(), Matchers.equalTo(9)); + assertThat(clusterState.routingNodes().node("node1").shardsWithState(STARTED).size(), Matchers.equalTo(9)); + assertThat(clusterState.routingNodes().getUnassigned().size(), Matchers.equalTo(0)); + assertNumIndexShardsPerNode(clusterState, Matchers.equalTo(3)); + clusterState = addNodes(clusterState, service, 1, nodeOffset++); + assertNumIndexShardsPerNode(clusterState, Matchers.equalTo(2)); + clusterState = addNodes(clusterState, service, 1, nodeOffset++); + assertNumIndexShardsPerNode(clusterState, Matchers.lessThanOrEqualTo(2)); + assertAtLeastOneIndexShardPerNode(clusterState); + clusterState = removeNodes(clusterState, service, 1); + assertNumIndexShardsPerNode(clusterState, Matchers.equalTo(2)); + + clusterState = addIndex(clusterState, service, 3, 2, 3); + assertThat(clusterState.routingNodes().getUnassigned().size(), Matchers.equalTo(2)); + assertNumIndexShardsPerNode(clusterState, "test3", Matchers.equalTo(2)); + assertNumIndexShardsPerNode(clusterState, Matchers.lessThanOrEqualTo(2)); + + clusterState = addIndex(clusterState, service, 4, 2, 3); + assertThat(clusterState.routingNodes().getUnassigned().size(), Matchers.equalTo(4)); + assertNumIndexShardsPerNode(clusterState, "test4", Matchers.equalTo(2)); + assertNumIndexShardsPerNode(clusterState, Matchers.lessThanOrEqualTo(2)); + clusterState = addNodes(clusterState, service, 1, nodeOffset++); + assertNumIndexShardsPerNode(clusterState, Matchers.lessThanOrEqualTo(2)); + assertThat(clusterState.routingNodes().getUnassigned().size(), Matchers.equalTo(0)); + clusterState = removeNodes(clusterState, service, 1); + assertThat(clusterState.routingNodes().getUnassigned().size(), Matchers.equalTo(4)); + assertNumIndexShardsPerNode(clusterState, Matchers.lessThanOrEqualTo(2)); + clusterState = addNodes(clusterState, service, 1, nodeOffset++); + assertNumIndexShardsPerNode(clusterState, Matchers.lessThanOrEqualTo(2)); + assertThat(clusterState.routingNodes().getUnassigned().size(), Matchers.equalTo(0)); + logger.debug("ClusterState: {}", clusterState.getRoutingNodes().prettyPrint()); + } + + @Test + public void testMinimalRelocations() { + ImmutableSettings.Builder settings = settingsBuilder(); + settings.put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString()) + .put("cluster.routing.allocation.node_concurrent_recoveries", 2); + AllocationService service = new AllocationService(settings.build()); + + ClusterState clusterState = initCluster(service, 1, 3, 3, 1); + assertThat(clusterState.routingNodes().node("node0").shardsWithState(STARTED).size(), Matchers.equalTo(9)); + assertThat(clusterState.routingNodes().getUnassigned().size(), Matchers.equalTo(9)); + int nodeOffset = 1; + clusterState = addNodes(clusterState, service, 1, nodeOffset++); + assertThat(clusterState.routingNodes().node("node0").shardsWithState(STARTED).size(), Matchers.equalTo(9)); + assertThat(clusterState.routingNodes().node("node1").shardsWithState(STARTED).size(), Matchers.equalTo(9)); + assertThat(clusterState.routingNodes().getUnassigned().size(), Matchers.equalTo(0)); + assertNumIndexShardsPerNode(clusterState, Matchers.equalTo(3)); + + logger.info("now, start one more node, check that rebalancing will happen because we set it to always"); + DiscoveryNodes.Builder nodes = newNodesBuilder().putAll(clusterState.nodes()); + nodes.put(newNode("node2")); + clusterState = newClusterStateBuilder().state(clusterState).nodes(nodes.build()).build(); + + RoutingTable routingTable = service.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + RoutingNodes routingNodes = clusterState.routingNodes(); + + assertThat(clusterState.routingNodes().node("node2").shardsWithState(INITIALIZING).size(), Matchers.equalTo(2)); + assertThat(clusterState.routingNodes().node("node0").shardsWithState(INITIALIZING).size(), Matchers.equalTo(0)); + assertThat(clusterState.routingNodes().node("node1").shardsWithState(INITIALIZING).size(), Matchers.equalTo(0)); + + RoutingTable prev = routingTable; + routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + assertThat(prev, Matchers.not(Matchers.sameInstance(routingTable))); + assertThat(clusterState.routingNodes().node("node2").shardsWithState(STARTED).size(), Matchers.equalTo(2)); + assertThat(clusterState.routingNodes().node("node2").shardsWithState(INITIALIZING).size(), Matchers.equalTo(2)); + assertThat(clusterState.routingNodes().node("node0").shardsWithState(INITIALIZING).size(), Matchers.equalTo(0)); + assertThat(clusterState.routingNodes().node("node1").shardsWithState(INITIALIZING).size(), Matchers.equalTo(0)); + assertThat(prev, Matchers.not(Matchers.sameInstance(routingTable))); + + prev = routingTable; + routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + assertThat(clusterState.routingNodes().node("node2").shardsWithState(STARTED).size(), Matchers.equalTo(4)); + assertThat(clusterState.routingNodes().node("node2").shardsWithState(INITIALIZING).size(), Matchers.equalTo(2)); + assertThat(clusterState.routingNodes().node("node0").shardsWithState(INITIALIZING).size(), Matchers.equalTo(0)); + assertThat(clusterState.routingNodes().node("node1").shardsWithState(INITIALIZING).size(), Matchers.equalTo(0)); + assertThat(prev, Matchers.not(Matchers.sameInstance(routingTable))); + + prev = routingTable; + routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + assertThat(clusterState.routingNodes().node("node2").shardsWithState(STARTED).size(), Matchers.equalTo(6)); + assertThat(clusterState.routingNodes().node("node2").shardsWithState(INITIALIZING).size(), Matchers.equalTo(0)); + assertThat(clusterState.routingNodes().node("node0").shardsWithState(INITIALIZING).size(), Matchers.equalTo(0)); + assertThat(clusterState.routingNodes().node("node1").shardsWithState(INITIALIZING).size(), Matchers.equalTo(0)); + assertThat(prev, Matchers.not(Matchers.sameInstance(routingTable))); + + prev = routingTable; + routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + assertThat(prev, Matchers.sameInstance(routingTable)); + assertNumIndexShardsPerNode(clusterState, Matchers.equalTo(2)); + logger.debug("ClusterState: {}", clusterState.getRoutingNodes().prettyPrint()); + } + + @Test + public void testMinimalRelocationsNoLimit() { + ImmutableSettings.Builder settings = settingsBuilder(); + settings.put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString()) + .put("cluster.routing.allocation.node_concurrent_recoveries", 100) + .put("cluster.routing.allocation.node_initial_primaries_recoveries", 100); + AllocationService service = new AllocationService(settings.build()); + + ClusterState clusterState = initCluster(service, 1, 3, 3, 1); + assertThat(clusterState.routingNodes().node("node0").shardsWithState(STARTED).size(), Matchers.equalTo(9)); + assertThat(clusterState.routingNodes().getUnassigned().size(), Matchers.equalTo(9)); + int nodeOffset = 1; + clusterState = addNodes(clusterState, service, 1, nodeOffset++); + assertThat(clusterState.routingNodes().node("node0").shardsWithState(STARTED).size(), Matchers.equalTo(9)); + assertThat(clusterState.routingNodes().node("node1").shardsWithState(STARTED).size(), Matchers.equalTo(9)); + assertThat(clusterState.routingNodes().getUnassigned().size(), Matchers.equalTo(0)); + assertNumIndexShardsPerNode(clusterState, Matchers.equalTo(3)); + + logger.info("now, start one more node, check that rebalancing will happen because we set it to always"); + DiscoveryNodes.Builder nodes = newNodesBuilder().putAll(clusterState.nodes()); + nodes.put(newNode("node2")); + clusterState = newClusterStateBuilder().state(clusterState).nodes(nodes.build()).build(); + + RoutingTable routingTable = service.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + RoutingNodes routingNodes = clusterState.routingNodes(); + + assertThat(clusterState.routingNodes().node("node2").shardsWithState(INITIALIZING).size(), Matchers.equalTo(2)); + assertThat(clusterState.routingNodes().node("node0").shardsWithState(INITIALIZING).size(), Matchers.equalTo(0)); + assertThat(clusterState.routingNodes().node("node1").shardsWithState(INITIALIZING).size(), Matchers.equalTo(0)); + + RoutingTable prev = routingTable; + routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + assertThat(prev, Matchers.not(Matchers.sameInstance(routingTable))); + assertThat(clusterState.routingNodes().node("node2").shardsWithState(STARTED).size(), Matchers.equalTo(2)); + assertThat(clusterState.routingNodes().node("node2").shardsWithState(INITIALIZING).size(), Matchers.equalTo(2)); + assertThat(clusterState.routingNodes().node("node0").shardsWithState(INITIALIZING).size(), Matchers.equalTo(0)); + assertThat(clusterState.routingNodes().node("node1").shardsWithState(INITIALIZING).size(), Matchers.equalTo(0)); + assertThat(prev, Matchers.not(Matchers.sameInstance(routingTable))); + + prev = routingTable; + routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + assertThat(clusterState.routingNodes().node("node2").shardsWithState(STARTED).size(), Matchers.equalTo(4)); + assertThat(clusterState.routingNodes().node("node2").shardsWithState(INITIALIZING).size(), Matchers.equalTo(2)); + assertThat(clusterState.routingNodes().node("node0").shardsWithState(INITIALIZING).size(), Matchers.equalTo(0)); + assertThat(clusterState.routingNodes().node("node1").shardsWithState(INITIALIZING).size(), Matchers.equalTo(0)); + assertThat(prev, Matchers.not(Matchers.sameInstance(routingTable))); + + prev = routingTable; + routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + assertThat(clusterState.routingNodes().node("node2").shardsWithState(STARTED).size(), Matchers.equalTo(6)); + assertThat(clusterState.routingNodes().node("node2").shardsWithState(INITIALIZING).size(), Matchers.equalTo(0)); + assertThat(clusterState.routingNodes().node("node0").shardsWithState(INITIALIZING).size(), Matchers.equalTo(0)); + assertThat(clusterState.routingNodes().node("node1").shardsWithState(INITIALIZING).size(), Matchers.equalTo(0)); + assertThat(prev, Matchers.not(Matchers.sameInstance(routingTable))); + + prev = routingTable; + routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + assertThat(prev, Matchers.sameInstance(routingTable)); + assertNumIndexShardsPerNode(clusterState, Matchers.equalTo(2)); + logger.debug("ClusterState: {}", clusterState.getRoutingNodes().prettyPrint()); + } + + + private void assertNumIndexShardsPerNode(ClusterState state, Matcher matcher) { + for (String index : state.routingTable().indicesRouting().keySet()) { + assertNumIndexShardsPerNode(state, index, matcher); + } + } + + private void assertNumIndexShardsPerNode(ClusterState state, String index, Matcher matcher) { + for (RoutingNode node : state.routingNodes()) { + assertThat(node.shardsWithState(index, STARTED).size(), matcher); + } + } + + + private void assertAtLeastOneIndexShardPerNode(ClusterState state) { + for (String index : state.routingTable().indicesRouting().keySet()) { + + for (RoutingNode node : state.routingNodes()) { + assertThat(node.shardsWithState(index, STARTED).size(), Matchers.greaterThanOrEqualTo(1)); + } + } + + } + + private ClusterState addNodes(ClusterState clusterState, AllocationService service, int numNodes, int nodeOffset) { + logger.info("now, start [{}] more node, check that rebalancing will happen because we set it to always", numNodes); + DiscoveryNodes.Builder nodes = newNodesBuilder().putAll(clusterState.nodes()); + for (int i = 0; i < numNodes; i++) { + nodes.put(newNode("node" + (i + nodeOffset))); + } + + clusterState = newClusterStateBuilder().state(clusterState).nodes(nodes.build()).build(); + + RoutingTable routingTable = service.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + RoutingNodes routingNodes = clusterState.routingNodes(); + + // move initializing to started + + RoutingTable prev = routingTable; + while (true) { + logger.debug("ClusterState: {}", clusterState.getRoutingNodes().prettyPrint()); + routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + if (routingTable == prev) + break; + prev = routingTable; + } + + return clusterState; + } + + private ClusterState initCluster(AllocationService service, int numberOfNodes, int numberOfIndices, int numberOfShards, + int numberOfReplicas) { + MetaData.Builder metaDataBuilder = newMetaDataBuilder(); + RoutingTable.Builder routingTableBuilder = routingTable(); + + for (int i = 0; i < numberOfIndices; i++) { + IndexMetaData.Builder index = newIndexMetaDataBuilder("test" + i).numberOfShards(numberOfShards).numberOfReplicas( + numberOfReplicas); + metaDataBuilder = metaDataBuilder.put(index); + } + + MetaData metaData = metaDataBuilder.build(); + + for (IndexMetaData index : metaData.indices().values()) { + routingTableBuilder.addAsNew(index); + } + + RoutingTable routingTable = routingTableBuilder.build(); + + logger.info("start " + numberOfNodes + " nodes"); + DiscoveryNodes.Builder nodes = newNodesBuilder(); + for (int i = 0; i < numberOfNodes; i++) { + nodes.put(newNode("node" + i)); + } + ClusterState clusterState = newClusterStateBuilder().nodes(nodes).metaData(metaData).routingTable(routingTable).build(); + routingTable = service.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + RoutingNodes routingNodes = clusterState.routingNodes(); + + logger.info("restart all the primary shards, replicas will start initializing"); + routingNodes = clusterState.routingNodes(); + routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + + logger.info("start the replica shards"); + routingNodes = clusterState.routingNodes(); + routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + + logger.info("complete rebalancing"); + RoutingTable prev = routingTable; + while (true) { + routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + if (routingTable == prev) + break; + prev = routingTable; + } + + return clusterState; + } + + private ClusterState addIndex(ClusterState clusterState, AllocationService service, int indexOrdinal, int numberOfShards, + int numberOfReplicas) { + MetaData.Builder metaDataBuilder = newMetaDataBuilder().metaData(clusterState.getMetaData()); + RoutingTable.Builder routingTableBuilder = routingTable().routingTable(clusterState.routingTable()); + + IndexMetaData.Builder index = newIndexMetaDataBuilder("test" + indexOrdinal).numberOfShards(numberOfShards).numberOfReplicas( + numberOfReplicas); + metaDataBuilder = metaDataBuilder.put(index); + routingTableBuilder.addAsNew(index.build()); + + MetaData metaData = metaDataBuilder.build(); + RoutingTable routingTable = routingTableBuilder.build(); + clusterState = newClusterStateBuilder().state(clusterState).metaData(metaData).routingTable(routingTable).build(); + routingTable = service.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + RoutingNodes routingNodes = clusterState.routingNodes(); + + logger.info("restart all the primary shards, replicas will start initializing"); + routingNodes = clusterState.routingNodes(); + routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + + logger.info("start the replica shards"); + routingNodes = clusterState.routingNodes(); + routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + + logger.info("complete rebalancing"); + RoutingTable prev = routingTable; + while (true) { + routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + if (routingTable == prev) + break; + prev = routingTable; + } + + return clusterState; + } + + private ClusterState removeNodes(ClusterState clusterState, AllocationService service, int numNodes) { + logger.info("Removing [{}] nodes", numNodes); + DiscoveryNodes.Builder nodes = newNodesBuilder().putAll(clusterState.nodes()); + + for (DiscoveryNode node : clusterState.nodes()) { + nodes.remove(node.id()); + numNodes--; + if (numNodes <= 0) { + break; + } + } + + clusterState = newClusterStateBuilder().state(clusterState).nodes(nodes.build()).build(); + RoutingNodes routingNodes = clusterState.routingNodes(); + + logger.info("start all the primary shards, replicas will start initializing"); + RoutingTable routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + + logger.info("start the replica shards"); + routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + + logger.info("rebalancing"); + routingTable = service.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + + logger.info("complete rebalancing"); + RoutingTable prev = routingTable; + while(true) { + routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + if (routingTable == prev) + break; + prev = routingTable; + } + + return clusterState; + } +} diff --git a/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/AwarenessAllocationTests.java b/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/AwarenessAllocationTests.java index c5ce1f9634e..601f298fc10 100644 --- a/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/AwarenessAllocationTests.java +++ b/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/AwarenessAllocationTests.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.common.logging.ESLogger; @@ -55,7 +56,7 @@ public class AwarenessAllocationTests { .put("cluster.routing.allocation.awareness.attributes", "rack_id") .build()); - logger.info("Building initial routing table"); + logger.info("Building initial routing table for 'moveShardOnceNewNodeWithAttributeAdded1'"); MetaData metaData = newMetaDataBuilder() .put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(1)) @@ -124,7 +125,7 @@ public class AwarenessAllocationTests { .put("cluster.routing.allocation.awareness.attributes", "rack_id") .build()); - logger.info("Building initial routing table"); + logger.info("Building initial routing table for 'moveShardOnceNewNodeWithAttributeAdded2'"); MetaData metaData = newMetaDataBuilder() .put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(1)) @@ -194,9 +195,12 @@ public class AwarenessAllocationTests { .put("cluster.routing.allocation.allow_rebalance", "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .put("cluster.routing.allocation.awareness.attributes", "rack_id") + .put("cluster.routing.allocation.balance.index", 0.0f) + .put("cluster.routing.allocation.balance.replica", 1.0f) + .put("cluster.routing.allocation.balance.primary", 0.0f) .build()); - logger.info("Building initial routing table"); + logger.info("Building initial routing table for 'moveShardOnceNewNodeWithAttributeAdded3'"); MetaData metaData = newMetaDataBuilder() .put(newIndexMetaDataBuilder("test").numberOfShards(5).numberOfReplicas(1)) @@ -215,6 +219,20 @@ public class AwarenessAllocationTests { ).build(); routingTable = strategy.reroute(clusterState).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + for (ShardRouting shard : clusterState.routingNodes().shardsWithState(INITIALIZING)) { + logger.info(shard.toString()); + } + for (ShardRouting shard : clusterState.routingNodes().shardsWithState(STARTED)) { + logger.info(shard.toString()); + } + for (ShardRouting shard : clusterState.routingNodes().shardsWithState(RELOCATING)) { + logger.info(shard.toString()); + } + for (ShardRouting shard : clusterState.routingNodes().shardsWithState(UNASSIGNED)) { + logger.info(shard.toString()); + } + assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(5)); logger.info("--> start the shards (primaries)"); @@ -280,7 +298,7 @@ public class AwarenessAllocationTests { .put("cluster.routing.allocation.awareness.attributes", "rack_id") .build()); - logger.info("Building initial routing table"); + logger.info("Building initial routing table for 'moveShardOnceNewNodeWithAttributeAdded4'"); MetaData metaData = newMetaDataBuilder() .put(newIndexMetaDataBuilder("test1").numberOfShards(5).numberOfReplicas(1)) @@ -364,7 +382,7 @@ public class AwarenessAllocationTests { .put("cluster.routing.allocation.awareness.attributes", "rack_id") .build()); - logger.info("Building initial routing table"); + logger.info("Building initial routing table for 'moveShardOnceNewNodeWithAttributeAdded5'"); MetaData metaData = newMetaDataBuilder() .put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(2)) @@ -443,7 +461,7 @@ public class AwarenessAllocationTests { .put("cluster.routing.allocation.awareness.attributes", "rack_id") .build()); - logger.info("Building initial routing table"); + logger.info("Building initial routing table for 'moveShardOnceNewNodeWithAttributeAdded6'"); MetaData metaData = newMetaDataBuilder() .put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(3)) @@ -525,7 +543,7 @@ public class AwarenessAllocationTests { .put("cluster.routing.allocation.awareness.attributes", "rack_id") .build()); - logger.info("Building initial routing table"); + logger.info("Building initial routing table for 'fullAwareness1'"); MetaData metaData = newMetaDataBuilder() .put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(1)) @@ -593,7 +611,7 @@ public class AwarenessAllocationTests { .put("cluster.routing.allocation.awareness.attributes", "rack_id") .build()); - logger.info("Building initial routing table"); + logger.info("Building initial routing table for 'fullAwareness2'"); MetaData metaData = newMetaDataBuilder() .put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(1)) @@ -662,9 +680,12 @@ public class AwarenessAllocationTests { .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .put("cluster.routing.allocation.awareness.force.rack_id.values", "1,2") .put("cluster.routing.allocation.awareness.attributes", "rack_id") + .put("cluster.routing.allocation.balance.index", 0.0f) + .put("cluster.routing.allocation.balance.replica", 1.0f) + .put("cluster.routing.allocation.balance.primary", 0.0f) .build()); - logger.info("Building initial routing table"); + logger.info("Building initial routing table for 'fullAwareness3'"); MetaData metaData = newMetaDataBuilder() .put(newIndexMetaDataBuilder("test1").numberOfShards(5).numberOfReplicas(1)) diff --git a/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/BalanceConfigurationTests.java b/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/BalanceConfigurationTests.java new file mode 100644 index 00000000000..20b4ea30e76 --- /dev/null +++ b/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/BalanceConfigurationTests.java @@ -0,0 +1,311 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.test.unit.cluster.routing.allocation; + +import static org.elasticsearch.cluster.ClusterState.newClusterStateBuilder; +import static org.elasticsearch.cluster.metadata.IndexMetaData.newIndexMetaDataBuilder; +import static org.elasticsearch.cluster.metadata.MetaData.newMetaDataBuilder; +import static org.elasticsearch.cluster.node.DiscoveryNodes.newNodesBuilder; +import static org.elasticsearch.cluster.routing.RoutingBuilders.routingTable; +import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; +import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; +import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; +import static org.elasticsearch.test.unit.cluster.routing.allocation.RoutingAllocationTests.newNode; +import static org.hamcrest.MatcherAssert.assertThat; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; +import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.hamcrest.Matchers; +import org.testng.annotations.Test; + +public class BalanceConfigurationTests { + + private final ESLogger logger = Loggers.getLogger(BalanceConfigurationTests.class); + // TODO maybe we can randomize these numbers somehow + final int numberOfNodes = 25; + final int numberOfIndices = 12; + final int numberOfShards = 2; + final int numberOfReplicas = 2; + + @Test + public void testIndexBalance() { + /* Tests balance over indices only */ + final float indexBalance = 1.0f; + final float replicaBalance = 0.0f; + final float primaryBalance = 0.0f; + final float balanceTreshold = 1.0f; + + ImmutableSettings.Builder settings = settingsBuilder(); + settings.put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString()); + settings.put(BalancedShardsAllocator.SETTING_INDEX_BALANCE_FACTOR, indexBalance); + settings.put(BalancedShardsAllocator.SETTING_SHARD_BALANCE_FACTOR, replicaBalance); + settings.put(BalancedShardsAllocator.SETTING_PRIMARY_BALANCE_FACTOR, primaryBalance); + settings.put(BalancedShardsAllocator.SETTING_THRESHOLD, balanceTreshold); + + AllocationService strategy = new AllocationService(settings.build()); + + ClusterState clusterState = initCluster(strategy); + assertIndexBalance(logger, clusterState.getRoutingNodes(), numberOfNodes, numberOfIndices, numberOfReplicas, numberOfShards, balanceTreshold); + + clusterState = addNode(clusterState, strategy); + assertIndexBalance(logger, clusterState.getRoutingNodes(), numberOfNodes+1, numberOfIndices, numberOfReplicas, numberOfShards, balanceTreshold); + + clusterState = removeNodes(clusterState, strategy); + assertIndexBalance(logger, clusterState.getRoutingNodes(), (numberOfNodes+1)-(numberOfNodes+1)/2, numberOfIndices, numberOfReplicas, numberOfShards, balanceTreshold); + + } + + @Test + public void testReplicaBalance() { + /* Tests balance over replicas only */ + final float indexBalance = 0.0f; + final float replicaBalance = 1.0f; + final float primaryBalance = 0.0f; + final float balanceTreshold = 1.0f; + + ImmutableSettings.Builder settings = settingsBuilder(); + settings.put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString()); + settings.put(BalancedShardsAllocator.SETTING_INDEX_BALANCE_FACTOR, indexBalance); + settings.put(BalancedShardsAllocator.SETTING_SHARD_BALANCE_FACTOR, replicaBalance); + settings.put(BalancedShardsAllocator.SETTING_PRIMARY_BALANCE_FACTOR, primaryBalance); + settings.put(BalancedShardsAllocator.SETTING_THRESHOLD, balanceTreshold); + + AllocationService strategy = new AllocationService(settings.build()); + + ClusterState clusterState = initCluster(strategy); + assertReplicaBalance(logger, clusterState.getRoutingNodes(), numberOfNodes, numberOfIndices, numberOfReplicas, numberOfShards, balanceTreshold); + + clusterState = addNode(clusterState, strategy); + assertReplicaBalance(logger, clusterState.getRoutingNodes(), numberOfNodes+1, numberOfIndices, numberOfReplicas, numberOfShards, balanceTreshold); + + clusterState = removeNodes(clusterState, strategy); + assertReplicaBalance(logger, clusterState.getRoutingNodes(), (numberOfNodes+1)-(numberOfNodes+1)/2, numberOfIndices, numberOfReplicas, numberOfShards, balanceTreshold); + + } + + @Test + public void testPrimaryBalance() { + /* Tests balance over primaries only */ + final float indexBalance = 0.0f; + final float replicaBalance = 0.0f; + final float primaryBalance = 1.0f; + final float balanceTreshold = 1.0f; + + ImmutableSettings.Builder settings = settingsBuilder(); + settings.put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString()); + settings.put(BalancedShardsAllocator.SETTING_INDEX_BALANCE_FACTOR, indexBalance); + settings.put(BalancedShardsAllocator.SETTING_SHARD_BALANCE_FACTOR, replicaBalance); + settings.put(BalancedShardsAllocator.SETTING_PRIMARY_BALANCE_FACTOR, primaryBalance); + settings.put(BalancedShardsAllocator.SETTING_THRESHOLD, balanceTreshold); + + AllocationService strategy = new AllocationService(settings.build()); + + ClusterState clusterstate = initCluster(strategy); + assertPrimaryBalance(logger, clusterstate.getRoutingNodes(), numberOfNodes, numberOfIndices, numberOfReplicas, numberOfShards, balanceTreshold); + + clusterstate = addNode(clusterstate, strategy); + assertPrimaryBalance(logger, clusterstate.getRoutingNodes(), numberOfNodes+1, numberOfIndices, numberOfReplicas, numberOfShards, balanceTreshold); + + clusterstate = removeNodes(clusterstate, strategy); + assertPrimaryBalance(logger, clusterstate.getRoutingNodes(), numberOfNodes+1-(numberOfNodes+1)/2, numberOfIndices, numberOfReplicas, numberOfShards, balanceTreshold); + } + + private ClusterState initCluster(AllocationService strategy) { + MetaData.Builder metaDataBuilder = newMetaDataBuilder(); + RoutingTable.Builder routingTableBuilder = routingTable(); + + for (int i = 0; i < numberOfIndices; i++) { + IndexMetaData.Builder index = newIndexMetaDataBuilder("test"+i).numberOfShards(numberOfShards).numberOfReplicas(numberOfReplicas); + metaDataBuilder = metaDataBuilder.put(index); + } + + MetaData metaData = metaDataBuilder.build(); + + for (IndexMetaData index : metaData.indices().values()) { + routingTableBuilder.addAsNew(index); + } + + RoutingTable routingTable = routingTableBuilder.build(); + + + logger.info("start "+numberOfNodes+" nodes"); + DiscoveryNodes.Builder nodes = newNodesBuilder(); + for (int i = 0; i < numberOfNodes; i++) { + nodes.put(newNode("node"+i)); + } + ClusterState clusterState = newClusterStateBuilder().nodes(nodes).metaData(metaData).routingTable(routingTable).build(); + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + RoutingNodes routingNodes = clusterState.routingNodes(); + + logger.info("restart all the primary shards, replicas will start initializing"); + routingNodes = clusterState.routingNodes(); + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + + logger.info("start the replica shards"); + routingNodes = clusterState.routingNodes(); + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + + logger.info("complete rebalancing"); + RoutingTable prev = routingTable; + while(true) { + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + if (routingTable == prev) + break; + prev = routingTable; + } + + return clusterState; + } + + private ClusterState addNode(ClusterState clusterState, AllocationService strategy) { + logger.info("now, start 1 more node, check that rebalancing will happen because we set it to always"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()) + .put(newNode("node"+numberOfNodes))) + .build(); + + RoutingTable routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + RoutingNodes routingNodes = clusterState.routingNodes(); + + // move initializing to started + + RoutingTable prev = routingTable; + while(true) { + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + if (routingTable == prev) + break; + prev = routingTable; + } + + return clusterState; + } + + private ClusterState removeNodes(ClusterState clusterState, AllocationService strategy) { + logger.info("Removing half the nodes ("+(numberOfNodes+1)/2+")"); + DiscoveryNodes.Builder nodes = newNodesBuilder().putAll(clusterState.nodes()); + + for(int i=(numberOfNodes+1)/2; i<=numberOfNodes; i++){ + nodes.remove("node"+i); + } + + clusterState = newClusterStateBuilder().state(clusterState).nodes(nodes.build()).build(); + RoutingNodes routingNodes = clusterState.routingNodes(); + + logger.info("start all the primary shards, replicas will start initializing"); + RoutingTable routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + + logger.info("start the replica shards"); + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + + logger.info("rebalancing"); + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + + logger.info("complete rebalancing"); + RoutingTable prev = routingTable; + while(true) { + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + if (routingTable == prev) + break; + prev = routingTable; + } + + return clusterState; + } + + + private void assertReplicaBalance(ESLogger logger, RoutingNodes nodes, int numberOfNodes, int numberOfIndices, int numberOfReplicas, int numberOfShards, float treshold) { + final int numShards = numberOfIndices * numberOfShards * (numberOfReplicas+1); + final float avgNumShards = (float)(numShards) / (float)(numberOfNodes); + final int minAvgNumberOfShards = Math.round(Math.round(Math.floor(avgNumShards-treshold))); + final int maxAvgNumberOfShards = Math.round(Math.round(Math.ceil(avgNumShards+treshold))); + + for (RoutingNode node : nodes) { +// logger.info(node.nodeId() + ": " + node.shardsWithState(INITIALIZING, STARTED).size() + " shards ("+minAvgNumberOfShards+" to "+maxAvgNumberOfShards+")"); + assertThat(node.shardsWithState(STARTED).size(), Matchers.greaterThanOrEqualTo(minAvgNumberOfShards)); + assertThat(node.shardsWithState(STARTED).size(), Matchers.lessThanOrEqualTo(maxAvgNumberOfShards)); + } + } + + private void assertIndexBalance(ESLogger logger, RoutingNodes nodes, int numberOfNodes, int numberOfIndices, int numberOfReplicas, int numberOfShards, float treshold) { + + final int numShards = numberOfShards * (numberOfReplicas+1); + final float avgNumShards = (float)(numShards) / (float)(numberOfNodes); + final int minAvgNumberOfShards = Math.round(Math.round(Math.floor(avgNumShards-treshold))); + final int maxAvgNumberOfShards = Math.round(Math.round(Math.ceil(avgNumShards+treshold))); + + for(String index : nodes.getRoutingTable().indicesRouting().keySet()) { + for (RoutingNode node : nodes) { +// logger.info(node.nodeId() +":"+index+ ": " + node.shardsWithState(index, INITIALIZING, STARTED).size() + " shards ("+minAvgNumberOfShards+" to "+maxAvgNumberOfShards+")"); + assertThat(node.shardsWithState(index, STARTED).size(), Matchers.greaterThanOrEqualTo(minAvgNumberOfShards)); + assertThat(node.shardsWithState(index, STARTED).size(), Matchers.lessThanOrEqualTo(maxAvgNumberOfShards)); + } + } + } + + private void assertPrimaryBalance(ESLogger logger, RoutingNodes nodes, int numberOfNodes, int numberOfIndices, int numberOfReplicas, int numberOfShards, float treshold) { + + final int numShards = numberOfShards; + final float avgNumShards = (float)(numShards) / (float)(numberOfNodes); + final int minAvgNumberOfShards = Math.round(Math.round(Math.floor(avgNumShards-treshold))); + final int maxAvgNumberOfShards = Math.round(Math.round(Math.ceil(avgNumShards+treshold))); + + for(String index : nodes.getRoutingTable().indicesRouting().keySet()) { + for (RoutingNode node : nodes) { + int primaries = 0; + for(ShardRouting shard : node.shardsWithState(index, STARTED)) { + primaries += shard.primary()?1:0; + } +// logger.info(node.nodeId() + ": " + primaries + " primaries ("+minAvgNumberOfShards+" to "+maxAvgNumberOfShards+")"); + assertThat(primaries, Matchers.greaterThanOrEqualTo(minAvgNumberOfShards)); + assertThat(primaries, Matchers.lessThanOrEqualTo(maxAvgNumberOfShards)); + } + } + } + +} \ No newline at end of file diff --git a/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/ClusterRebalanceRoutingTests.java b/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/ClusterRebalanceRoutingTests.java index b1b450e774f..ebf4063c245 100644 --- a/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/ClusterRebalanceRoutingTests.java +++ b/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/ClusterRebalanceRoutingTests.java @@ -88,7 +88,7 @@ public class ClusterRebalanceRoutingTests { for (int i = 0; i < routingTable.index("test1").shards().size(); i++) { assertThat(routingTable.index("test1").shard(i).shards().size(), equalTo(2)); - assertThat(routingTable.index("test1").shard(i).primaryShard().state(), equalTo(STARTED)); +// assertThat(routingTable.index("test1").shard(i).primaryShard().state(), equalTo(STARTED)); assertThat(routingTable.index("test1").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING)); } diff --git a/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/IndexBalanceTests.java b/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/IndexBalanceTests.java new file mode 100644 index 00000000000..40e13a85ea3 --- /dev/null +++ b/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/IndexBalanceTests.java @@ -0,0 +1,543 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.test.unit.cluster.routing.allocation; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.testng.annotations.Test; + +import static org.elasticsearch.cluster.ClusterState.newClusterStateBuilder; +import static org.elasticsearch.cluster.metadata.IndexMetaData.newIndexMetaDataBuilder; +import static org.elasticsearch.cluster.metadata.MetaData.newMetaDataBuilder; +import static org.elasticsearch.cluster.node.DiscoveryNodes.newNodesBuilder; +import static org.elasticsearch.cluster.routing.RoutingBuilders.routingTable; +import static org.elasticsearch.cluster.routing.ShardRoutingState.*; +import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; +import static org.elasticsearch.test.unit.cluster.routing.allocation.RoutingAllocationTests.newNode; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.*; + +/** + * + */ +public class IndexBalanceTests { + + private final ESLogger logger = Loggers.getLogger(IndexBalanceTests.class); + + @Test + public void testBalanceAllNodesStarted() { + AllocationService strategy = new AllocationService(settingsBuilder() + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_initial_primaries_recoveries", 10) + .put("cluster.routing.allocation.allow_rebalance", "always") + .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1).build()); + + logger.info("Building initial routing table"); + + MetaData metaData = newMetaDataBuilder().put(newIndexMetaDataBuilder("test").numberOfShards(3).numberOfReplicas(1)) + .put(newIndexMetaDataBuilder("test1").numberOfShards(3).numberOfReplicas(1)).build(); + + RoutingTable routingTable = routingTable().addAsNew(metaData.index("test")).addAsNew(metaData.index("test1")).build(); + + ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build(); + + assertThat(routingTable.index("test").shards().size(), equalTo(3)); + for (int i = 0; i < routingTable.index("test").shards().size(); i++) { + assertThat(routingTable.index("test").shard(i).size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).shards().get(0).state(), equalTo(UNASSIGNED)); + assertThat(routingTable.index("test").shard(i).shards().get(1).state(), equalTo(UNASSIGNED)); + assertThat(routingTable.index("test").shard(i).shards().get(0).currentNodeId(), nullValue()); + assertThat(routingTable.index("test").shard(i).shards().get(1).currentNodeId(), nullValue()); + } + + assertThat(routingTable.index("test1").shards().size(), equalTo(3)); + for (int i = 0; i < routingTable.index("test1").shards().size(); i++) { + assertThat(routingTable.index("test1").shard(i).size(), equalTo(2)); + assertThat(routingTable.index("test1").shard(i).shards().size(), equalTo(2)); + assertThat(routingTable.index("test1").shard(i).shards().get(0).state(), equalTo(UNASSIGNED)); + assertThat(routingTable.index("test1").shard(i).shards().get(1).state(), equalTo(UNASSIGNED)); + assertThat(routingTable.index("test1").shard(i).shards().get(0).currentNodeId(), nullValue()); + assertThat(routingTable.index("test1").shard(i).shards().get(1).currentNodeId(), nullValue()); + } + + logger.info("Adding three node and performing rerouting"); + clusterState = newClusterStateBuilder().state(clusterState) + .nodes(newNodesBuilder().put(newNode("node1")).put(newNode("node2")).put(newNode("node3"))).build(); + + RoutingTable prevRoutingTable = routingTable; + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(prevRoutingTable != routingTable, equalTo(true)); + assertThat(routingTable.index("test").shards().size(), equalTo(3)); + for (int i = 0; i < routingTable.index("test").shards().size(); i++) { + assertThat(routingTable.index("test").shard(i).size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING)); + assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1)); + assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED)); + assertThat(routingTable.index("test").shard(i).replicaShards().get(0).currentNodeId(), nullValue()); + } + + logger.info("Another round of rebalancing"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes())).build(); + prevRoutingTable = routingTable; + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(prevRoutingTable == routingTable, equalTo(true)); + + RoutingNodes routingNodes = clusterState.routingNodes(); + prevRoutingTable = routingTable; + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(prevRoutingTable != routingTable, equalTo(true)); + assertThat(routingTable.index("test").shards().size(), equalTo(3)); + for (int i = 0; i < routingTable.index("test").shards().size(); i++) { + assertThat(routingTable.index("test").shard(i).size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED)); + assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1)); + // backup shards are initializing as well, we make sure that they + // recover from primary *started* shards in the + // IndicesClusterStateService + assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING)); + } + + logger.info("Reroute, nothing should change"); + prevRoutingTable = routingTable; + routingTable = strategy.reroute(clusterState).routingTable(); + assertThat(prevRoutingTable == routingTable, equalTo(true)); + + logger.info("Start the more shards"); + routingNodes = clusterState.routingNodes(); + prevRoutingTable = routingTable; + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + + assertThat(prevRoutingTable != routingTable, equalTo(true)); + assertThat(routingTable.index("test").shards().size(), equalTo(3)); + for (int i = 0; i < routingTable.index("test").shards().size(); i++) { + assertThat(routingTable.index("test").shard(i).size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED)); + assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1)); + assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(STARTED)); + } + + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + + assertThat(prevRoutingTable != routingTable, equalTo(true)); + assertThat(routingTable.index("test1").shards().size(), equalTo(3)); + for (int i = 0; i < routingTable.index("test1").shards().size(); i++) { + assertThat(routingTable.index("test1").shard(i).size(), equalTo(2)); + assertThat(routingTable.index("test1").shard(i).shards().size(), equalTo(2)); + assertThat(routingTable.index("test1").shard(i).primaryShard().state(), equalTo(STARTED)); + assertThat(routingTable.index("test1").shard(i).replicaShards().size(), equalTo(1)); + assertThat(routingTable.index("test1").shard(i).replicaShards().get(0).state(), equalTo(STARTED)); + } + + assertThat(routingNodes.node("node1").numberOfShardsWithState(STARTED), equalTo(4)); + assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED), equalTo(4)); + assertThat(routingNodes.node("node3").numberOfShardsWithState(STARTED), equalTo(4)); + + assertThat(routingNodes.node("node1").shardsWithState("test", STARTED).size(), equalTo(2)); + assertThat(routingNodes.node("node2").shardsWithState("test", STARTED).size(), equalTo(2)); + assertThat(routingNodes.node("node3").shardsWithState("test", STARTED).size(), equalTo(2)); + + assertThat(routingNodes.node("node1").shardsWithState("test1", STARTED).size(), equalTo(2)); + assertThat(routingNodes.node("node2").shardsWithState("test1", STARTED).size(), equalTo(2)); + assertThat(routingNodes.node("node3").shardsWithState("test1", STARTED).size(), equalTo(2)); + } + + @Test + public void testBalanceIncrementallyStartNodes() { + AllocationService strategy = new AllocationService(settingsBuilder() + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_initial_primaries_recoveries", 10) + .put("cluster.routing.allocation.allow_rebalance", "always") + .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1).build()); + + logger.info("Building initial routing table"); + + MetaData metaData = newMetaDataBuilder().put(newIndexMetaDataBuilder("test").numberOfShards(3).numberOfReplicas(1)) + .put(newIndexMetaDataBuilder("test1").numberOfShards(3).numberOfReplicas(1)).build(); + + RoutingTable routingTable = routingTable().addAsNew(metaData.index("test")).addAsNew(metaData.index("test1")).build(); + + ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build(); + + assertThat(routingTable.index("test").shards().size(), equalTo(3)); + for (int i = 0; i < routingTable.index("test").shards().size(); i++) { + assertThat(routingTable.index("test").shard(i).size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).shards().get(0).state(), equalTo(UNASSIGNED)); + assertThat(routingTable.index("test").shard(i).shards().get(1).state(), equalTo(UNASSIGNED)); + assertThat(routingTable.index("test").shard(i).shards().get(0).currentNodeId(), nullValue()); + assertThat(routingTable.index("test").shard(i).shards().get(1).currentNodeId(), nullValue()); + } + + assertThat(routingTable.index("test1").shards().size(), equalTo(3)); + for (int i = 0; i < routingTable.index("test1").shards().size(); i++) { + assertThat(routingTable.index("test1").shard(i).size(), equalTo(2)); + assertThat(routingTable.index("test1").shard(i).shards().size(), equalTo(2)); + assertThat(routingTable.index("test1").shard(i).shards().get(0).state(), equalTo(UNASSIGNED)); + assertThat(routingTable.index("test1").shard(i).shards().get(1).state(), equalTo(UNASSIGNED)); + assertThat(routingTable.index("test1").shard(i).shards().get(0).currentNodeId(), nullValue()); + assertThat(routingTable.index("test1").shard(i).shards().get(1).currentNodeId(), nullValue()); + } + + logger.info("Adding one node and performing rerouting"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1"))).build(); + + RoutingTable prevRoutingTable = routingTable; + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(prevRoutingTable != routingTable, equalTo(true)); + assertThat(routingTable.index("test").shards().size(), equalTo(3)); + for (int i = 0; i < routingTable.index("test").shards().size(); i++) { + assertThat(routingTable.index("test").shard(i).size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING)); + assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1)); + assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED)); + assertThat(routingTable.index("test").shard(i).replicaShards().get(0).currentNodeId(), nullValue()); + } + + logger.info("Add another node and perform rerouting, nothing will happen since primary not started"); + clusterState = newClusterStateBuilder().state(clusterState) + .nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node2"))).build(); + prevRoutingTable = routingTable; + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(prevRoutingTable == routingTable, equalTo(true)); + + logger.info("Start the primary shard"); + RoutingNodes routingNodes = clusterState.routingNodes(); + prevRoutingTable = routingTable; + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(prevRoutingTable != routingTable, equalTo(true)); + assertThat(routingTable.index("test").shards().size(), equalTo(3)); + for (int i = 0; i < routingTable.index("test").shards().size(); i++) { + assertThat(routingTable.index("test").shard(i).size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED)); + assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1)); + // backup shards are initializing as well, we make sure that they + // recover from primary *started* shards in the + // IndicesClusterStateService + assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING)); + } + + logger.info("Reroute, nothing should change"); + prevRoutingTable = routingTable; + routingTable = strategy.reroute(clusterState).routingTable(); + assertThat(prevRoutingTable == routingTable, equalTo(true)); + + logger.info("Start the backup shard"); + routingNodes = clusterState.routingNodes(); + prevRoutingTable = routingTable; + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + + assertThat(prevRoutingTable != routingTable, equalTo(true)); + assertThat(routingTable.index("test").shards().size(), equalTo(3)); + for (int i = 0; i < routingTable.index("test").shards().size(); i++) { + assertThat(routingTable.index("test").shard(i).size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED)); + assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1)); + assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(STARTED)); + } + + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + + assertThat(prevRoutingTable != routingTable, equalTo(true)); + assertThat(routingTable.index("test1").shards().size(), equalTo(3)); + for (int i = 0; i < routingTable.index("test1").shards().size(); i++) { + assertThat(routingTable.index("test1").shard(i).size(), equalTo(2)); + assertThat(routingTable.index("test1").shard(i).shards().size(), equalTo(2)); + assertThat(routingTable.index("test1").shard(i).primaryShard().state(), equalTo(STARTED)); + assertThat(routingTable.index("test1").shard(i).replicaShards().size(), equalTo(1)); + assertThat(routingTable.index("test1").shard(i).replicaShards().get(0).state(), equalTo(STARTED)); + } + + logger.info("Add another node and perform rerouting, nothing will happen since primary not started"); + clusterState = newClusterStateBuilder().state(clusterState) + .nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node3"))).build(); + prevRoutingTable = routingTable; + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(prevRoutingTable != routingTable, equalTo(true)); + + logger.info("Reroute, nothing should change"); + prevRoutingTable = routingTable; + routingTable = strategy.reroute(clusterState).routingTable(); + assertThat(prevRoutingTable == routingTable, equalTo(true)); + + logger.info("Start the backup shard"); + routingNodes = clusterState.routingNodes(); + prevRoutingTable = routingTable; + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + + assertThat(prevRoutingTable != routingTable, equalTo(true)); + assertThat(routingTable.index("test").shards().size(), equalTo(3)); + + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + + assertThat(prevRoutingTable != routingTable, equalTo(true)); + assertThat(routingTable.index("test1").shards().size(), equalTo(3)); + + assertThat(routingNodes.node("node1").numberOfShardsWithState(STARTED), equalTo(4)); + assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED), equalTo(4)); + assertThat(routingNodes.node("node3").numberOfShardsWithState(STARTED), equalTo(4)); + + assertThat(routingNodes.node("node1").shardsWithState("test", STARTED).size(), equalTo(2)); + assertThat(routingNodes.node("node2").shardsWithState("test", STARTED).size(), equalTo(2)); + assertThat(routingNodes.node("node3").shardsWithState("test", STARTED).size(), equalTo(2)); + + assertThat(routingNodes.node("node1").shardsWithState("test1", STARTED).size(), equalTo(2)); + assertThat(routingNodes.node("node2").shardsWithState("test1", STARTED).size(), equalTo(2)); + assertThat(routingNodes.node("node3").shardsWithState("test1", STARTED).size(), equalTo(2)); + } + + @Test + public void testBalanceAllNodesStartedAddIndex() { + AllocationService strategy = new AllocationService(settingsBuilder() + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_initial_primaries_recoveries", 10) + .put("cluster.routing.allocation.allow_rebalance", "always") + .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1).build()); + + logger.info("Building initial routing table"); + + MetaData metaData = newMetaDataBuilder().put(newIndexMetaDataBuilder("test").numberOfShards(3).numberOfReplicas(1)).build(); + + RoutingTable routingTable = routingTable().addAsNew(metaData.index("test")).build(); + + ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build(); + + assertThat(routingTable.index("test").shards().size(), equalTo(3)); + for (int i = 0; i < routingTable.index("test").shards().size(); i++) { + assertThat(routingTable.index("test").shard(i).size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).shards().get(0).state(), equalTo(UNASSIGNED)); + assertThat(routingTable.index("test").shard(i).shards().get(1).state(), equalTo(UNASSIGNED)); + assertThat(routingTable.index("test").shard(i).shards().get(0).currentNodeId(), nullValue()); + assertThat(routingTable.index("test").shard(i).shards().get(1).currentNodeId(), nullValue()); + } + + logger.info("Adding three node and performing rerouting"); + clusterState = newClusterStateBuilder().state(clusterState) + .nodes(newNodesBuilder().put(newNode("node1")).put(newNode("node2")).put(newNode("node3"))).build(); + + RoutingTable prevRoutingTable = routingTable; + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(prevRoutingTable != routingTable, equalTo(true)); + assertThat(routingTable.index("test").shards().size(), equalTo(3)); + for (int i = 0; i < routingTable.index("test").shards().size(); i++) { + assertThat(routingTable.index("test").shard(i).size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING)); + assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1)); + assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED)); + assertThat(routingTable.index("test").shard(i).replicaShards().get(0).currentNodeId(), nullValue()); + } + + logger.info("Another round of rebalancing"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes())).build(); + prevRoutingTable = routingTable; + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(prevRoutingTable == routingTable, equalTo(true)); + + RoutingNodes routingNodes = clusterState.routingNodes(); + prevRoutingTable = routingTable; + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(prevRoutingTable != routingTable, equalTo(true)); + assertThat(routingTable.index("test").shards().size(), equalTo(3)); + for (int i = 0; i < routingTable.index("test").shards().size(); i++) { + assertThat(routingTable.index("test").shard(i).size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED)); + assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1)); + // backup shards are initializing as well, we make sure that they + // recover from primary *started* shards in the + // IndicesClusterStateService + assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING)); + } + + logger.info("Reroute, nothing should change"); + prevRoutingTable = routingTable; + routingTable = strategy.reroute(clusterState).routingTable(); + assertThat(prevRoutingTable == routingTable, equalTo(true)); + + logger.info("Start the more shards"); + routingNodes = clusterState.routingNodes(); + prevRoutingTable = routingTable; + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + assertThat(prevRoutingTable != routingTable, equalTo(true)); + assertThat(routingTable.index("test").shards().size(), equalTo(3)); + for (int i = 0; i < routingTable.index("test").shards().size(); i++) { + assertThat(routingTable.index("test").shard(i).size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED)); + assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1)); + } + + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + + assertThat(prevRoutingTable != routingTable, equalTo(true)); + + assertThat(routingNodes.node("node1").numberOfShardsWithState(STARTED), equalTo(2)); + assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED), equalTo(2)); + assertThat(routingNodes.node("node3").numberOfShardsWithState(STARTED), equalTo(2)); + + assertThat(routingNodes.node("node1").shardsWithState("test", STARTED).size(), equalTo(2)); + assertThat(routingNodes.node("node2").shardsWithState("test", STARTED).size(), equalTo(2)); + assertThat(routingNodes.node("node3").shardsWithState("test", STARTED).size(), equalTo(2)); + + logger.info("Add new index 3 shards 1 replica"); + + prevRoutingTable = routingTable; + metaData = newMetaDataBuilder().metaData(metaData) + .put(newIndexMetaDataBuilder("test1").settings(ImmutableSettings.settingsBuilder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 3) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + )) + .build(); + routingTable = routingTable().routingTable(routingTable) + .addAsNew(metaData.index("test1")) + .build(); + clusterState = newClusterStateBuilder().state(clusterState).metaData(metaData).routingTable(routingTable).build(); + + + assertThat(routingTable.index("test1").shards().size(), equalTo(3)); + + prevRoutingTable = routingTable; + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(prevRoutingTable != routingTable, equalTo(true)); + assertThat(routingTable.index("test1").shards().size(), equalTo(3)); + for (int i = 0; i < routingTable.index("test1").shards().size(); i++) { + assertThat(routingTable.index("test1").shard(i).size(), equalTo(2)); + assertThat(routingTable.index("test1").shard(i).shards().size(), equalTo(2)); + assertThat(routingTable.index("test1").shard(i).primaryShard().state(), equalTo(INITIALIZING)); + assertThat(routingTable.index("test1").shard(i).replicaShards().size(), equalTo(1)); + assertThat(routingTable.index("test1").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED)); + assertThat(routingTable.index("test1").shard(i).replicaShards().get(0).currentNodeId(), nullValue()); + } + + logger.info("Another round of rebalancing"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes())).build(); + prevRoutingTable = routingTable; + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(prevRoutingTable == routingTable, equalTo(true)); + + routingNodes = clusterState.routingNodes(); + prevRoutingTable = routingTable; + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(prevRoutingTable != routingTable, equalTo(true)); + assertThat(routingTable.index("test1").shards().size(), equalTo(3)); + for (int i = 0; i < routingTable.index("test1").shards().size(); i++) { + assertThat(routingTable.index("test1").shard(i).size(), equalTo(2)); + assertThat(routingTable.index("test1").shard(i).shards().size(), equalTo(2)); + assertThat(routingTable.index("test1").shard(i).primaryShard().state(), equalTo(STARTED)); + assertThat(routingTable.index("test1").shard(i).replicaShards().size(), equalTo(1)); + // backup shards are initializing as well, we make sure that they + // recover from primary *started* shards in the + // IndicesClusterStateService + assertThat(routingTable.index("test1").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING)); + } + + logger.info("Reroute, nothing should change"); + prevRoutingTable = routingTable; + routingTable = strategy.reroute(clusterState).routingTable(); + assertThat(prevRoutingTable == routingTable, equalTo(true)); + + logger.info("Start the more shards"); + routingNodes = clusterState.routingNodes(); + prevRoutingTable = routingTable; + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + assertThat(prevRoutingTable != routingTable, equalTo(true)); + assertThat(routingTable.index("test1").shards().size(), equalTo(3)); + for (int i = 0; i < routingTable.index("test1").shards().size(); i++) { + assertThat(routingTable.index("test1").shard(i).size(), equalTo(2)); + assertThat(routingTable.index("test1").shard(i).shards().size(), equalTo(2)); + assertThat(routingTable.index("test1").shard(i).primaryShard().state(), equalTo(STARTED)); + assertThat(routingTable.index("test1").shard(i).replicaShards().size(), equalTo(1)); + } + + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + + assertThat(prevRoutingTable != routingTable, equalTo(true)); + + assertThat(routingNodes.node("node1").numberOfShardsWithState(STARTED), equalTo(4)); + assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED), equalTo(4)); + assertThat(routingNodes.node("node3").numberOfShardsWithState(STARTED), equalTo(4)); + + assertThat(routingNodes.node("node1").shardsWithState("test1", STARTED).size(), equalTo(2)); + assertThat(routingNodes.node("node2").shardsWithState("test1", STARTED).size(), equalTo(2)); + assertThat(routingNodes.node("node3").shardsWithState("test1", STARTED).size(), equalTo(2)); + + } +} diff --git a/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/ShardsLimitAllocationTests.java b/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/ShardsLimitAllocationTests.java index 1a498c869a6..5b3060b2219 100644 --- a/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/ShardsLimitAllocationTests.java +++ b/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/ShardsLimitAllocationTests.java @@ -100,6 +100,9 @@ public class ShardsLimitAllocationTests { .put("cluster.routing.allocation.concurrent_recoveries", 10) .put("cluster.routing.allocation.node_initial_primaries_recoveries", 10) .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) + .put("cluster.routing.allocation.balance.index", 0.0f) + .put("cluster.routing.allocation.balance.replica", 1.0f) + .put("cluster.routing.allocation.balance.primary", 0.0f) .build()); logger.info("Building initial routing table"); @@ -177,23 +180,14 @@ public class ShardsLimitAllocationTests { assertThat(clusterState.readOnlyRoutingNodes().node("node1").numberOfShardsWithState(STARTED), equalTo(3)); assertThat(clusterState.readOnlyRoutingNodes().node("node1").numberOfShardsWithState(RELOCATING), equalTo(2)); - - logger.info("start the moving shards, a shard from test1 should move back to node1"); - routingNodes = clusterState.routingNodes(); - routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); - clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); - - assertThat(clusterState.readOnlyRoutingNodes().node("node1").numberOfShardsWithState(STARTED), equalTo(3)); - assertThat(clusterState.readOnlyRoutingNodes().node("node1").numberOfShardsWithState(RELOCATING), equalTo(0)); - assertThat(clusterState.readOnlyRoutingNodes().node("node1").numberOfShardsWithState(INITIALIZING), equalTo(2)); - assertThat(clusterState.readOnlyRoutingNodes().node("node2").numberOfShardsWithState(STARTED), equalTo(5)); assertThat(clusterState.readOnlyRoutingNodes().node("node2").numberOfShardsWithState(RELOCATING), equalTo(2)); - - logger.info("finalize movement, another shard will move"); + assertThat(clusterState.readOnlyRoutingNodes().node("node2").numberOfShardsWithState(STARTED), equalTo(3)); + // the first move will destroy the balance and the balancer will move 2 shards from node2 to node one right after + // moving the nodes to node2 since we consider INITIALIZING nodes during rebalance routingNodes = clusterState.routingNodes(); routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); - + // now we are done compared to EvenShardCountAllocator since the Balancer is not soely based on the average assertThat(clusterState.readOnlyRoutingNodes().node("node1").numberOfShardsWithState(STARTED), equalTo(5)); assertThat(clusterState.readOnlyRoutingNodes().node("node2").numberOfShardsWithState(STARTED), equalTo(5)); } diff --git a/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/TenShardsOneReplicaRoutingTests.java b/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/TenShardsOneReplicaRoutingTests.java index e47bdc1a330..31d8c940258 100644 --- a/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/TenShardsOneReplicaRoutingTests.java +++ b/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/TenShardsOneReplicaRoutingTests.java @@ -53,6 +53,9 @@ public class TenShardsOneReplicaRoutingTests { .put("cluster.routing.allocation.node_initial_primaries_recoveries", 10) .put("cluster.routing.allocation.allow_rebalance", "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) + .put("cluster.routing.allocation.balance.index", 0.0f) + .put("cluster.routing.allocation.balance.replica", 1.0f) + .put("cluster.routing.allocation.balance.primary", 0.0f) .build()); logger.info("Building initial routing table"); @@ -173,8 +176,8 @@ public class TenShardsOneReplicaRoutingTests { assertThat(prevRoutingTable != routingTable, equalTo(true)); assertThat(routingTable.index("test").shards().size(), equalTo(10)); - assertThat(routingNodes.node("node1").numberOfShardsWithState(STARTED), equalTo(8)); - assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED), equalTo(6)); + assertThat(routingNodes.node("node1").numberOfShardsWithState(STARTED), equalTo(7)); + assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED), equalTo(7)); assertThat(routingNodes.node("node3").numberOfShardsWithState(STARTED), equalTo(6)); } }