Even shard count distribution counts relocations as two, closes #1354.
This commit is contained in:
parent
7adf2fd497
commit
55d112b043
|
@ -26,9 +26,6 @@ import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
import org.elasticsearch.common.trove.map.hash.TObjectIntHashMap;
|
import org.elasticsearch.common.trove.map.hash.TObjectIntHashMap;
|
||||||
import org.elasticsearch.common.util.concurrent.NotThreadSafe;
|
import org.elasticsearch.common.util.concurrent.NotThreadSafe;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Comparator;
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -233,22 +230,6 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
||||||
return shards;
|
return shards;
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<RoutingNode> sortedNodesLeastToHigh() {
|
|
||||||
return nodesToShardsSorted(new Comparator<RoutingNode>() {
|
|
||||||
@Override public int compare(RoutingNode o1, RoutingNode o2) {
|
|
||||||
return o1.shards().size() - o2.shards().size();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
public List<RoutingNode> nodesToShardsSorted(Comparator<RoutingNode> comparator) {
|
|
||||||
List<RoutingNode> nodes = new ArrayList<RoutingNode>(nodesToShards.values());
|
|
||||||
if (comparator != null) {
|
|
||||||
Collections.sort(nodes, comparator);
|
|
||||||
}
|
|
||||||
return nodes;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String prettyPrint() {
|
public String prettyPrint() {
|
||||||
StringBuilder sb = new StringBuilder("routing_nodes:\n");
|
StringBuilder sb = new StringBuilder("routing_nodes:\n");
|
||||||
for (RoutingNode routingNode : this) {
|
for (RoutingNode routingNode : this) {
|
||||||
|
|
|
@ -22,13 +22,17 @@ package org.elasticsearch.cluster.routing.allocation.allocator;
|
||||||
import org.elasticsearch.cluster.routing.MutableShardRouting;
|
import org.elasticsearch.cluster.routing.MutableShardRouting;
|
||||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||||
import org.elasticsearch.cluster.routing.RoutingNodes;
|
import org.elasticsearch.cluster.routing.RoutingNodes;
|
||||||
|
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||||
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
|
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
|
||||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||||
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
|
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
|
||||||
import org.elasticsearch.common.component.AbstractComponent;
|
import org.elasticsearch.common.component.AbstractComponent;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.common.trove.map.hash.TObjectIntHashMap;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Comparator;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
@ -53,7 +57,7 @@ public class EvenShardsCountAllocator extends AbstractComponent implements Shard
|
||||||
RoutingNodes routingNodes = allocation.routingNodes();
|
RoutingNodes routingNodes = allocation.routingNodes();
|
||||||
|
|
||||||
|
|
||||||
List<RoutingNode> nodes = routingNodes.sortedNodesLeastToHigh();
|
RoutingNode[] nodes = sortedNodesLeastToHigh(allocation);
|
||||||
|
|
||||||
Iterator<MutableShardRouting> unassignedIterator = routingNodes.unassigned().iterator();
|
Iterator<MutableShardRouting> unassignedIterator = routingNodes.unassigned().iterator();
|
||||||
int lastNode = 0;
|
int lastNode = 0;
|
||||||
|
@ -61,10 +65,10 @@ public class EvenShardsCountAllocator extends AbstractComponent implements Shard
|
||||||
while (unassignedIterator.hasNext()) {
|
while (unassignedIterator.hasNext()) {
|
||||||
MutableShardRouting shard = unassignedIterator.next();
|
MutableShardRouting shard = unassignedIterator.next();
|
||||||
// do the allocation, finding the least "busy" node
|
// do the allocation, finding the least "busy" node
|
||||||
for (int i = 0; i < nodes.size(); i++) {
|
for (int i = 0; i < nodes.length; i++) {
|
||||||
RoutingNode node = nodes.get(lastNode);
|
RoutingNode node = nodes[lastNode];
|
||||||
lastNode++;
|
lastNode++;
|
||||||
if (lastNode == nodes.size()) {
|
if (lastNode == nodes.length) {
|
||||||
lastNode = 0;
|
lastNode = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -86,7 +90,7 @@ public class EvenShardsCountAllocator extends AbstractComponent implements Shard
|
||||||
for (Iterator<MutableShardRouting> it = routingNodes.unassigned().iterator(); it.hasNext(); ) {
|
for (Iterator<MutableShardRouting> it = routingNodes.unassigned().iterator(); it.hasNext(); ) {
|
||||||
MutableShardRouting shard = it.next();
|
MutableShardRouting shard = it.next();
|
||||||
// go over the nodes and try and allocate the remaining ones
|
// go over the nodes and try and allocate the remaining ones
|
||||||
for (RoutingNode routingNode : routingNodes.sortedNodesLeastToHigh()) {
|
for (RoutingNode routingNode : sortedNodesLeastToHigh(allocation)) {
|
||||||
if (allocation.deciders().canAllocate(shard, routingNode, allocation).allocate()) {
|
if (allocation.deciders().canAllocate(shard, routingNode, allocation).allocate()) {
|
||||||
changed = true;
|
changed = true;
|
||||||
routingNode.add(shard);
|
routingNode.add(shard);
|
||||||
|
@ -100,18 +104,18 @@ public class EvenShardsCountAllocator extends AbstractComponent implements Shard
|
||||||
|
|
||||||
@Override public boolean rebalance(RoutingAllocation allocation) {
|
@Override public boolean rebalance(RoutingAllocation allocation) {
|
||||||
boolean changed = false;
|
boolean changed = false;
|
||||||
List<RoutingNode> sortedNodesLeastToHigh = allocation.routingNodes().sortedNodesLeastToHigh();
|
RoutingNode[] sortedNodesLeastToHigh = sortedNodesLeastToHigh(allocation);
|
||||||
if (sortedNodesLeastToHigh.isEmpty()) {
|
if (sortedNodesLeastToHigh.length == 0) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
int lowIndex = 0;
|
int lowIndex = 0;
|
||||||
int highIndex = sortedNodesLeastToHigh.size() - 1;
|
int highIndex = sortedNodesLeastToHigh.length - 1;
|
||||||
boolean relocationPerformed;
|
boolean relocationPerformed;
|
||||||
do {
|
do {
|
||||||
relocationPerformed = false;
|
relocationPerformed = false;
|
||||||
while (lowIndex != highIndex) {
|
while (lowIndex != highIndex) {
|
||||||
RoutingNode lowRoutingNode = sortedNodesLeastToHigh.get(lowIndex);
|
RoutingNode lowRoutingNode = sortedNodesLeastToHigh[lowIndex];
|
||||||
RoutingNode highRoutingNode = sortedNodesLeastToHigh.get(highIndex);
|
RoutingNode highRoutingNode = sortedNodesLeastToHigh[highIndex];
|
||||||
int averageNumOfShards = allocation.routingNodes().requiredAverageNumberOfShardsPerNode();
|
int averageNumOfShards = allocation.routingNodes().requiredAverageNumberOfShardsPerNode();
|
||||||
|
|
||||||
// only active shards can be removed so must count only active ones.
|
// only active shards can be removed so must count only active ones.
|
||||||
|
@ -156,8 +160,8 @@ public class EvenShardsCountAllocator extends AbstractComponent implements Shard
|
||||||
@Override public boolean move(MutableShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
|
@Override public boolean move(MutableShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
|
||||||
assert shardRouting.started();
|
assert shardRouting.started();
|
||||||
boolean changed = false;
|
boolean changed = false;
|
||||||
List<RoutingNode> sortedNodesLeastToHigh = allocation.routingNodes().sortedNodesLeastToHigh();
|
RoutingNode[] sortedNodesLeastToHigh = sortedNodesLeastToHigh(allocation);
|
||||||
if (sortedNodesLeastToHigh.isEmpty()) {
|
if (sortedNodesLeastToHigh.length == 0) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -179,4 +183,23 @@ public class EvenShardsCountAllocator extends AbstractComponent implements Shard
|
||||||
|
|
||||||
return changed;
|
return changed;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private RoutingNode[] sortedNodesLeastToHigh(RoutingAllocation allocation) {
|
||||||
|
// create count per node id, taking into account relocations
|
||||||
|
final TObjectIntHashMap<String> nodeCounts = new TObjectIntHashMap<String>();
|
||||||
|
for (RoutingNode node : allocation.routingNodes()) {
|
||||||
|
for (int i = 0; i < node.shards().size(); i++) {
|
||||||
|
ShardRouting shardRouting = node.shards().get(i);
|
||||||
|
String nodeId = shardRouting.relocating() ? shardRouting.relocatingNodeId() : shardRouting.currentNodeId();
|
||||||
|
nodeCounts.adjustOrPutValue(nodeId, 1, 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
RoutingNode[] nodes = allocation.routingNodes().nodesToShards().values().toArray(new RoutingNode[allocation.routingNodes().nodesToShards().values().size()]);
|
||||||
|
Arrays.sort(nodes, new Comparator<RoutingNode>() {
|
||||||
|
@Override public int compare(RoutingNode o1, RoutingNode o2) {
|
||||||
|
return nodeCounts.get(o1.nodeId()) - nodeCounts.get(o2.nodeId());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return nodes;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue