Optimizes performance of AllocationDecider execution. Instead of using loops over all ShardRoutings, do accounting in RoutingNodes.

Speeds up recalculating cluster state on large clusters.
This commit is contained in:
Sebastian Geidies 2013-11-25 14:17:21 +01:00 committed by Simon Willnauer
parent 6a856c86e8
commit 6af80d5017
16 changed files with 973 additions and 150 deletions

View File

@ -55,8 +55,9 @@ public class MutableShardRouting extends ImmutableShardRouting {
*
* @param nodeId id of the node to assign this shard to
*/
public void assignToNode(String nodeId) {
void assignToNode(String nodeId) {
version++;
if (currentNodeId == null) {
assert state == ShardRoutingState.UNASSIGNED;
@ -76,7 +77,7 @@ public class MutableShardRouting extends ImmutableShardRouting {
*
* @param relocatingNodeId id of the node to relocate the shard
*/
public void relocate(String relocatingNodeId) {
void relocate(String relocatingNodeId) {
version++;
assert state == ShardRoutingState.STARTED;
state = ShardRoutingState.RELOCATING;
@ -87,7 +88,7 @@ public class MutableShardRouting extends ImmutableShardRouting {
* Cancel relocation of a shard. The shards state must be set
* to <code>RELOCATING</code>.
*/
public void cancelRelocation() {
void cancelRelocation() {
version++;
assert state == ShardRoutingState.RELOCATING;
assert assignedToNode();
@ -101,7 +102,7 @@ public class MutableShardRouting extends ImmutableShardRouting {
* Set the shards state to <code>UNASSIGNED</code>.
* //TODO document the state
*/
public void deassignNode() {
void deassignNode() {
version++;
assert state != ShardRoutingState.UNASSIGNED;
@ -115,7 +116,7 @@ public class MutableShardRouting extends ImmutableShardRouting {
* <code>INITIALIZING</code> or <code>RELOCATING</code>. Any relocation will be
* canceled.
*/
public void moveToStarted() {
void moveToStarted() {
version++;
assert state == ShardRoutingState.INITIALIZING || state == ShardRoutingState.RELOCATING;
relocatingNodeId = null;
@ -127,7 +128,7 @@ public class MutableShardRouting extends ImmutableShardRouting {
* Make the shard primary unless it's not Primary
* //TODO: doc exception
*/
public void moveToPrimary() {
void moveToPrimary() {
version++;
if (primary) {
throw new IllegalShardRoutingStateException(this, "Already primary, can't move to primary");
@ -138,7 +139,7 @@ public class MutableShardRouting extends ImmutableShardRouting {
/**
* Set the primary shard to non-primary
*/
public void moveFromPrimary() {
void moveFromPrimary() {
version++;
if (!primary) {
throw new IllegalShardRoutingStateException(this, "Not primary, can't move to replica");
@ -146,12 +147,5 @@ public class MutableShardRouting extends ImmutableShardRouting {
primary = false;
}
public void restoreFrom(RestoreSource restoreSource) {
version++;
if (!primary) {
throw new IllegalShardRoutingStateException(this, "Not primary, can't restore from snapshot to replica");
}
this.restoreSource = restoreSource;
}
}

View File

@ -84,27 +84,14 @@ public class RoutingNode implements Iterable<MutableShardRouting> {
* Add a new shard to this node
* @param shard Shard to crate on this Node
*/
public void add(MutableShardRouting shard) {
void add(MutableShardRouting shard) {
// TODO use Set with ShardIds for faster lookup.
for (MutableShardRouting shardRouting : shards) {
if (shardRouting.shardId().equals(shard.shardId())) {
throw new ElasticSearchIllegalStateException("Trying to add a shard [" + shard.shardId().index().name() + "][" + shard.shardId().id() + "] to a node [" + nodeId + "] where it already exists");
}
}
shards.add(shard);
shard.assignToNode(node.id());
}
/**
* Remove a shard from this node
* @param shardId id of the shard to remove
*/
public void removeByShardId(int shardId) {
for (Iterator<MutableShardRouting> it = shards.iterator(); it.hasNext(); ) {
MutableShardRouting shard = it.next();
if (shard.id() == shardId) {
it.remove();
}
}
}
/**

View File

@ -35,6 +35,7 @@ import java.util.*;
import static com.google.common.collect.Lists.newArrayList;
import static com.google.common.collect.Maps.newHashMap;
import static com.google.common.collect.Sets.newHashSet;
/**
* {@link RoutingNodes} represents a copy the routing information contained in
@ -54,6 +55,16 @@ public class RoutingNodes implements Iterable<RoutingNode> {
private final List<MutableShardRouting> ignoredUnassigned = newArrayList();
private final Map<ShardId, List<MutableShardRouting>> replicaSets = newHashMap();
private int unassignedPrimaryCount = 0;
private int inactivePrimaryCount = 0;
private int inactiveShardCount = 0;
Set<ShardId> relocatingReplicaSets = new HashSet<ShardId>();
private Set<ShardId> clearPostAllocationFlag;
private final Map<String, ObjectIntOpenHashMap<String>> nodesPerAttributeNames = new HashMap<String, ObjectIntOpenHashMap<String>>();
@ -62,6 +73,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
this.metaData = clusterState.metaData();
this.blocks = clusterState.blocks();
this.routingTable = clusterState.routingTable();
Map<String, List<MutableShardRouting>> nodesToShards = newHashMap();
// fill in the nodeToShards with the "live" nodes
for (ObjectCursor<DiscoveryNode> cursor : clusterState.nodes().dataNodes().values()) {
@ -69,29 +81,50 @@ public class RoutingNodes implements Iterable<RoutingNode> {
}
// fill in the inverse of node -> shards allocated
// also fill replicaSet information
for (IndexRoutingTable indexRoutingTable : routingTable.indicesRouting().values()) {
for (IndexShardRoutingTable indexShard : indexRoutingTable) {
for (ShardRouting shard : indexShard) {
// to get all the shards belonging to an index, including the replicas,
// we define a replica set and keep track of it. A replica set is identified
// by the ShardId, as this is common for primary and replicas.
// A replica Set might have one (and not more) replicas with the state of RELOCATING.
if (shard.assignedToNode()) {
List<MutableShardRouting> entries = nodesToShards.get(shard.currentNodeId());
if (entries == null) {
entries = newArrayList();
nodesToShards.put(shard.currentNodeId(), entries);
}
entries.add(new MutableShardRouting(shard));
MutableShardRouting sr = new MutableShardRouting(shard);
entries.add(sr);
addToReplicaSet(sr);
if (shard.relocating()) {
entries = nodesToShards.get(shard.relocatingNodeId());
relocatingReplicaSets.add(shard.shardId());
if (entries == null) {
entries = newArrayList();
nodesToShards.put(shard.relocatingNodeId(), entries);
}
// add the counterpart shard with relocatingNodeId reflecting the source from which
// it's relocating from.
entries.add(new MutableShardRouting(shard.index(), shard.id(), shard.relocatingNodeId(),
shard.currentNodeId(), shard.primary(), ShardRoutingState.INITIALIZING, shard.version()));
sr = new MutableShardRouting(shard.index(), shard.id(), shard.relocatingNodeId(),
shard.currentNodeId(), shard.primary(), ShardRoutingState.INITIALIZING, shard.version());
entries.add(sr);
addToReplicaSet(sr);
} else if (!shard.active()) { // shards that are initializing without being relocated
if (shard.primary()) {
inactivePrimaryCount++;
}
inactiveShardCount++;
}
} else {
unassigned.add(new MutableShardRouting(shard));
MutableShardRouting sr = new MutableShardRouting(shard);
addToReplicaSet(sr);
unassigned.add(sr);
if (shard.primary()) {
unassignedPrimaryCount++;
}
}
}
}
@ -204,18 +237,37 @@ public class RoutingNodes implements Iterable<RoutingNode> {
return nodesPerAttributesCounts;
}
public boolean hasUnassignedPrimaries() {
return unassignedPrimaryCount > 0;
}
public boolean hasUnassignedShards() {
return !unassigned.isEmpty();
}
public boolean hasInactivePrimaries() {
return inactivePrimaryCount > 0;
}
public boolean hasInactiveShards() {
return inactiveShardCount > 0;
}
public int getRelocatingShardCount() {
return relocatingReplicaSets.size();
}
public MutableShardRouting findPrimaryForReplica(ShardRouting shard) {
assert !shard.primary();
for (RoutingNode routingNode : nodesToShards.values()) {
List<MutableShardRouting> shards = routingNode.shards();
for (int i = 0; i < shards.size(); i++) {
MutableShardRouting shardRouting = shards.get(i);
if (shardRouting.shardId().equals(shard.shardId()) && shardRouting.primary()) {
return shardRouting;
}
MutableShardRouting primary = null;
for (MutableShardRouting shardRouting : shardsRoutingFor(shard)) {
if (shardRouting.primary()) {
primary = shardRouting;
break;
}
}
return null;
assert primary != null;
return primary;
}
public List<MutableShardRouting> shardsRoutingFor(ShardRouting shardRouting) {
@ -223,22 +275,10 @@ public class RoutingNodes implements Iterable<RoutingNode> {
}
public List<MutableShardRouting> shardsRoutingFor(String index, int shardId) {
List<MutableShardRouting> shards = newArrayList();
for (RoutingNode routingNode : this) {
List<MutableShardRouting> nShards = routingNode.shards();
for (int i = 0; i < nShards.size(); i++) {
MutableShardRouting shardRouting = nShards.get(i);
if (shardRouting.index().equals(index) && shardRouting.id() == shardId) {
shards.add(shardRouting);
}
}
}
for (int i = 0; i < unassigned.size(); i++) {
MutableShardRouting shardRouting = unassigned.get(i);
if (shardRouting.index().equals(index) && shardRouting.id() == shardId) {
shards.add(shardRouting);
}
}
ShardId sid = new ShardId(index, shardId);
List<MutableShardRouting> shards = replicaSetFor(sid);
assert shards != null;
// no need to check unassigned array, since the ShardRoutings are in the replica set.
return shards;
}
@ -291,4 +331,227 @@ public class RoutingNodes implements Iterable<RoutingNode> {
}
return sb.toString();
}
/**
* calculates RoutingNodes statistics by iterating over all {@link MutableShardRouting}s
* in the cluster to ensure the {@link RoutingManager} book-keeping is correct.
* For performance reasons, this should only be called from test cases.
*
* @return true if all counts are the same, false if either of the book-keeping numbers is off.
*/
public boolean assertShardStats() {
int unassignedPrimaryCount = 0;
int inactivePrimaryCount = 0;
int inactiveShardCount = 0;
int totalShards = 0;
Set<ShardId> seenShards = newHashSet();
for (RoutingNode node : this) {
for (MutableShardRouting shard : node) {
if (!shard.active()) {
if (!shard.relocating()) {
inactiveShardCount++;
if (shard.primary()){
inactivePrimaryCount++;
}
}
}
totalShards++;
seenShards.add(shard.shardId());
}
}
for (MutableShardRouting shard : unassigned) {
if (shard.primary()) {
unassignedPrimaryCount++;
}
totalShards++;
seenShards.add(shard.shardId());
}
for (ShardId shardId : seenShards) {
assert replicaSetFor(shardId) != null;
}
assert unassignedPrimaryCount == 0 || hasUnassignedPrimaries();
assert inactivePrimaryCount == 0 || hasInactivePrimaries();
assert inactiveShardCount == 0 || hasInactiveShards();
assert hasUnassignedPrimaries() || unassignedPrimaryCount == 0;
assert hasInactivePrimaries() || inactivePrimaryCount == 0;
assert hasInactiveShards() || inactiveShardCount == 0;
return true;
}
/**
* Assign a shard to a node. This will increment the inactiveShardCount counter
* and the inactivePrimaryCount counter if the shard is the primary.
* In case the shard is already assigned and started, it will be marked as
* relocating, which is accounted for, too, so the number of concurrent relocations
* can be retrieved easily.
* This method can be called several times for the same shard, only the first time
* will change the state.
*
* INITIALIZING => INITIALIZING
* UNASSIGNED => INITIALIZING
* STARTED => RELOCATING
* RELOCATING => RELOCATING
*
* @param shard the shard to be assigned
* @param nodeId the nodeId this shard should initialize on or relocate from
*/
public void assignShardToNode(MutableShardRouting shard, String nodeId) {
// state will not change if the shard is already initializing.
ShardRoutingState oldState = shard.state();
shard.assignToNode(nodeId);
node(nodeId).add(shard);
if (oldState == ShardRoutingState.UNASSIGNED) {
inactiveShardCount++;
if (shard.primary()) {
unassignedPrimaryCount--;
inactivePrimaryCount++;
}
}
if (shard.state() == ShardRoutingState.RELOCATING) {
// this a HashSet. double add no worry.
relocatingReplicaSets.add(shard.shardId());
}
// possibly double/triple adding it to a replica set doesn't matter
// but make sure we know about the shard.
addToReplicaSet(shard);
}
/**
* Relocate a shard to another node.
*
* STARTED => RELOCATING
*
* @param shard the shard to relocate
* @param nodeId the node to relocate to
*/
public void relocateShard(MutableShardRouting shard, String nodeId) {
relocatingReplicaSets.add(shard.shardId());
shard.relocate(nodeId);
}
/**
* Cancels the relocation of a shard.
*
* RELOCATING => STARTED
*
* @param shard the shard that was relocating previously and now should be started again.
*/
public void cancelRelocationForShard(MutableShardRouting shard) {
relocatingReplicaSets.remove(shard.shardId());
shard.cancelRelocation();
}
/**
* Unassigns shard from a node.
* Both relocating and started shards that are deallocated need a new
* primary elected.
*
* RELOCATING => null
* STARTED => null
* INITIALIZING => null
*
* @param shard the shard to be unassigned.
*/
public void deassignShard(MutableShardRouting shard) {
if (shard.state() == ShardRoutingState.RELOCATING) {
cancelRelocationForShard(shard);
}
if (shard.primary())
unassignedPrimaryCount++;
shard.deassignNode();
}
/**
* Mark a shard as started.
* Decreases the counters and marks a replication complete or failed,
* which is the same for accounting in this class.
*
* INITIALIZING => STARTED
* RELOCATIng => STARTED
*
* @param shard the shard to be marked as started
*/
public void markShardStarted(MutableShardRouting shard) {
if (!relocatingReplicaSets.contains(shard.shardId()) && shard.state() == ShardRoutingState.INITIALIZING) {
inactiveShardCount--;
if (shard.primary()) {
inactivePrimaryCount--;
}
}
if (shard.state() == ShardRoutingState.INITIALIZING
&& shard.relocatingNodeId() != null) {
relocatingReplicaSets.remove(shard.shardId());
}
shard.moveToStarted();
}
/**
* Return a list of shards belonging to a replica set
*
* @param shard the shard for which to retrieve the replica set
* @return an unmodifiable List of the replica set
*/
public List<MutableShardRouting> replicaSetFor(MutableShardRouting shard) {
return replicaSetFor(shard.shardId());
}
/**
* Return a list of shards belonging to a replica set
*
* @param shardId the {@link ShardId} for which to retrieve the replica set
* @return an unmodifiable List of the replica set
*/
public List<MutableShardRouting> replicaSetFor(ShardId shardId) {
List<MutableShardRouting> replicaSet = replicaSets.get(shardId);
assert replicaSet != null;
return Collections.unmodifiableList(replicaSet);
}
/**
* Let this class know about a shard, which it then sorts into
* its replica set. Package private as only {@link RoutingNodes}
* should notify this class of shards during initialization.
*
* @param shard the shard to be sorted into its replica set
*/
private void addToReplicaSet(MutableShardRouting shard) {
List<MutableShardRouting> replicaSet = replicaSets.get(shard.shardId());
if (replicaSet == null) {
replicaSet = new ArrayList<MutableShardRouting>();
replicaSets.put(shard.shardId(), replicaSet);
}
replicaSet.add(shard);
}
/**
* marks a replica set as relocating.
*
* @param shard a member of the relocating replica set
*/
private void markRelocating(MutableShardRouting shard) {
relocatingReplicaSets.add(shard.shardId());
}
/**
* swaps the status of a shard, making replicas primary and vice versa.
*
* @param shard the shard to have its primary status swapped.
*/
public void changePrimaryStatusForShard(MutableShardRouting... shards) {
for (MutableShardRouting shard : shards) {
if (shard.primary()) {
shard.moveFromPrimary();
} else {
shard.moveToPrimary();
}
}
}
}

View File

@ -211,7 +211,6 @@ public class AllocationService extends AbstractComponent {
// rebalance
changed |= shardsAllocators.rebalance(allocation);
return changed;
}
@ -222,9 +221,10 @@ public class AllocationService extends AbstractComponent {
List<MutableShardRouting> shards = new ArrayList<MutableShardRouting>();
int index = 0;
boolean found = true;
RoutingNodes routingNodes = allocation.routingNodes();
while (found) {
found = false;
for (RoutingNode routingNode : allocation.routingNodes()) {
for (RoutingNode routingNode : routingNodes) {
if (index >= routingNode.shards().size()) {
continue;
}
@ -239,7 +239,7 @@ public class AllocationService extends AbstractComponent {
if (!shardRouting.started()) {
continue;
}
RoutingNode routingNode = allocation.routingNodes().node(shardRouting.currentNodeId());
RoutingNode routingNode = routingNodes.node(shardRouting.currentNodeId());
Decision decision = allocation.deciders().canRemain(shardRouting, routingNode, allocation);
if (decision.type() == Decision.Type.NO) {
logger.debug("[{}][{}] allocated on [{}], but can no longer be allocated on it, moving...", shardRouting.index(), shardRouting.id(), routingNode.node());
@ -269,8 +269,7 @@ public class AllocationService extends AbstractComponent {
assert !shardEntry2.primary();
changed = true;
shardEntry.moveFromPrimary();
shardEntry2.moveToPrimary();
routingNodes.changePrimaryStatusForShard( shardEntry, shardEntry2 );
if (shardEntry2.relocatingNodeId() != null) {
// its also relocating, make sure to move the other routing to primary
@ -278,7 +277,7 @@ public class AllocationService extends AbstractComponent {
if (node != null) {
for (MutableShardRouting shardRouting : node) {
if (shardRouting.shardId().equals(shardEntry2.shardId()) && !shardRouting.primary()) {
shardRouting.moveToPrimary();
routingNodes.changePrimaryStatusForShard( shardRouting );
break;
}
}
@ -327,11 +326,12 @@ public class AllocationService extends AbstractComponent {
* new nodes);
*/
private void applyNewNodes(RoutingAllocation allocation) {
RoutingNodes routingNodes = allocation.routingNodes();
for (ObjectCursor<DiscoveryNode> cursor : allocation.nodes().dataNodes().values()) {
DiscoveryNode node = cursor.value;
if (!allocation.routingNodes().nodesToShards().containsKey(node.id())) {
if (!routingNodes.nodesToShards().containsKey(node.id())) {
RoutingNode routingNode = new RoutingNode(node.id(), node);
allocation.routingNodes().nodesToShards().put(node.id(), routingNode);
routingNodes.nodesToShards().put(node.id(), routingNode);
}
}
}
@ -362,7 +362,7 @@ public class AllocationService extends AbstractComponent {
for (ShardRouting startedShard : startedShardEntries) {
assert startedShard.state() == INITIALIZING;
// retrieve the relocating node id before calling moveToStarted().
// retrieve the relocating node id before calling startedShard().
String relocatingNodeId = null;
RoutingNode currentRoutingNode = routingNodes.nodesToShards().get(startedShard.currentNodeId());
@ -372,7 +372,7 @@ public class AllocationService extends AbstractComponent {
relocatingNodeId = shard.relocatingNodeId();
if (!shard.started()) {
dirty = true;
shard.moveToStarted();
routingNodes.markShardStarted( shard );
}
break;
}
@ -419,20 +419,23 @@ public class AllocationService extends AbstractComponent {
return false;
}
RoutingNodes routingNodes = allocation.routingNodes();
if (failedShard.relocatingNodeId() != null) {
// the shard is relocating, either in initializing (recovery from another node) or relocating (moving to another node)
if (failedShard.state() == INITIALIZING) {
// the shard is initializing and recovering from another node
boolean dirty = false;
// first, we need to cancel the current node that is being initialized
RoutingNode initializingNode = allocation.routingNodes().node(failedShard.currentNodeId());
RoutingNode initializingNode = routingNodes.node(failedShard.currentNodeId());
if (initializingNode != null) {
for (Iterator<MutableShardRouting> it = initializingNode.iterator(); it.hasNext(); ) {
MutableShardRouting shardRouting = it.next();
if (shardRouting.equals(failedShard)) {
dirty = true;
it.remove();
shardRouting.deassignNode();
routingNodes.deassignShard( shardRouting );
if (addToIgnoreList) {
// make sure we ignore this shard on the relevant node
@ -445,13 +448,13 @@ public class AllocationService extends AbstractComponent {
}
if (dirty) {
// now, find the node that we are relocating *from*, and cancel its relocation
RoutingNode relocatingFromNode = allocation.routingNodes().node(failedShard.relocatingNodeId());
RoutingNode relocatingFromNode = routingNodes.node(failedShard.relocatingNodeId());
if (relocatingFromNode != null) {
for (Iterator<MutableShardRouting> it = relocatingFromNode.iterator(); it.hasNext(); ) {
MutableShardRouting shardRouting = it.next();
if (shardRouting.shardId().equals(failedShard.shardId()) && shardRouting.state() == RELOCATING) {
dirty = true;
shardRouting.cancelRelocation();
routingNodes.cancelRelocationForShard( shardRouting );
break;
}
}
@ -464,20 +467,20 @@ public class AllocationService extends AbstractComponent {
// first, we need to cancel the current relocation from the current node
// now, find the node that we are recovering from, cancel the relocation, remove it from the node
// and add it to the unassigned shards list...
RoutingNode relocatingFromNode = allocation.routingNodes().node(failedShard.currentNodeId());
RoutingNode relocatingFromNode = routingNodes.node(failedShard.currentNodeId());
if (relocatingFromNode != null) {
for (Iterator<MutableShardRouting> it = relocatingFromNode.iterator(); it.hasNext(); ) {
MutableShardRouting shardRouting = it.next();
if (shardRouting.equals(failedShard)) {
dirty = true;
shardRouting.cancelRelocation();
routingNodes.cancelRelocationForShard( shardRouting );
it.remove();
if (addToIgnoreList) {
// make sure we ignore this shard on the relevant node
allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId());
}
allocation.routingNodes().unassigned().add(new MutableShardRouting(failedShard.index(), failedShard.id(),
routingNodes.unassigned().add(new MutableShardRouting(failedShard.index(), failedShard.id(),
null, failedShard.primary(), ShardRoutingState.UNASSIGNED, failedShard.version() + 1));
break;
}
@ -485,13 +488,13 @@ public class AllocationService extends AbstractComponent {
}
if (dirty) {
// next, we need to find the target initializing shard that is recovering from, and remove it...
RoutingNode initializingNode = allocation.routingNodes().node(failedShard.relocatingNodeId());
RoutingNode initializingNode = routingNodes.node(failedShard.relocatingNodeId());
if (initializingNode != null) {
for (Iterator<MutableShardRouting> it = initializingNode.iterator(); it.hasNext(); ) {
MutableShardRouting shardRouting = it.next();
if (shardRouting.shardId().equals(failedShard.shardId()) && shardRouting.state() == INITIALIZING) {
dirty = true;
shardRouting.deassignNode();
routingNodes.deassignShard( shardRouting );
it.remove();
}
}
@ -504,7 +507,7 @@ public class AllocationService extends AbstractComponent {
} else {
// the shard is not relocating, its either started, or initializing, just cancel it and move on...
boolean dirty = false;
RoutingNode node = allocation.routingNodes().node(failedShard.currentNodeId());
RoutingNode node = routingNodes.node(failedShard.currentNodeId());
if (node != null) {
for (Iterator<MutableShardRouting> it = node.iterator(); it.hasNext(); ) {
MutableShardRouting shardRouting = it.next();
@ -522,7 +525,7 @@ public class AllocationService extends AbstractComponent {
// that can keep other shards from being allocated (because of limits applied on how many
// shards we can start per node)
List<MutableShardRouting> shardsToMove = Lists.newArrayList();
for (Iterator<MutableShardRouting> unassignedIt = allocation.routingNodes().unassigned().iterator(); unassignedIt.hasNext(); ) {
for (Iterator<MutableShardRouting> unassignedIt = routingNodes.unassigned().iterator(); unassignedIt.hasNext(); ) {
MutableShardRouting unassignedShardRouting = unassignedIt.next();
if (unassignedShardRouting.shardId().equals(failedShard.shardId())) {
unassignedIt.remove();
@ -530,10 +533,10 @@ public class AllocationService extends AbstractComponent {
}
}
if (!shardsToMove.isEmpty()) {
allocation.routingNodes().unassigned().addAll(shardsToMove);
routingNodes.unassigned().addAll(shardsToMove);
}
allocation.routingNodes().unassigned().add(new MutableShardRouting(failedShard.index(), failedShard.id(), null,
routingNodes.unassigned().add(new MutableShardRouting(failedShard.index(), failedShard.id(), null,
null, failedShard.restoreSource(), failedShard.primary(), ShardRoutingState.UNASSIGNED, failedShard.version() + 1));
break;

View File

@ -512,7 +512,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
*
* @return <code>true</code> iff the shard has successfully been moved.
*/
public boolean move(MutableShardRouting shard, RoutingNode node) {
public boolean move(MutableShardRouting shard, RoutingNode node ) {
if (nodes.isEmpty() || !shard.started()) {
/* with no nodes or a not started shard this is pointless */
return false;
@ -535,7 +535,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
* This is not guaranteed to be balanced after this operation we still try best effort to
* allocate on the minimal eligible node.
*/
for (ModelNode currentNode : nodes) {
if (currentNode.getNodeId().equals(node.nodeId())) {
continue;
@ -547,8 +547,8 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
final MutableShardRouting initializingShard = new MutableShardRouting(shard.index(), shard.id(), currentNode.getNodeId(),
shard.currentNodeId(), shard.restoreSource(), shard.primary(), INITIALIZING, shard.version() + 1);
currentNode.addShard(initializingShard, decision);
target.add(initializingShard);
shard.relocate(target.nodeId()); // set the node to relocate after we added the initializing shard
allocation.routingNodes().assignShardToNode( initializingShard, target.nodeId() );
allocation.routingNodes().relocateShard( shard, target.nodeId() ); // set the node to relocate after we added the initializing shard
if (logger.isTraceEnabled()) {
logger.trace("Moved shard [{}] to node [{}]", shard, currentNode.getNodeId());
}
@ -704,7 +704,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
if (logger.isTraceEnabled()) {
logger.trace("Assigned shard [{}] to [{}]", shard, minNode.getNodeId());
}
routingNodes.node(minNode.getNodeId()).add(shard);
routingNodes.assignShardToNode( shard, routingNodes.node(minNode.getNodeId()).nodeId() );
changed = true;
continue; // don't add to ignoreUnassigned
}
@ -782,13 +782,13 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
/* now allocate on the cluster - if we are started we need to relocate the shard */
if (candidate.started()) {
RoutingNode lowRoutingNode = allocation.routingNodes().node(minNode.getNodeId());
lowRoutingNode.add(new MutableShardRouting(candidate.index(), candidate.id(), lowRoutingNode.nodeId(), candidate
.currentNodeId(), candidate.restoreSource(), candidate.primary(), INITIALIZING, candidate.version() + 1));
candidate.relocate(lowRoutingNode.nodeId());
allocation.routingNodes().assignShardToNode(new MutableShardRouting(candidate.index(), candidate.id(), lowRoutingNode.nodeId(), candidate
.currentNodeId(), candidate.restoreSource(), candidate.primary(), INITIALIZING, candidate.version() + 1), lowRoutingNode.nodeId() );
allocation.routingNodes().relocateShard( candidate, lowRoutingNode.nodeId());
} else {
assert candidate.unassigned();
allocation.routingNodes().node(minNode.getNodeId()).add(candidate);
allocation.routingNodes().assignShardToNode( candidate, allocation.routingNodes().node(minNode.getNodeId()).nodeId() );
}
return true;

View File

@ -108,7 +108,7 @@ public class EvenShardsCountAllocator extends AbstractComponent implements Shard
}
changed = true;
node.add(shard);
allocation.routingNodes().assignShardToNode( shard, node.nodeId() );
unassignedIterator.remove();
break;
}
@ -123,7 +123,7 @@ public class EvenShardsCountAllocator extends AbstractComponent implements Shard
Decision decision = allocation.deciders().canAllocate(shard, routingNode, allocation);
if (decision.type() == Decision.Type.YES) {
changed = true;
routingNode.add(shard);
allocation.routingNodes().assignShardToNode( shard, routingNode.nodeId() );
it.remove();
break;
}
@ -173,11 +173,11 @@ public class EvenShardsCountAllocator extends AbstractComponent implements Shard
Decision allocateDecision = allocation.deciders().canAllocate(startedShard, lowRoutingNode, allocation);
if (allocateDecision.type() == Decision.Type.YES) {
changed = true;
lowRoutingNode.add(new MutableShardRouting(startedShard.index(), startedShard.id(),
allocation.routingNodes().assignShardToNode(new MutableShardRouting(startedShard.index(), startedShard.id(),
lowRoutingNode.nodeId(), startedShard.currentNodeId(), startedShard.restoreSource(),
startedShard.primary(), INITIALIZING, startedShard.version() + 1));
startedShard.primary(), INITIALIZING, startedShard.version() + 1), lowRoutingNode.nodeId() );
startedShard.relocate(lowRoutingNode.nodeId());
allocation.routingNodes().relocateShard( startedShard, lowRoutingNode.nodeId() );
relocated = true;
relocationPerformed = true;
break;
@ -210,11 +210,11 @@ public class EvenShardsCountAllocator extends AbstractComponent implements Shard
}
Decision decision = allocation.deciders().canAllocate(shardRouting, nodeToCheck, allocation);
if (decision.type() == Decision.Type.YES) {
nodeToCheck.add(new MutableShardRouting(shardRouting.index(), shardRouting.id(),
allocation.routingNodes().assignShardToNode(new MutableShardRouting(shardRouting.index(), shardRouting.id(),
nodeToCheck.nodeId(), shardRouting.currentNodeId(), shardRouting.restoreSource(),
shardRouting.primary(), INITIALIZING, shardRouting.version() + 1));
shardRouting.primary(), INITIALIZING, shardRouting.version() + 1), nodeToCheck.nodeId() );
shardRouting.relocate(nodeToCheck.nodeId());
allocation.routingNodes().relocateShard( shardRouting, nodeToCheck.nodeId() );
changed = true;
break;
}

View File

@ -193,7 +193,7 @@ public class AllocateAllocationCommand implements AllocationCommand {
continue;
}
it.remove();
routingNode.add(shardRouting);
allocation.routingNodes().assignShardToNode( shardRouting, routingNode.nodeId() );
if (shardRouting.primary()) {
// we need to clear the post allocation flag, since its an explicit allocation of the primary shard
// and we want to force allocate it (and create a new index for it)

View File

@ -173,13 +173,13 @@ public class CancelAllocationCommand implements AllocationCommand {
if (shardRouting.initializing()) {
// the shard is initializing and recovering from another node, simply cancel the recovery
it.remove();
shardRouting.deassignNode();
allocation.routingNodes().deassignShard( shardRouting );
// and cancel the relocating state from the shard its being relocated from
RoutingNode relocatingFromNode = allocation.routingNodes().node(shardRouting.relocatingNodeId());
if (relocatingFromNode != null) {
for (MutableShardRouting fromShardRouting : relocatingFromNode) {
if (fromShardRouting.shardId().equals(shardRouting.shardId()) && shardRouting.state() == RELOCATING) {
fromShardRouting.cancelRelocation();
allocation.routingNodes().cancelRelocationForShard( fromShardRouting );
break;
}
}
@ -200,7 +200,7 @@ public class CancelAllocationCommand implements AllocationCommand {
for (Iterator<MutableShardRouting> itX = initializingNode.iterator(); itX.hasNext(); ) {
MutableShardRouting initializingShardRouting = itX.next();
if (initializingShardRouting.shardId().equals(shardRouting.shardId()) && initializingShardRouting.state() == INITIALIZING) {
shardRouting.deassignNode();
allocation.routingNodes().deassignShard( shardRouting );
itX.remove();
}
}

View File

@ -167,11 +167,11 @@ public class MoveAllocationCommand implements AllocationCommand {
// its being throttled, maybe have a flag to take it into account and fail? for now, just do it since the "user" wants it...
}
toRoutingNode.add(new MutableShardRouting(shardRouting.index(), shardRouting.id(),
allocation.routingNodes().assignShardToNode(new MutableShardRouting(shardRouting.index(), shardRouting.id(),
toRoutingNode.nodeId(), shardRouting.currentNodeId(), shardRouting.restoreSource(),
shardRouting.primary(), ShardRoutingState.INITIALIZING, shardRouting.version() + 1));
shardRouting.primary(), ShardRoutingState.INITIALIZING, shardRouting.version() + 1), toRoutingNode.nodeId() );
shardRouting.relocate(toRoutingNode.nodeId());
allocation.routingNodes().relocateShard( shardRouting, toRoutingNode.nodeId() );
}
if (!found) {

View File

@ -21,6 +21,7 @@ package org.elasticsearch.cluster.routing.allocation.decider;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.inject.Inject;
@ -88,34 +89,26 @@ public class ClusterRebalanceAllocationDecider extends AllocationDecider {
@Override
public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
if (type == ClusterRebalanceType.INDICES_PRIMARIES_ACTIVE) {
for (MutableShardRouting shard : allocation.routingNodes().unassigned()) {
if (shard.primary()) {
return Decision.NO;
}
// check if there are unassigned primaries.
if ( allocation.routingNodes().hasUnassignedPrimaries() ) {
return Decision.NO;
}
for (RoutingNode node : allocation.routingNodes()) {
List<MutableShardRouting> shards = node.shards();
for (int i = 0; i < shards.size(); i++) {
MutableShardRouting shard = shards.get(i);
if (shard.primary() && !shard.active() && shard.relocatingNodeId() == null) {
return Decision.NO;
}
}
// check if there are initializing primaries that don't have a relocatingNodeId entry.
if ( allocation.routingNodes().hasInactivePrimaries() ) {
return Decision.NO;
}
return Decision.YES;
}
if (type == ClusterRebalanceType.INDICES_ALL_ACTIVE) {
if (!allocation.routingNodes().unassigned().isEmpty()) {
// check if there are unassigned shards.
if ( allocation.routingNodes().hasUnassignedShards() ) {
return Decision.NO;
}
for (RoutingNode node : allocation.routingNodes()) {
List<MutableShardRouting> shards = node.shards();
for (int i = 0; i < shards.size(); i++) {
MutableShardRouting shard = shards.get(i);
if (!shard.active() && shard.relocatingNodeId() == null) {
return Decision.NO;
}
}
// in case all indices are assigned, are there initializing shards which
// are not relocating?
if ( allocation.routingNodes().hasInactiveShards() ) {
return Decision.NO;
}
}
// type == Type.ALWAYS

View File

@ -21,6 +21,7 @@ package org.elasticsearch.cluster.routing.allocation.decider;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
@ -72,18 +73,10 @@ public class ConcurrentRebalanceAllocationDecider extends AllocationDecider {
if (clusterConcurrentRebalance == -1) {
return Decision.YES;
}
int rebalance = 0;
for (RoutingNode node : allocation.routingNodes()) {
List<MutableShardRouting> shards = node.shards();
for (int i = 0; i < shards.size(); i++) {
if (shards.get(i).state() == ShardRoutingState.RELOCATING) {
rebalance++;
}
}
}
int rebalance = allocation.routingNodes().getRelocatingShardCount();
if (rebalance >= clusterConcurrentRebalance) {
return Decision.NO;
}
return Decision.YES;
}
}
}

View File

@ -252,7 +252,7 @@ public class BlobReuseExistingGatewayAllocator extends AbstractComponent impleme
}
// we found a match
changed = true;
lastNodeMatched.add(shard);
allocation.routingNodes().assignShardToNode( shard, lastNodeMatched.nodeId() );
unassignedIterator.remove();
}
}

View File

@ -214,7 +214,7 @@ public class LocalGatewayAllocator extends AbstractComponent implements GatewayA
// we found a match
changed = true;
// make sure we create one with the version from the recovered state
node.add(new MutableShardRouting(shard, highestVersion));
allocation.routingNodes().assignShardToNode(new MutableShardRouting(shard, highestVersion), node.nodeId());
unassignedIterator.remove();
// found a node, so no throttling, no "no", and break out of the loop
@ -234,7 +234,7 @@ public class LocalGatewayAllocator extends AbstractComponent implements GatewayA
// we found a match
changed = true;
// make sure we create one with the version from the recovered state
node.add(new MutableShardRouting(shard, highestVersion));
allocation.routingNodes().assignShardToNode(new MutableShardRouting(shard, highestVersion), node.nodeId());
unassignedIterator.remove();
}
} else {
@ -351,7 +351,7 @@ public class LocalGatewayAllocator extends AbstractComponent implements GatewayA
}
// we found a match
changed = true;
lastNodeMatched.add(shard);
allocation.routingNodes().assignShardToNode( shard, lastNodeMatched.nodeId() );
unassignedIterator.remove();
}
}

View File

@ -0,0 +1,174 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.benchmark.cluster;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.allocation.RoutingAllocationTests.newNode;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
public class MassiveClusterRebalanceBenchmark {
private static final ESLogger logger = Loggers.getLogger(MassiveClusterRebalanceBenchmark.class);
public static void main(String[] args) {
int numIndices = 5 * 365; // five years
int numShards = 6;
int numReplicas = 2;
AllocationService strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString())
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 16384)
.put("cluster.routing.allocation.node_concurrent_recoveries", 16384)
.build());
long start = System.currentTimeMillis();
logger.info("Start massive cluster test.");
MetaData.Builder mb = MetaData.builder();
for (int i = 1; i <= numIndices; i++)
mb.put(IndexMetaData.builder("test_" + i).numberOfShards(numShards).numberOfReplicas(numReplicas));
MetaData metaData = mb.build();
logger.info("Buidling MetaData took " + (System.currentTimeMillis() - start) + "ms.");
start = System.currentTimeMillis();
RoutingTable.Builder rb = RoutingTable.builder();
for (int i = 1; i <= numIndices; i++)
rb.addAsNew(metaData.index("test_" + i));
RoutingTable routingTable = rb.build();
logger.info("Buidling RoutingTable took " + (System.currentTimeMillis() - start) + "ms.");
start = System.currentTimeMillis();
ClusterState clusterState = ClusterState.builder().metaData(metaData).routingTable(routingTable).build();
logger.info("Buidling ClusterState took " + (System.currentTimeMillis() - start) + "ms.");
start = System.currentTimeMillis();
logger.info("start two nodes");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build();
logger.info("Buidling ClusterState took " + (System.currentTimeMillis() - start) + "ms.");
start = System.currentTimeMillis();
RoutingTable prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
logger.info("Buidling new RoutingTable took " + (System.currentTimeMillis() - start) + "ms.");
start = System.currentTimeMillis();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
logger.info("Buidling new ClusterState took " + (System.currentTimeMillis() - start) + "ms.");
start = System.currentTimeMillis();
logger.info("start all the primary shards for test1, replicas will start initializing");
RoutingNodes routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState("test_1", INITIALIZING)).routingTable();
logger.info("Buidling new RoutingTable took " + (System.currentTimeMillis() - start) + "ms.");
start = System.currentTimeMillis();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
logger.info("Buidling new ClusterState took " + (System.currentTimeMillis() - start) + "ms.");
start = System.currentTimeMillis();
routingNodes = clusterState.routingNodes();
logger.info("start the test1 replica shards");
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState("test_1", INITIALIZING)).routingTable();
logger.info("Buidling new RoutingTable took " + (System.currentTimeMillis() - start) + "ms.");
start = System.currentTimeMillis();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
logger.info("Buidling new ClusterState took " + (System.currentTimeMillis() - start) + "ms.");
start = System.currentTimeMillis();
routingNodes = clusterState.routingNodes();
logger.info("now, start 1 more node, check that rebalancing will happen (for test1) because we set it to always");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())
.put(newNode("node3")))
.build();
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
logger.info("Buidling new RoutingTable took " + (System.currentTimeMillis() - start) + "ms.");
start = System.currentTimeMillis();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
logger.info("Buidling new ClusterState took " + (System.currentTimeMillis() - start) + "ms.");
start = System.currentTimeMillis();
routingNodes = clusterState.routingNodes();
logger.info("now, start 33 more node, check that rebalancing will happen (for test1) because we set it to always");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())
.put(newNode("node4"))
.put(newNode("node5"))
.put(newNode("node6"))
.put(newNode("node7"))
.put(newNode("node8"))
.put(newNode("node9"))
.put(newNode("node10"))
.put(newNode("node11"))
.put(newNode("node12"))
.put(newNode("node13"))
.put(newNode("node15"))
.put(newNode("node16"))
.put(newNode("node17"))
.put(newNode("node18"))
.put(newNode("node19"))
.put(newNode("node20"))
.put(newNode("node21"))
.put(newNode("node22"))
.put(newNode("node23"))
.put(newNode("node24"))
.put(newNode("node25"))
.put(newNode("node26"))
.put(newNode("node27"))
.put(newNode("node28"))
.put(newNode("node29"))
.put(newNode("node30"))
.put(newNode("node31"))
.put(newNode("node32"))
.put(newNode("node33"))
.put(newNode("node34"))
.put(newNode("node35"))
.put(newNode("node36")))
.build();
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
logger.info("Buidling new RoutingTable took " + (System.currentTimeMillis() - start) + "ms.");
start = System.currentTimeMillis();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
logger.info("Buidling new ClusterState took " + (System.currentTimeMillis() - start) + "ms.");
start = System.currentTimeMillis();
routingNodes = clusterState.routingNodes();
}
}

View File

@ -409,37 +409,37 @@ public class BalanceConfigurationTests extends ElasticsearchTestCase {
switch (sr.id()) {
case 0:
if (sr.primary()) {
allocation.routingNodes().node("node1").add(sr);
allocation.routingNodes().assignShardToNode( sr, "node1" );
} else {
allocation.routingNodes().node("node0").add(sr);
allocation.routingNodes().assignShardToNode( sr, "node0" );
}
break;
case 1:
if (sr.primary()) {
allocation.routingNodes().node("node1").add(sr);
allocation.routingNodes().assignShardToNode( sr, "node1" );
} else {
allocation.routingNodes().node("node2").add(sr);
allocation.routingNodes().assignShardToNode( sr, "node2" );
}
break;
case 2:
if (sr.primary()) {
allocation.routingNodes().node("node3").add(sr);
allocation.routingNodes().assignShardToNode( sr, "node3" );
} else {
allocation.routingNodes().node("node2").add(sr);
allocation.routingNodes().assignShardToNode( sr, "node2" );
}
break;
case 3:
if (sr.primary()) {
allocation.routingNodes().node("node3").add(sr);
allocation.routingNodes().assignShardToNode( sr, "node3" );
} else {
allocation.routingNodes().node("node1").add(sr);
allocation.routingNodes().assignShardToNode( sr, "node1" );
}
break;
case 4:
if (sr.primary()) {
allocation.routingNodes().node("node2").add(sr);
allocation.routingNodes().assignShardToNode( sr, "node2" );
} else {
allocation.routingNodes().node("node0").add(sr);
allocation.routingNodes().assignShardToNode( sr, "node0" );
}
break;
}
@ -511,4 +511,4 @@ public class BalanceConfigurationTests extends ElasticsearchTestCase {
}
}
}

View File

@ -0,0 +1,416 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.elasticsearch.cluster.routing.allocation.RoutingAllocationTests.newNode;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
/**
*
*/
public class RoutingNodesIntegrityTests extends ElasticsearchTestCase {
private final ESLogger logger = Loggers.getLogger(IndexBalanceTests.class);
@Test
public void testBalanceAllNodesStarted() {
AllocationService strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1).build());
logger.info("Building initial routing table");
MetaData metaData = MetaData.builder().put(IndexMetaData.builder("test").numberOfShards(3).numberOfReplicas(1))
.put(IndexMetaData.builder("test1").numberOfShards(3).numberOfReplicas(1)).build();
RoutingTable routingTable = RoutingTable.builder().addAsNew(metaData.index("test")).addAsNew(metaData.index("test1")).build();
ClusterState clusterState = ClusterState.builder().metaData(metaData).routingTable(routingTable).build();
RoutingNodes routingNodes = clusterState.routingNodes();
logger.info("Adding three node and performing rerouting");
clusterState = ClusterState.builder(clusterState)
.nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2")).put(newNode("node3"))).build();
routingNodes = clusterState.routingNodes();
assertThat(assertShardStats(routingNodes), equalTo(true));
// all shards are unassigned. so no inactive shards or primaries.
assertThat(routingNodes.hasInactiveShards(), equalTo(false));
assertThat(routingNodes.hasInactivePrimaries(), equalTo(false));
assertThat(routingNodes.hasUnassignedPrimaries(), equalTo(true));
RoutingTable prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
assertThat(assertShardStats(routingNodes), equalTo(true));
assertThat(routingNodes.hasInactiveShards(), equalTo(true));
assertThat(routingNodes.hasInactivePrimaries(), equalTo(true));
assertThat(routingNodes.hasUnassignedPrimaries(), equalTo(false));
logger.info("Another round of rebalancing");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())).build();
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
logger.info("Reroute, nothing should change");
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
logger.info("Start the more shards");
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
assertThat(assertShardStats(routingNodes), equalTo(true));
assertThat(routingNodes.hasInactiveShards(), equalTo(false));
assertThat(routingNodes.hasInactivePrimaries(), equalTo(false));
assertThat(routingNodes.hasUnassignedPrimaries(), equalTo(false));
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
}
@Test
public void testBalanceIncrementallyStartNodes() {
AllocationService strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1).build());
logger.info("Building initial routing table");
MetaData metaData = MetaData.builder().put(IndexMetaData.builder("test").numberOfShards(3).numberOfReplicas(1))
.put(IndexMetaData.builder("test1").numberOfShards(3).numberOfReplicas(1)).build();
RoutingTable routingTable = RoutingTable.builder().addAsNew(metaData.index("test")).addAsNew(metaData.index("test1")).build();
ClusterState clusterState = ClusterState.builder().metaData(metaData).routingTable(routingTable).build();
logger.info("Adding one node and performing rerouting");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1"))).build();
RoutingTable prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
logger.info("Add another node and perform rerouting, nothing will happen since primary not started");
clusterState = ClusterState.builder(clusterState)
.nodes(DiscoveryNodes.builder(clusterState.nodes()).put(newNode("node2"))).build();
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
logger.info("Start the primary shard");
RoutingNodes routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
logger.info("Reroute, nothing should change");
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
logger.info("Start the backup shard");
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
logger.info("Add another node and perform rerouting, nothing will happen since primary not started");
clusterState = ClusterState.builder(clusterState)
.nodes(DiscoveryNodes.builder(clusterState.nodes()).put(newNode("node3"))).build();
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
logger.info("Reroute, nothing should change");
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
logger.info("Start the backup shard");
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(3));
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test1").shards().size(), equalTo(3));
assertThat(routingNodes.node("node1").numberOfShardsWithState(STARTED), equalTo(4));
assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED), equalTo(4));
assertThat(routingNodes.node("node3").numberOfShardsWithState(STARTED), equalTo(4));
assertThat(routingNodes.node("node1").shardsWithState("test", STARTED).size(), equalTo(2));
assertThat(routingNodes.node("node2").shardsWithState("test", STARTED).size(), equalTo(2));
assertThat(routingNodes.node("node3").shardsWithState("test", STARTED).size(), equalTo(2));
assertThat(routingNodes.node("node1").shardsWithState("test1", STARTED).size(), equalTo(2));
assertThat(routingNodes.node("node2").shardsWithState("test1", STARTED).size(), equalTo(2));
assertThat(routingNodes.node("node3").shardsWithState("test1", STARTED).size(), equalTo(2));
}
@Test
public void testBalanceAllNodesStartedAddIndex() {
AllocationService strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.node_concurrent_recoveries", 1)
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 3)
.put("cluster.routing.allocation.allow_rebalance", "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1).build());
logger.info("Building initial routing table");
MetaData metaData = MetaData.builder().put(IndexMetaData.builder("test").numberOfShards(3).numberOfReplicas(1)).build();
RoutingTable routingTable = RoutingTable.builder().addAsNew(metaData.index("test")).build();
ClusterState clusterState = ClusterState.builder().metaData(metaData).routingTable(routingTable).build();
logger.info("Adding three node and performing rerouting");
clusterState = ClusterState.builder(clusterState)
.nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2")).put(newNode("node3"))).build();
RoutingNodes routingNodes = clusterState.routingNodes();
assertThat(assertShardStats(routingNodes), equalTo(true));
assertThat(routingNodes.hasInactiveShards(), equalTo(false));
assertThat(routingNodes.hasInactivePrimaries(), equalTo(false));
assertThat(routingNodes.hasUnassignedPrimaries(), equalTo(true));
RoutingTable prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
assertThat(assertShardStats(routingNodes), equalTo(true));
assertThat(routingNodes.hasInactiveShards(), equalTo(true));
assertThat(routingNodes.hasInactivePrimaries(), equalTo(true));
assertThat(routingNodes.hasUnassignedPrimaries(), equalTo(false));
logger.info("Another round of rebalancing");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())).build();
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable == routingTable, equalTo(true));
routingNodes = clusterState.routingNodes();
assertThat(routingNodes.node("node1").numberOfShardsWithState(INITIALIZING), equalTo(1));
assertThat(routingNodes.node("node2").numberOfShardsWithState(INITIALIZING), equalTo(1));
assertThat(routingNodes.node("node3").numberOfShardsWithState(INITIALIZING), equalTo(1));
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
assertThat(assertShardStats(routingNodes), equalTo(true));
assertThat(routingNodes.hasInactiveShards(), equalTo(true));
assertThat(routingNodes.hasInactivePrimaries(), equalTo(false));
assertThat(routingNodes.hasUnassignedPrimaries(), equalTo(false));
assertThat(routingNodes.node("node1").numberOfShardsWithState(STARTED), equalTo(1));
assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED), equalTo(1));
assertThat(routingNodes.node("node3").numberOfShardsWithState(STARTED), equalTo(1));
logger.info("Reroute, nothing should change");
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
assertThat(prevRoutingTable == routingTable, equalTo(true));
logger.info("Start the more shards");
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
assertThat(assertShardStats(routingNodes), equalTo(true));
assertThat(routingNodes.hasInactiveShards(), equalTo(false));
assertThat(routingNodes.hasInactivePrimaries(), equalTo(false));
assertThat(routingNodes.hasUnassignedPrimaries(), equalTo(false));
assertThat(routingNodes.node("node1").numberOfShardsWithState(STARTED), equalTo(2));
assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED), equalTo(2));
assertThat(routingNodes.node("node3").numberOfShardsWithState(STARTED), equalTo(2));
assertThat(routingNodes.node("node1").shardsWithState("test", STARTED).size(), equalTo(2));
assertThat(routingNodes.node("node2").shardsWithState("test", STARTED).size(), equalTo(2));
assertThat(routingNodes.node("node3").shardsWithState("test", STARTED).size(), equalTo(2));
logger.info("Add new index 3 shards 1 replica");
prevRoutingTable = routingTable;
metaData = MetaData.builder(metaData)
.put(IndexMetaData.builder("test1").settings(ImmutableSettings.settingsBuilder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 3)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
))
.build();
routingTable = RoutingTable.builder(routingTable)
.addAsNew(metaData.index("test1"))
.build();
clusterState = ClusterState.builder(clusterState).metaData(metaData).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
assertThat(assertShardStats(routingNodes), equalTo(true));
assertThat(routingNodes.hasInactiveShards(), equalTo(false));
assertThat(routingNodes.hasInactivePrimaries(), equalTo(false));
assertThat(routingNodes.hasUnassignedPrimaries(), equalTo(true));
assertThat(routingTable.index("test1").shards().size(), equalTo(3));
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
logger.info("Reroute, assign");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())).build();
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
assertThat(assertShardStats(routingNodes), equalTo(true));
assertThat(routingNodes.hasInactiveShards(), equalTo(true));
assertThat(routingNodes.hasInactivePrimaries(), equalTo(true));
assertThat(routingNodes.hasUnassignedPrimaries(), equalTo(false));
assertThat(prevRoutingTable == routingTable, equalTo(true));
logger.info("Reroute, start the primaries");
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
assertThat(assertShardStats(routingNodes), equalTo(true));
assertThat(routingNodes.hasInactiveShards(), equalTo(true));
assertThat(routingNodes.hasInactivePrimaries(), equalTo(false));
assertThat(routingNodes.hasUnassignedPrimaries(), equalTo(false));
logger.info("Reroute, start the replicas");
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
assertThat(assertShardStats(routingNodes), equalTo(true));
assertThat(routingNodes.hasInactiveShards(), equalTo(false));
assertThat(routingNodes.hasInactivePrimaries(), equalTo(false));
assertThat(routingNodes.hasUnassignedPrimaries(), equalTo(false));
assertThat(routingNodes.node("node1").numberOfShardsWithState(STARTED), equalTo(4));
assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED), equalTo(4));
assertThat(routingNodes.node("node3").numberOfShardsWithState(STARTED), equalTo(4));
assertThat(routingNodes.node("node1").shardsWithState("test1", STARTED).size(), equalTo(2));
assertThat(routingNodes.node("node2").shardsWithState("test1", STARTED).size(), equalTo(2));
assertThat(routingNodes.node("node3").shardsWithState("test1", STARTED).size(), equalTo(2));
logger.info("kill one node");
IndexShardRoutingTable indexShardRoutingTable = routingTable.index("test").shard(0);
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove(indexShardRoutingTable.primaryShard().currentNodeId())).build();
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
assertThat(assertShardStats(routingNodes), equalTo(true));
assertThat(routingNodes.hasInactiveShards(), equalTo(true));
// replica got promoted to primary
assertThat(routingNodes.hasInactivePrimaries(), equalTo(false));
assertThat(routingNodes.hasUnassignedPrimaries(), equalTo(false));
logger.info("Start Recovering shards round 1");
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
assertThat(assertShardStats(routingNodes), equalTo(true));
assertThat(routingNodes.hasInactiveShards(), equalTo(true));
assertThat(routingNodes.hasInactivePrimaries(), equalTo(false));
assertThat(routingNodes.hasUnassignedPrimaries(), equalTo(false));
logger.info("Start Recovering shards round 2");
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
assertThat(assertShardStats(routingNodes), equalTo(true));
assertThat(routingNodes.hasInactiveShards(), equalTo(false));
assertThat(routingNodes.hasInactivePrimaries(), equalTo(false));
assertThat(routingNodes.hasUnassignedPrimaries(), equalTo(false));
}
private boolean assertShardStats(RoutingNodes routingNodes) {
return routingNodes.assertShardStats();
}
}