mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-13 00:15:47 +00:00
Improve allocation of unassigned shards with early termination
When we allocate unassigned shards we can terminate early for some shards like if we already tried to allocate a replica we don't need to try the same replica if the first one got rejected. We also can check if certain nodes can't allocate any primaries or shrads at all and take those nodes out of the picture for the current round since it will not change in the current round.
This commit is contained in:
parent
2b6214cff7
commit
79ab05cdcf
@ -147,5 +147,14 @@ public class MutableShardRouting extends ImmutableShardRouting {
|
||||
primary = false;
|
||||
}
|
||||
|
||||
private long hashVersion = version-1;
|
||||
private int hashCode = 0;
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
hashCode = (hashVersion != version ? super.hashCode() : hashCode);
|
||||
hashVersion = version;
|
||||
return hashCode;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -584,6 +584,14 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
||||
public void copyAll(Collection<MutableShardRouting> others) {
|
||||
others.addAll(unassigned);
|
||||
}
|
||||
|
||||
public MutableShardRouting[] drain() {
|
||||
MutableShardRouting[] mutableShardRoutings = unassigned.toArray(new MutableShardRouting[unassigned.size()]);
|
||||
unassigned.clear();
|
||||
primaries = 0;
|
||||
transactionId++;
|
||||
return mutableShardRoutings;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -20,6 +20,7 @@
|
||||
package org.elasticsearch.cluster.routing.allocation.allocator;
|
||||
|
||||
import com.google.common.base.Predicate;
|
||||
import org.apache.lucene.util.ArrayUtil;
|
||||
import org.apache.lucene.util.IntroSorter;
|
||||
import org.elasticsearch.ElasticSearchIllegalArgumentException;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
@ -30,6 +31,7 @@ import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.Decision.Type;
|
||||
import org.elasticsearch.common.collect.IdentityHashSet;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
@ -367,79 +369,81 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||
}
|
||||
final RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned().transactionBegin();
|
||||
boolean changed = initialize(routingNodes, unassigned);
|
||||
NodeSorter sorter = newNodeSorter();
|
||||
if (nodes.size() > 1) { /* skip if we only have one node */
|
||||
for (String index : buildWeightOrderedIndidces(Operation.BALANCE, sorter)) {
|
||||
sorter.reset(Operation.BALANCE, index);
|
||||
final float[] weights = sorter.weights;
|
||||
final ModelNode[] modelNodes = sorter.modelNodes;
|
||||
int lowIdx = 0;
|
||||
int highIdx = weights.length - 1;
|
||||
while (true) {
|
||||
final ModelNode minNode = modelNodes[lowIdx];
|
||||
final ModelNode maxNode = modelNodes[highIdx];
|
||||
advance_range:
|
||||
if (maxNode.numShards(index) > 0) {
|
||||
float delta = weights[highIdx] - weights[lowIdx];
|
||||
delta = delta <= threshold ? delta : sorter.weight(Operation.THRESHOLD_CHECK, maxNode) - sorter.weight(Operation.THRESHOLD_CHECK, minNode);
|
||||
if (delta <= threshold) {
|
||||
if (lowIdx > 0 && highIdx-1 > 0 // is there a chance for a higher delta?
|
||||
&& (weights[highIdx-1] - weights[0] > threshold) // check if we need to break at all
|
||||
) {
|
||||
/* This is a special case if allocations from the "heaviest" to the "lighter" nodes is not possible
|
||||
* due to some allocation decider restrictions like zone awareness. if one zone has for instance
|
||||
* less nodes than another zone. so one zone is horribly overloaded from a balanced perspective but we
|
||||
* can't move to the "lighter" shards since otherwise the zone would go over capacity.
|
||||
*
|
||||
* This break jumps straight to the condition below were we start moving from the high index towards
|
||||
* the low index to shrink the window we are considering for balance from the other direction.
|
||||
* (check shrinking the window from MAX to MIN)
|
||||
* See #3580
|
||||
*/
|
||||
break advance_range;
|
||||
if (!changed) {
|
||||
NodeSorter sorter = newNodeSorter();
|
||||
if (nodes.size() > 1) { /* skip if we only have one node */
|
||||
for (String index : buildWeightOrderedIndidces(Operation.BALANCE, sorter)) {
|
||||
sorter.reset(Operation.BALANCE, index);
|
||||
final float[] weights = sorter.weights;
|
||||
final ModelNode[] modelNodes = sorter.modelNodes;
|
||||
int lowIdx = 0;
|
||||
int highIdx = weights.length - 1;
|
||||
while (true) {
|
||||
final ModelNode minNode = modelNodes[lowIdx];
|
||||
final ModelNode maxNode = modelNodes[highIdx];
|
||||
advance_range:
|
||||
if (maxNode.numShards(index) > 0) {
|
||||
float delta = weights[highIdx] - weights[lowIdx];
|
||||
delta = delta <= threshold ? delta : sorter.weight(Operation.THRESHOLD_CHECK, maxNode) - sorter.weight(Operation.THRESHOLD_CHECK, minNode);
|
||||
if (delta <= threshold) {
|
||||
if (lowIdx > 0 && highIdx-1 > 0 // is there a chance for a higher delta?
|
||||
&& (weights[highIdx-1] - weights[0] > threshold) // check if we need to break at all
|
||||
) {
|
||||
/* This is a special case if allocations from the "heaviest" to the "lighter" nodes is not possible
|
||||
* due to some allocation decider restrictions like zone awareness. if one zone has for instance
|
||||
* less nodes than another zone. so one zone is horribly overloaded from a balanced perspective but we
|
||||
* can't move to the "lighter" shards since otherwise the zone would go over capacity.
|
||||
*
|
||||
* This break jumps straight to the condition below were we start moving from the high index towards
|
||||
* the low index to shrink the window we are considering for balance from the other direction.
|
||||
* (check shrinking the window from MAX to MIN)
|
||||
* See #3580
|
||||
*/
|
||||
break advance_range;
|
||||
}
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Stop balancing index [{}] min_node [{}] weight: [{}] max_node [{}] weight: [{}] delta: [{}]",
|
||||
index, maxNode.getNodeId(), weights[highIdx], minNode.getNodeId(), weights[lowIdx], delta);
|
||||
}
|
||||
break;
|
||||
}
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Stop balancing index [{}] min_node [{}] weight: [{}] max_node [{}] weight: [{}] delta: [{}]",
|
||||
index, maxNode.getNodeId(), weights[highIdx], minNode.getNodeId(), weights[lowIdx], delta);
|
||||
logger.trace("Balancing from node [{}] weight: [{}] to node [{}] weight: [{}] delta: [{}]",
|
||||
maxNode.getNodeId(), weights[highIdx], minNode.getNodeId(), weights[lowIdx], delta);
|
||||
}
|
||||
/* pass the delta to the replication function to prevent relocations that only swap the weights of the two nodes.
|
||||
* a relocation must bring us closer to the balance if we only achive the same delta the relocation is useless */
|
||||
if (tryRelocateShard(Operation.BALANCE, minNode, maxNode, index, delta)) {
|
||||
/*
|
||||
* TODO we could be a bit smarter here, we don't need to fully sort necessarily
|
||||
* we could just find the place to insert linearly but the win might be minor
|
||||
* compared to the added complexity
|
||||
*/
|
||||
weights[lowIdx] = sorter.weight(Operation.BALANCE, modelNodes[lowIdx]);
|
||||
weights[highIdx] = sorter.weight(Operation.BALANCE, modelNodes[highIdx]);
|
||||
sorter.sort(0, weights.length);
|
||||
lowIdx = 0;
|
||||
highIdx = weights.length - 1;
|
||||
changed = true;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
if (lowIdx < highIdx - 1) {
|
||||
/* Shrinking the window from MIN to MAX
|
||||
* we can't move from any shard from the min node lets move on to the next node
|
||||
* and see if the threshold still holds. We either don't have any shard of this
|
||||
* index on this node of allocation deciders prevent any relocation.*/
|
||||
lowIdx++;
|
||||
} else if (lowIdx > 0) {
|
||||
/* Shrinking the window from MAX to MIN
|
||||
* now we go max to min since obviously we can't move anything to the max node
|
||||
* lets pick the next highest */
|
||||
lowIdx = 0;
|
||||
highIdx--;
|
||||
} else {
|
||||
/* we are done here, we either can't relocate anymore or we are balanced */
|
||||
break;
|
||||
}
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Balancing from node [{}] weight: [{}] to node [{}] weight: [{}] delta: [{}]",
|
||||
maxNode.getNodeId(), weights[highIdx], minNode.getNodeId(), weights[lowIdx], delta);
|
||||
}
|
||||
/* pass the delta to the replication function to prevent relocations that only swap the weights of the two nodes.
|
||||
* a relocation must bring us closer to the balance if we only achive the same delta the relocation is useless */
|
||||
if (tryRelocateShard(Operation.BALANCE, minNode, maxNode, index, delta)) {
|
||||
/*
|
||||
* TODO we could be a bit smarter here, we don't need to fully sort necessarily
|
||||
* we could just find the place to insert linearly but the win might be minor
|
||||
* compared to the added complexity
|
||||
*/
|
||||
weights[lowIdx] = sorter.weight(Operation.BALANCE, modelNodes[lowIdx]);
|
||||
weights[highIdx] = sorter.weight(Operation.BALANCE, modelNodes[highIdx]);
|
||||
sorter.sort(0, weights.length);
|
||||
lowIdx = 0;
|
||||
highIdx = weights.length - 1;
|
||||
changed = true;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
if (lowIdx < highIdx - 1) {
|
||||
/* Shrinking the window from MIN to MAX
|
||||
* we can't move from any shard from the min node lets move on to the next node
|
||||
* and see if the threshold still holds. We either don't have any shard of this
|
||||
* index on this node of allocation deciders prevent any relocation.*/
|
||||
lowIdx++;
|
||||
} else if (lowIdx > 0) {
|
||||
/* Shrinking the window from MAX to MIN
|
||||
* now we go max to min since obviously we can't move anything to the max node
|
||||
* lets pick the next highest */
|
||||
lowIdx = 0;
|
||||
highIdx--;
|
||||
} else {
|
||||
/* we are done here, we either can't relocate anymore or we are balanced */
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -521,38 +525,39 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||
}
|
||||
final RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned().transactionBegin();
|
||||
boolean changed = initialize(routingNodes, unassigned);
|
||||
if (!changed) {
|
||||
final ModelNode sourceNode = nodes.get(node.nodeId());
|
||||
assert sourceNode != null;
|
||||
final NodeSorter sorter = newNodeSorter();
|
||||
sorter.reset(Operation.MOVE, shard.getIndex());
|
||||
final ModelNode[] nodes = sorter.modelNodes;
|
||||
assert sourceNode.containsShard(shard);
|
||||
/*
|
||||
* the sorter holds the minimum weight node first for the shards index.
|
||||
* We now walk through the nodes until we find a node to allocate the shard.
|
||||
* This is not guaranteed to be balanced after this operation we still try best effort to
|
||||
* allocate on the minimal eligible node.
|
||||
*/
|
||||
|
||||
final ModelNode sourceNode = nodes.get(node.nodeId());
|
||||
assert sourceNode != null;
|
||||
final NodeSorter sorter = newNodeSorter();
|
||||
sorter.reset(Operation.MOVE, shard.getIndex());
|
||||
final ModelNode[] nodes = sorter.modelNodes;
|
||||
assert sourceNode.containsShard(shard);
|
||||
/*
|
||||
* the sorter holds the minimum weight node first for the shards index.
|
||||
* We now walk through the nodes until we find a node to allocate the shard.
|
||||
* This is not guaranteed to be balanced after this operation we still try best effort to
|
||||
* allocate on the minimal eligible node.
|
||||
*/
|
||||
|
||||
for (ModelNode currentNode : nodes) {
|
||||
if (currentNode.getNodeId().equals(node.nodeId())) {
|
||||
continue;
|
||||
}
|
||||
RoutingNode target = routingNodes.node(currentNode.getNodeId());
|
||||
Decision decision = allocation.deciders().canAllocate(shard, target, allocation);
|
||||
if (decision.type() == Type.YES) { // TODO maybe we can respect throttling here too?
|
||||
sourceNode.removeShard(shard);
|
||||
final MutableShardRouting initializingShard = new MutableShardRouting(shard.index(), shard.id(), currentNode.getNodeId(),
|
||||
shard.currentNodeId(), shard.restoreSource(), shard.primary(), INITIALIZING, shard.version() + 1);
|
||||
currentNode.addShard(initializingShard, decision);
|
||||
routingNodes.assign(initializingShard, target.nodeId());
|
||||
routingNodes.relocate(shard, target.nodeId()); // set the node to relocate after we added the initializing shard
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Moved shard [{}] to node [{}]", shard, currentNode.getNodeId());
|
||||
for (ModelNode currentNode : nodes) {
|
||||
if (currentNode.getNodeId().equals(node.nodeId())) {
|
||||
continue;
|
||||
}
|
||||
RoutingNode target = routingNodes.node(currentNode.getNodeId());
|
||||
Decision decision = allocation.deciders().canAllocate(shard, target, allocation);
|
||||
if (decision.type() == Type.YES) { // TODO maybe we can respect throttling here too?
|
||||
sourceNode.removeShard(shard);
|
||||
final MutableShardRouting initializingShard = new MutableShardRouting(shard.index(), shard.id(), currentNode.getNodeId(),
|
||||
shard.currentNodeId(), shard.restoreSource(), shard.primary(), INITIALIZING, shard.version() + 1);
|
||||
currentNode.addShard(initializingShard, decision);
|
||||
routingNodes.assign(initializingShard, target.nodeId());
|
||||
routingNodes.relocate(shard, target.nodeId()); // set the node to relocate after we added the initializing shard
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Moved shard [{}] to node [{}]", shard, currentNode.getNodeId());
|
||||
}
|
||||
changed = true;
|
||||
break;
|
||||
}
|
||||
changed = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
routingNodes.unassigned().transactionEnd(unassigned);
|
||||
@ -603,40 +608,57 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||
* use the sorter to save some iterations.
|
||||
*/
|
||||
final AllocationDeciders deciders = allocation.deciders();
|
||||
final Set<MutableShardRouting> currentRound = new TreeSet<MutableShardRouting>(new Comparator<MutableShardRouting>() {
|
||||
final Comparator<MutableShardRouting> comparator = new Comparator<MutableShardRouting>() {
|
||||
@Override
|
||||
public int compare(MutableShardRouting o1,
|
||||
MutableShardRouting o2) {
|
||||
if (o1.primary() ^ o2.primary()) {
|
||||
return o1.primary() ? -1 : o2.primary() ? 1 : 0;
|
||||
}
|
||||
final int indexCmp;
|
||||
if ((indexCmp = o1.index().compareTo(o2.index())) == 0) {
|
||||
if (o1.getId() - o2.getId() == 0) {
|
||||
return o1.primary() ? -1 : o2.primary() ? 1 : 0;
|
||||
}
|
||||
return o1.getId() - o2.getId();
|
||||
|
||||
}
|
||||
return indexCmp;
|
||||
}
|
||||
});
|
||||
};
|
||||
/*
|
||||
* we use 2 arrays and move replicas to the second array once we allocated an identical
|
||||
* replica in the current iteration to make sure all indices get allocated in the same manner.
|
||||
* The arrays are sorted by primaries first and then by index and shard ID so a 2 indices with 2 replica and 1 shard would look like:
|
||||
* [(0,P,IDX1), (0,P,IDX2), (0,R,IDX1), (0,R,IDX1), (0,R,IDX2), (0,R,IDX2)]
|
||||
* if we allocate for instance (0, R, IDX1) we move the second replica to the secondary array and proceed with
|
||||
* the next replica. If we could not find a node to allocate (0,R,IDX1) we move all it's replicas to ingoreUnassigned.
|
||||
*/
|
||||
MutableShardRouting[] primary = unassigned.drain();
|
||||
MutableShardRouting[] secondary = new MutableShardRouting[primary.length];
|
||||
int secondaryLength = 0;
|
||||
int primaryLength = primary.length;
|
||||
ArrayUtil.timSort(primary, comparator);
|
||||
final Set<ModelNode> values = new IdentityHashSet<ModelNode>(nodes.values());
|
||||
do {
|
||||
Iterator<MutableShardRouting> iterator = unassigned.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
/* we treat every index equally here once chunk a time such that we fill up
|
||||
* nodes with all indices at the same time. Only on shard of a shard a time.
|
||||
* Although there might be a primary and a shard of a shard in the set but
|
||||
* primaries will be started first.*/
|
||||
if (currentRound.add(iterator.next())) {
|
||||
iterator.remove();
|
||||
for (int i = 0; i < primaryLength; i++) {
|
||||
MutableShardRouting shard = primary[i];
|
||||
if (!shard.primary()) {
|
||||
boolean drop = deciders.canAllocate(shard, allocation).type() == Type.NO;
|
||||
if (drop) {
|
||||
ignoredUnassigned.add(shard);
|
||||
while(i < primaryLength-1 && comparator.compare(primary[i], primary[i+1]) == 0) {
|
||||
ignoredUnassigned.add(primary[++i]);
|
||||
}
|
||||
continue;
|
||||
} else {
|
||||
while(i < primaryLength-1 && comparator.compare(primary[i], primary[i+1]) == 0) {
|
||||
secondary[secondaryLength++] = primary[++i];
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
boolean iterationChanged = false;
|
||||
for (MutableShardRouting shard : currentRound) {
|
||||
assert !shard.assignedToNode();
|
||||
assert !shard.assignedToNode() : shard;
|
||||
/* find an node with minimal weight we can allocate on*/
|
||||
float minWeight = Float.POSITIVE_INFINITY;
|
||||
ModelNode minNode = null;
|
||||
Decision decision = null;
|
||||
for (ModelNode node : nodes.values()) {
|
||||
for (ModelNode node : values) {
|
||||
/*
|
||||
* The shard we add is removed below to simulate the
|
||||
* addition for weight calculation we use Decision.ALWAYS to
|
||||
@ -696,7 +718,6 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||
}
|
||||
assert decision != null && minNode != null || decision == null && minNode == null;
|
||||
if (minNode != null) {
|
||||
iterationChanged = true;
|
||||
minNode.addShard(shard, decision);
|
||||
if (decision.type() == Type.YES) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
@ -705,6 +726,14 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||
routingNodes.assign(shard, routingNodes.node(minNode.getNodeId()).nodeId());
|
||||
changed = true;
|
||||
continue; // don't add to ignoreUnassigned
|
||||
} else {
|
||||
final RoutingNode node = routingNodes.node(minNode.getNodeId());
|
||||
if (deciders.canAllocate(node, allocation).type() != Type.YES) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Can not allocate on node [{}] remove from round decisin [{}]", node, decision.type());
|
||||
}
|
||||
values.remove(minNode);
|
||||
}
|
||||
}
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("No eligable node found to assign shard [{}] decision [{}]", shard, decision.type());
|
||||
@ -713,14 +742,18 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||
logger.trace("No Node found to assign shard [{}]", shard);
|
||||
}
|
||||
ignoredUnassigned.add(shard);
|
||||
if (!shard.primary()) { // we could not allocate it and we are a replica - check if we can ignore the other replicas
|
||||
while(secondaryLength > 0 && comparator.compare(shard, secondary[secondaryLength-1]) == 0) {
|
||||
ignoredUnassigned.add(secondary[--secondaryLength]);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!iterationChanged && !unassigned.isEmpty()) {
|
||||
unassigned.copyAll(ignoredUnassigned);
|
||||
unassigned.clear();
|
||||
return changed;
|
||||
}
|
||||
currentRound.clear();
|
||||
} while (!unassigned.isEmpty());
|
||||
primaryLength = secondaryLength;
|
||||
MutableShardRouting[] tmp = primary;
|
||||
primary = secondary;
|
||||
secondary = tmp;
|
||||
secondaryLength = 0;
|
||||
} while (primaryLength > 0);
|
||||
// clear everything we have either added it or moved to ingoreUnassigned
|
||||
return changed;
|
||||
}
|
||||
|
@ -64,4 +64,20 @@ public abstract class AllocationDecider extends AbstractComponent {
|
||||
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
|
||||
return Decision.ALWAYS;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a {@link Decision} whether the given shard routing can be allocated at all at this state of the
|
||||
* {@link RoutingAllocation}. The default is {@link Decision#ALWAYS}.
|
||||
*/
|
||||
public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocation) {
|
||||
return Decision.ALWAYS;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a {@link Decision} whether the given node can allow any allocation at all at this state of the
|
||||
* {@link RoutingAllocation}. The default is {@link Decision#ALWAYS}.
|
||||
*/
|
||||
public Decision canAllocate(RoutingNode node, RoutingAllocation allocation) {
|
||||
return Decision.ALWAYS;
|
||||
}
|
||||
}
|
||||
|
@ -153,4 +153,32 @@ public class AllocationDeciders extends AllocationDecider {
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocation) {
|
||||
Decision.Multi ret = new Decision.Multi();
|
||||
for (AllocationDecider allocationDecider : allocations) {
|
||||
Decision decision = allocationDecider.canAllocate(shardRouting, allocation);
|
||||
// short track if a NO is returned.
|
||||
if (decision == Decision.NO) {
|
||||
return decision;
|
||||
} else if (decision != Decision.ALWAYS) {
|
||||
ret.add(decision);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
public Decision canAllocate(RoutingNode node, RoutingAllocation allocation) {
|
||||
Decision.Multi ret = new Decision.Multi();
|
||||
for (AllocationDecider allocationDecider : allocations) {
|
||||
Decision decision = allocationDecider.canAllocate(node, allocation);
|
||||
// short track if a NO is returned.
|
||||
if (decision == Decision.NO) {
|
||||
return decision;
|
||||
} else if (decision != Decision.ALWAYS) {
|
||||
ret.add(decision);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
@ -38,6 +38,10 @@ public class ReplicaAfterPrimaryActiveAllocationDecider extends AllocationDecide
|
||||
|
||||
@Override
|
||||
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
|
||||
return canAllocate(shardRouting, allocation);
|
||||
}
|
||||
|
||||
public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocation) {
|
||||
if (shardRouting.primary()) {
|
||||
return Decision.YES;
|
||||
}
|
||||
|
@ -77,7 +77,7 @@ public class ThrottlingAllocationDecider extends AllocationDecider {
|
||||
// primary is unassigned, means we are going to do recovery from gateway
|
||||
// count *just the primary* currently doing recovery on the node and check against concurrent_recoveries
|
||||
int primariesInRecovery = 0;
|
||||
for (MutableShardRouting shard : node) {;
|
||||
for (MutableShardRouting shard : node) {
|
||||
// when a primary shard is INITIALIZING, it can be because of *initial recovery* or *relocation from another node*
|
||||
// we only count initial recoveries here, so we need to make sure that relocating node is null
|
||||
if (shard.state() == ShardRoutingState.INITIALIZING && shard.primary() && shard.relocatingNodeId() == null) {
|
||||
@ -95,13 +95,16 @@ public class ThrottlingAllocationDecider extends AllocationDecider {
|
||||
// either primary or replica doing recovery (from peer shard)
|
||||
|
||||
// count the number of recoveries on the node, its for both target (INITIALIZING) and source (RELOCATING)
|
||||
return canAllocate(node, allocation);
|
||||
}
|
||||
|
||||
public Decision canAllocate(RoutingNode node, RoutingAllocation allocation) {
|
||||
int currentRecoveries = 0;
|
||||
for (MutableShardRouting shard : node) {
|
||||
if (shard.state() == ShardRoutingState.INITIALIZING || shard.state() == ShardRoutingState.RELOCATING) {
|
||||
currentRecoveries++;
|
||||
}
|
||||
}
|
||||
|
||||
if (currentRecoveries >= concurrentRecoveries) {
|
||||
return Decision.THROTTLE;
|
||||
} else {
|
||||
|
@ -39,7 +39,7 @@ public class ClusterAllocationRerouteBenchmark {
|
||||
private static final ESLogger logger = Loggers.getLogger(ClusterAllocationRerouteBenchmark.class);
|
||||
|
||||
public static void main(String[] args) {
|
||||
final int numberOfRuns = 10;
|
||||
final int numberOfRuns = 1;
|
||||
final int numIndices = 5 * 365; // five years
|
||||
final int numShards = 6;
|
||||
final int numReplicas = 2;
|
||||
|
@ -298,6 +298,7 @@ public class FailedShardsRoutingTests extends ElasticsearchTestCase {
|
||||
RoutingTable prevRoutingTable = routingTable;
|
||||
routingTable = strategy.reroute(clusterState).routingTable();
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
|
||||
final String nodeHoldingPrimary = routingTable.index("test").shard(0).primaryShard().currentNodeId();
|
||||
|
||||
assertThat(prevRoutingTable != routingTable, equalTo(true));
|
||||
assertThat(routingTable.index("test").shards().size(), equalTo(1));
|
||||
@ -305,14 +306,14 @@ public class FailedShardsRoutingTests extends ElasticsearchTestCase {
|
||||
assertThat(routingTable.index("test").shard(i).size(), equalTo(2));
|
||||
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
|
||||
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING));
|
||||
assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), equalTo("node1"));
|
||||
assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), equalTo(nodeHoldingPrimary));
|
||||
assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1));
|
||||
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED));
|
||||
}
|
||||
|
||||
logger.info("fail the first shard, will start INITIALIZING on the second node");
|
||||
prevRoutingTable = routingTable;
|
||||
routingTable = strategy.applyFailedShard(clusterState, new ImmutableShardRouting("test", 0, "node1", true, INITIALIZING, 0)).routingTable();
|
||||
routingTable = strategy.applyFailedShard(clusterState, new ImmutableShardRouting("test", 0, nodeHoldingPrimary, true, INITIALIZING, 0)).routingTable();
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
|
||||
RoutingNodes routingNodes = clusterState.routingNodes();
|
||||
|
||||
@ -322,13 +323,13 @@ public class FailedShardsRoutingTests extends ElasticsearchTestCase {
|
||||
assertThat(routingTable.index("test").shard(i).size(), equalTo(2));
|
||||
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
|
||||
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING));
|
||||
assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), equalTo("node2"));
|
||||
assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), not(equalTo(nodeHoldingPrimary)));
|
||||
assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1));
|
||||
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED));
|
||||
}
|
||||
|
||||
logger.info("fail the shard again, see that nothing happens");
|
||||
assertThat(strategy.applyFailedShard(clusterState, new ImmutableShardRouting("test", 0, "node1", true, INITIALIZING, 0)).changed(), equalTo(false));
|
||||
assertThat(strategy.applyFailedShard(clusterState, new ImmutableShardRouting("test", 0, nodeHoldingPrimary, true, INITIALIZING, 0)).changed(), equalTo(false));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -34,6 +34,7 @@ import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
|
||||
import static org.elasticsearch.cluster.routing.allocation.RoutingAllocationTests.newNode;
|
||||
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
/**
|
||||
@ -73,13 +74,15 @@ public class ReplicaAllocatedAfterPrimaryTests extends ElasticsearchTestCase {
|
||||
RoutingTable prevRoutingTable = routingTable;
|
||||
routingTable = strategy.reroute(clusterState).routingTable();
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
|
||||
final String nodeHoldingPrimary = routingTable.index("test").shard(0).primaryShard().currentNodeId();
|
||||
|
||||
|
||||
assertThat(prevRoutingTable != routingTable, equalTo(true));
|
||||
assertThat(routingTable.index("test").shards().size(), equalTo(1));
|
||||
assertThat(routingTable.index("test").shard(0).size(), equalTo(2));
|
||||
assertThat(routingTable.index("test").shard(0).shards().size(), equalTo(2));
|
||||
assertThat(routingTable.index("test").shard(0).primaryShard().state(), equalTo(INITIALIZING));
|
||||
assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo("node1"));
|
||||
assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo(nodeHoldingPrimary));
|
||||
assertThat(routingTable.index("test").shard(0).replicaShards().size(), equalTo(1));
|
||||
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).state(), equalTo(UNASSIGNED));
|
||||
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).currentNodeId(), nullValue());
|
||||
@ -87,18 +90,19 @@ public class ReplicaAllocatedAfterPrimaryTests extends ElasticsearchTestCase {
|
||||
logger.info("Start all the primary shards");
|
||||
RoutingNodes routingNodes = clusterState.routingNodes();
|
||||
prevRoutingTable = routingTable;
|
||||
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)).routingTable();
|
||||
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node(nodeHoldingPrimary).shardsWithState(INITIALIZING)).routingTable();
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
|
||||
|
||||
final String nodeHoldingReplica = routingTable.index("test").shard(0).replicaShards().get(0).currentNodeId();
|
||||
assertThat(nodeHoldingPrimary, not(equalTo(nodeHoldingReplica)));
|
||||
assertThat(prevRoutingTable != routingTable, equalTo(true));
|
||||
assertThat(routingTable.index("test").shards().size(), equalTo(1));
|
||||
assertThat(routingTable.index("test").shard(0).size(), equalTo(2));
|
||||
assertThat(routingTable.index("test").shard(0).shards().size(), equalTo(2));
|
||||
assertThat(routingTable.index("test").shard(0).primaryShard().state(), equalTo(STARTED));
|
||||
assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo("node1"));
|
||||
assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo(nodeHoldingPrimary));
|
||||
assertThat(routingTable.index("test").shard(0).replicaShards().size(), equalTo(1));
|
||||
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).state(), equalTo(INITIALIZING));
|
||||
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).currentNodeId(), equalTo("node2"));
|
||||
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).currentNodeId(), equalTo(nodeHoldingReplica));
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -58,7 +58,7 @@ public class UpdateNumberOfReplicasTests extends ElasticsearchTestCase {
|
||||
logger.info("Start all the primary shards");
|
||||
RoutingNodes routingNodes = clusterState.routingNodes();
|
||||
prevRoutingTable = routingTable;
|
||||
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)).routingTable();
|
||||
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
|
||||
|
||||
logger.info("Start all the replica shards");
|
||||
@ -66,16 +66,19 @@ public class UpdateNumberOfReplicasTests extends ElasticsearchTestCase {
|
||||
prevRoutingTable = routingTable;
|
||||
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
|
||||
|
||||
final String nodeHoldingPrimary = routingTable.index("test").shard(0).primaryShard().currentNodeId();
|
||||
final String nodeHoldingReplica = routingTable.index("test").shard(0).replicaShards().get(0).currentNodeId();
|
||||
assertThat(nodeHoldingPrimary, not(equalTo(nodeHoldingReplica)));
|
||||
assertThat(prevRoutingTable != routingTable, equalTo(true));
|
||||
assertThat(routingTable.index("test").shards().size(), equalTo(1));
|
||||
assertThat(routingTable.index("test").shard(0).size(), equalTo(2));
|
||||
assertThat(routingTable.index("test").shard(0).shards().size(), equalTo(2));
|
||||
assertThat(routingTable.index("test").shard(0).primaryShard().state(), equalTo(STARTED));
|
||||
assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo("node1"));
|
||||
assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo(nodeHoldingPrimary));
|
||||
assertThat(routingTable.index("test").shard(0).replicaShards().size(), equalTo(1));
|
||||
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).state(), equalTo(STARTED));
|
||||
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).currentNodeId(), equalTo("node2"));
|
||||
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).currentNodeId(), equalTo(nodeHoldingReplica));
|
||||
|
||||
|
||||
logger.info("add another replica");
|
||||
routingNodes = clusterState.routingNodes();
|
||||
@ -90,10 +93,10 @@ public class UpdateNumberOfReplicasTests extends ElasticsearchTestCase {
|
||||
assertThat(routingTable.index("test").shards().size(), equalTo(1));
|
||||
assertThat(routingTable.index("test").shard(0).size(), equalTo(3));
|
||||
assertThat(routingTable.index("test").shard(0).primaryShard().state(), equalTo(STARTED));
|
||||
assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo("node1"));
|
||||
assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo(nodeHoldingPrimary));
|
||||
assertThat(routingTable.index("test").shard(0).replicaShards().size(), equalTo(2));
|
||||
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).state(), equalTo(STARTED));
|
||||
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).currentNodeId(), equalTo("node2"));
|
||||
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).currentNodeId(), equalTo(nodeHoldingReplica));
|
||||
assertThat(routingTable.index("test").shard(0).replicaShards().get(1).state(), equalTo(UNASSIGNED));
|
||||
|
||||
logger.info("Add another node and start the added replica");
|
||||
@ -106,10 +109,10 @@ public class UpdateNumberOfReplicasTests extends ElasticsearchTestCase {
|
||||
assertThat(routingTable.index("test").shards().size(), equalTo(1));
|
||||
assertThat(routingTable.index("test").shard(0).size(), equalTo(3));
|
||||
assertThat(routingTable.index("test").shard(0).primaryShard().state(), equalTo(STARTED));
|
||||
assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo("node1"));
|
||||
assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo(nodeHoldingPrimary));
|
||||
assertThat(routingTable.index("test").shard(0).replicaShards().size(), equalTo(2));
|
||||
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).state(), equalTo(STARTED));
|
||||
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).currentNodeId(), equalTo("node2"));
|
||||
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).currentNodeId(), equalTo(nodeHoldingReplica));
|
||||
assertThat(routingTable.index("test").shard(0).replicaShards().get(1).state(), equalTo(INITIALIZING));
|
||||
assertThat(routingTable.index("test").shard(0).replicaShards().get(1).currentNodeId(), equalTo("node3"));
|
||||
|
||||
@ -122,10 +125,10 @@ public class UpdateNumberOfReplicasTests extends ElasticsearchTestCase {
|
||||
assertThat(routingTable.index("test").shards().size(), equalTo(1));
|
||||
assertThat(routingTable.index("test").shard(0).size(), equalTo(3));
|
||||
assertThat(routingTable.index("test").shard(0).primaryShard().state(), equalTo(STARTED));
|
||||
assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo("node1"));
|
||||
assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo(nodeHoldingPrimary));
|
||||
assertThat(routingTable.index("test").shard(0).replicaShards().size(), equalTo(2));
|
||||
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).state(), equalTo(STARTED));
|
||||
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).currentNodeId(), equalTo("node2"));
|
||||
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).currentNodeId(), equalTo(nodeHoldingReplica));
|
||||
assertThat(routingTable.index("test").shard(0).replicaShards().get(1).state(), equalTo(STARTED));
|
||||
assertThat(routingTable.index("test").shard(0).replicaShards().get(1).currentNodeId(), equalTo("node3"));
|
||||
|
||||
@ -142,10 +145,10 @@ public class UpdateNumberOfReplicasTests extends ElasticsearchTestCase {
|
||||
assertThat(routingTable.index("test").shards().size(), equalTo(1));
|
||||
assertThat(routingTable.index("test").shard(0).size(), equalTo(2));
|
||||
assertThat(routingTable.index("test").shard(0).primaryShard().state(), equalTo(STARTED));
|
||||
assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo("node1"));
|
||||
assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo(nodeHoldingPrimary));
|
||||
assertThat(routingTable.index("test").shard(0).replicaShards().size(), equalTo(1));
|
||||
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).state(), equalTo(STARTED));
|
||||
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).currentNodeId(), anyOf(equalTo("node2"), equalTo("node3")));
|
||||
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).currentNodeId(), anyOf(equalTo(nodeHoldingReplica), equalTo("node3")));
|
||||
|
||||
logger.info("do a reroute, should remain the same");
|
||||
prevRoutingTable = routingTable;
|
||||
|
Loading…
x
Reference in New Issue
Block a user