internal custom allocation commands

add support for internal custom allocation commands, including allocation, move, and cancel (shard).
also, fix #2242, which causes the cluster state to be in inconsistent state when a shard being the source of relocation is failed
This commit is contained in:
Shay Banon 2012-09-12 15:12:51 +02:00
parent b6a9bd9a31
commit e530f03b94
19 changed files with 1238 additions and 195 deletions

View File

@ -168,7 +168,7 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc
});
t.start();
} else {
final String[] nodesIds = state.nodes().resolveNodes(request.nodesIds);
final String[] nodesIds = state.nodes().resolveNodesIds(request.nodesIds);
logger.info("[partial_cluster_shutdown]: requested, shutting down [{}] in [{}]", nodesIds, request.delay);
for (String nodeId : nodesIds) {

View File

@ -119,7 +119,7 @@ public abstract class TransportNodesOperationAction<Request extends NodesOperati
this.request = request;
this.listener = listener;
clusterState = clusterService.state();
String[] nodesIds = clusterState.nodes().resolveNodes(request.nodesIds());
String[] nodesIds = clusterState.nodes().resolveNodesIds(request.nodesIds());
this.nodesIds = filterNodeIds(clusterState.nodes(), nodesIds);
this.responses = new AtomicReferenceArray<Object>(this.nodesIds.length);
}

View File

@ -22,6 +22,7 @@ package org.elasticsearch.cluster.node;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.UnmodifiableIterator;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.MapBuilder;
@ -177,7 +178,18 @@ public class DiscoveryNodes implements Iterable<DiscoveryNode> {
return nodesIds == null || nodesIds.length == 0 || (nodesIds.length == 1 && nodesIds[0].equals("_all"));
}
public String[] resolveNodes(String... nodesIds) {
public DiscoveryNode resolveNode(String node) {
String[] resolvedNodeIds = resolveNodesIds(node);
if (resolvedNodeIds.length > 1) {
throw new ElasticSearchIllegalArgumentException("resolved [" + node + "] into [" + resolvedNodeIds.length + "] nodes, where expected to be resolved to a single node");
}
if (resolvedNodeIds.length == 0) {
throw new ElasticSearchIllegalArgumentException("failed to resolve [" + node + " ], no matching nodes");
}
return nodes.get(resolvedNodeIds[0]);
}
public String[] resolveNodesIds(String... nodesIds) {
if (isAllNodes(nodesIds)) {
int index = 0;
nodesIds = new String[nodes.size()];

View File

@ -243,7 +243,8 @@ public class ImmutableShardRouting implements Streamable, Serializable, ShardRou
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
// we check on instanceof so we also handle the MutableShardRouting case as well
if (o == null || !(o instanceof ImmutableShardRouting)) return false;
ImmutableShardRouting that = (ImmutableShardRouting) o;

View File

@ -24,11 +24,9 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.*;
import static com.google.common.collect.Lists.newArrayList;
import static com.google.common.collect.Maps.newHashMap;
@ -57,6 +55,12 @@ public class RoutingNodes implements Iterable<RoutingNode> {
this.blocks = clusterState.blocks();
this.routingTable = clusterState.routingTable();
Map<String, List<MutableShardRouting>> nodesToShards = newHashMap();
// fill in the nodeToShards with the "live" nodes
for (DiscoveryNode node : clusterState.nodes().dataNodes().values()) {
nodesToShards.put(node.id(), new ArrayList<MutableShardRouting>());
}
// fill in the inverse of node -> shards allocated
for (IndexRoutingTable indexRoutingTable : routingTable.indicesRouting().values()) {
for (IndexShardRoutingTable indexShard : indexRoutingTable) {
for (ShardRouting shard : indexShard) {

View File

@ -20,11 +20,13 @@
package org.elasticsearch.cluster.routing.allocation;
import com.google.common.collect.Lists;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
@ -32,10 +34,13 @@ import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.settings.NodeSettingsService;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import static com.google.common.collect.Sets.newHashSet;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
/**
*
@ -93,7 +98,7 @@ public class AllocationService extends AbstractComponent {
// shuffle the unassigned nodes, just so we won't have things like poison failed shards
Collections.shuffle(routingNodes.unassigned());
FailedRerouteAllocation allocation = new FailedRerouteAllocation(allocationDeciders, routingNodes, clusterState.nodes(), failedShard);
boolean changed = applyFailedShard(allocation);
boolean changed = applyFailedShard(allocation, failedShard);
if (!changed) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), allocation.explanation());
}
@ -102,6 +107,19 @@ public class AllocationService extends AbstractComponent {
return new RoutingAllocation.Result(true, new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()), allocation.explanation());
}
public RoutingAllocation.Result reroute(ClusterState clusterState, AllocationCommands commands) throws ElasticSearchException {
RoutingNodes routingNodes = clusterState.routingNodes();
// we don't shuffle the unassigned shards here, to try and get as close as possible to
// a consistent result of the effect the commands have on the routing
// this allows systems to dry run the commands, see the resulting cluster state, and act on it
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes());
commands.execute(allocation);
// the assumption is that commands will move / act on shards (or fail through exceptions)
// so, there will always be shard "movements", so no need to check on reroute
reroute(allocation);
return new RoutingAllocation.Result(true, new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()), allocation.explanation());
}
/**
* Reroutes the routing table based on the live nodes.
* <p/>
@ -131,10 +149,10 @@ public class AllocationService extends AbstractComponent {
Iterable<DiscoveryNode> dataNodes = allocation.nodes().dataNodes().values();
boolean changed = false;
// first, clear from the shards any node id they used to belong to that is now dead
changed |= deassociateDeadNodes(allocation.routingNodes(), dataNodes);
changed |= deassociateDeadNodes(allocation);
// create a sorted list of from nodes with least number of shards to the maximum ones
applyNewNodes(allocation.routingNodes(), dataNodes);
applyNewNodes(allocation);
// elect primaries *before* allocating unassigned, so backups of primaries that failed
// will be moved to primary state and not wait for primaries to be allocated and recovered (*from gateway*)
@ -151,10 +169,10 @@ public class AllocationService extends AbstractComponent {
boolean changed = false;
// first, clear from the shards any node id they used to belong to that is now dead
changed |= deassociateDeadNodes(allocation.routingNodes(), dataNodes);
changed |= deassociateDeadNodes(allocation);
// create a sorted list of from nodes with least number of shards to the maximum ones
applyNewNodes(allocation.routingNodes(), dataNodes);
applyNewNodes(allocation);
// elect primaries *before* allocating unassigned, so backups of primaries that failed
// will be moved to primary state and not wait for primaries to be allocated and recovered (*from gateway*)
@ -261,109 +279,34 @@ public class AllocationService extends AbstractComponent {
/**
* Applies the new nodes to the routing nodes and returns them (just the
* new nodes);
*
* @param liveNodes currently live nodes.
*/
private void applyNewNodes(RoutingNodes routingNodes, Iterable<DiscoveryNode> liveNodes) {
for (DiscoveryNode node : liveNodes) {
if (!routingNodes.nodesToShards().containsKey(node.id())) {
private void applyNewNodes(RoutingAllocation allocation) {
for (DiscoveryNode node : allocation.nodes().dataNodes().values()) {
if (!allocation.routingNodes().nodesToShards().containsKey(node.id())) {
RoutingNode routingNode = new RoutingNode(node.id(), node);
routingNodes.nodesToShards().put(node.id(), routingNode);
allocation.routingNodes().nodesToShards().put(node.id(), routingNode);
}
}
}
private boolean deassociateDeadNodes(RoutingNodes routingNodes, Iterable<DiscoveryNode> liveNodes) {
private boolean deassociateDeadNodes(RoutingAllocation allocation) {
boolean changed = false;
Set<String> liveNodeIds = newHashSet();
for (DiscoveryNode liveNode : liveNodes) {
liveNodeIds.add(liveNode.id());
}
Set<String> nodeIdsToRemove = newHashSet();
for (RoutingNode routingNode : routingNodes) {
for (Iterator<MutableShardRouting> shardsIterator = routingNode.shards().iterator(); shardsIterator.hasNext(); ) {
MutableShardRouting shardRoutingEntry = shardsIterator.next();
if (!shardRoutingEntry.assignedToNode()) {
throw new ElasticSearchIllegalStateException(shardRoutingEntry.shardId() + " is not assigned to a node, but listed on as existing on node [" + routingNode.nodeId() + "]");
}
// we store the relocation state here since when we call de-assign node
// later on, we will loose this state
boolean relocating = shardRoutingEntry.relocating();
String relocatingNodeId = shardRoutingEntry.relocatingNodeId();
// is this the destination shard that we are relocating an existing shard to?
// we know this since it has a relocating node id (the node we relocate from) and our state is INITIALIZING (and not RELOCATING)
boolean isRelocationDestinationShard = relocatingNodeId != null && shardRoutingEntry.initializing();
boolean remove = false;
boolean currentNodeIsDead = false;
if (!liveNodeIds.contains(shardRoutingEntry.currentNodeId())) {
changed = true;
nodeIdsToRemove.add(shardRoutingEntry.currentNodeId());
if (!isRelocationDestinationShard) {
routingNodes.unassigned().add(shardRoutingEntry);
}
shardRoutingEntry.deassignNode();
currentNodeIsDead = true;
remove = true;
}
// move source shard back to active state and cancel relocation mode.
if (relocating && !liveNodeIds.contains(relocatingNodeId)) {
nodeIdsToRemove.add(relocatingNodeId);
if (!currentNodeIsDead) {
changed = true;
shardRoutingEntry.cancelRelocation();
}
}
if (isRelocationDestinationShard && !liveNodeIds.contains(relocatingNodeId)) {
changed = true;
remove = true;
}
if (remove) {
shardsIterator.remove();
}
}
}
for (String nodeIdToRemove : nodeIdsToRemove) {
routingNodes.nodesToShards().remove(nodeIdToRemove);
}
// now, go over shards that are initializing and recovering from primary shards that are now down...
for (RoutingNode routingNode : routingNodes) {
for (Iterator<MutableShardRouting> shardsIterator = routingNode.shards().iterator(); shardsIterator.hasNext(); ) {
MutableShardRouting shardRoutingEntry = shardsIterator.next();
if (!shardRoutingEntry.assignedToNode()) {
throw new ElasticSearchIllegalStateException(shardRoutingEntry.shardId() + " is not assigned to a node, but listed on as existing on node [" + routingNode.nodeId() + "]");
}
// we always recover from primaries, so we care about replicas that are not primaries
if (shardRoutingEntry.primary()) {
for (Iterator<RoutingNode> it = allocation.routingNodes().nodesToShards().values().iterator(); it.hasNext(); ) {
RoutingNode node = it.next();
if (allocation.nodes().dataNodes().containsKey(node.nodeId())) {
// its a live node, continue
continue;
}
// if its not initializing, then its not recovering from the primary
if (!shardRoutingEntry.initializing()) {
continue;
changed = true;
// now, go over all the shards routing on the node, and fail them
for (MutableShardRouting shardRouting : new ArrayList<MutableShardRouting>(node.shards())) {
// we create a copy of the shard routing, since applyFailedShard assumes its a new copy
applyFailedShard(allocation, shardRouting);
}
// its initializing because its relocating from another node (its replica recovering from another replica)
if (shardRoutingEntry.relocatingNodeId() != null) {
continue;
// its a dead node, remove it, note, its important to remove it *after* we apply failed shard
// since it relies on the fact that the RoutingNode exists in the list of nodes
it.remove();
}
for (MutableShardRouting unassignedShardRouting : routingNodes.unassigned()) {
// double check on the unassignedShardRouting.primary(), but it has to be a primary... (well, we double checked actually before...)
if (unassignedShardRouting.shardId().equals(shardRoutingEntry.shardId()) && unassignedShardRouting.primary()) {
// remove it...
routingNodes.unassigned().add(shardRoutingEntry);
shardRoutingEntry.deassignNode();
shardsIterator.remove();
break;
}
}
}
}
return changed;
}
@ -419,87 +362,132 @@ public class AllocationService extends AbstractComponent {
* Applies the relevant logic to handle a failed shard. Returns <tt>true</tt> if changes happened that
* require relocation.
*/
private boolean applyFailedShard(FailedRerouteAllocation allocation) {
IndexRoutingTable indexRoutingTable = allocation.routingTable().index(allocation.failedShard().index());
private boolean applyFailedShard(RoutingAllocation allocation, ShardRouting failedShard) {
// create a copy of the failed shard, since we assume we can change possible refernces to it without
// changing the state of failed shard
failedShard = new ImmutableShardRouting(failedShard);
IndexRoutingTable indexRoutingTable = allocation.routingTable().index(failedShard.index());
if (indexRoutingTable == null) {
return false;
}
ShardRouting failedShard = allocation.failedShard();
boolean shardDirty = false;
boolean inRelocation = failedShard.relocatingNodeId() != null;
if (inRelocation) {
RoutingNode routingNode = allocation.routingNodes().nodesToShards().get(failedShard.currentNodeId());
if (routingNode != null) {
Iterator<MutableShardRouting> shards = routingNode.iterator();
while (shards.hasNext()) {
MutableShardRouting shard = shards.next();
if (shard.shardId().equals(failedShard.shardId())) {
shardDirty = true;
shard.deassignNode();
shards.remove();
break;
}
}
}
}
String nodeId = inRelocation ? failedShard.relocatingNodeId() : failedShard.currentNodeId();
RoutingNode currentRoutingNode = allocation.routingNodes().nodesToShards().get(nodeId);
if (currentRoutingNode == null) {
// already failed (might be called several times for the same shard)
return false;
}
Iterator<MutableShardRouting> shards = currentRoutingNode.iterator();
while (shards.hasNext()) {
MutableShardRouting shard = shards.next();
if (shard.shardId().equals(failedShard.shardId())) {
shardDirty = true;
if (!inRelocation) {
shard.deassignNode();
shards.remove();
} else {
shard.cancelRelocation();
}
break;
}
}
if (!shardDirty) {
return false;
}
if (failedShard.relocatingNodeId() != null) {
// the shard is relocating, either in initializing (recovery from another node) or relocating (moving to another node)
if (failedShard.state() == INITIALIZING) {
// the shard is initializing and recovering from another node
boolean dirty = false;
// first, we need to cancel the current node that is being initialized
RoutingNode initializingNode = allocation.routingNodes().node(failedShard.currentNodeId());
if (initializingNode != null) {
for (Iterator<MutableShardRouting> it = initializingNode.iterator(); it.hasNext(); ) {
MutableShardRouting shardRouting = it.next();
if (shardRouting.equals(failedShard)) {
dirty = true;
it.remove();
shardRouting.deassignNode();
// make sure we ignore this shard on the relevant node
allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId());
// if in relocation no need to find a new target, just cancel the relocation.
if (inRelocation) {
return true; // lets true, so we reroute in this case
break;
}
}
}
if (dirty) {
// now, find the node that we are relocating *from*, and cancel its relocation
RoutingNode relocatingFromNode = allocation.routingNodes().node(failedShard.relocatingNodeId());
if (relocatingFromNode != null) {
for (Iterator<MutableShardRouting> it = relocatingFromNode.iterator(); it.hasNext(); ) {
MutableShardRouting shardRouting = it.next();
if (shardRouting.shardId().equals(failedShard.shardId()) && shardRouting.state() == RELOCATING) {
dirty = true;
shardRouting.cancelRelocation();
break;
}
}
}
}
return dirty;
} else if (failedShard.state() == RELOCATING) {
boolean dirty = false;
// the shard is relocating, meaning its the source the shard is relocating from
// first, we need to cancel the current relocation from the current node
// now, find the node that we are recovering from, cancel the relocation, remove it from the node
// and add it to the unassigned shards list...
RoutingNode relocatingFromNode = allocation.routingNodes().node(failedShard.currentNodeId());
if (relocatingFromNode != null) {
for (Iterator<MutableShardRouting> it = relocatingFromNode.iterator(); it.hasNext(); ) {
MutableShardRouting shardRouting = it.next();
if (shardRouting.equals(failedShard)) {
dirty = true;
shardRouting.cancelRelocation();
it.remove();
// make sure we ignore this shard on the relevant node
allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId());
allocation.routingNodes().unassigned().add(new MutableShardRouting(failedShard.index(), failedShard.id(),
null, failedShard.primary(), ShardRoutingState.UNASSIGNED, failedShard.version() + 1));
break;
}
}
}
if (dirty) {
// next, we need to find the target initializing shard that is recovering from, and remove it...
RoutingNode initializingNode = allocation.routingNodes().node(failedShard.relocatingNodeId());
if (initializingNode != null) {
for (Iterator<MutableShardRouting> it = initializingNode.iterator(); it.hasNext(); ) {
MutableShardRouting shardRouting = it.next();
if (shardRouting.shardId().equals(failedShard.shardId()) && shardRouting.state() == INITIALIZING) {
dirty = true;
shardRouting.deassignNode();
it.remove();
}
}
}
}
return dirty;
} else {
throw new ElasticSearchIllegalStateException("illegal state for a failed shard, relocating node id is set, but state does not match: " + failedShard);
}
} else {
// the shard is not relocating, its either started, or initializing, just cancel it and move on...
boolean dirty = false;
RoutingNode node = allocation.routingNodes().node(failedShard.currentNodeId());
if (node != null) {
for (Iterator<MutableShardRouting> it = node.iterator(); it.hasNext(); ) {
MutableShardRouting shardRouting = it.next();
if (shardRouting.equals(failedShard)) {
dirty = true;
// make sure we ignore this shard on the relevant node
allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId());
it.remove();
// move all the shards matching the failed shard to the end of the unassigned list
// so we give a chance for other allocations and won't create poison failed allocations
// that can keep other shards from being allocated (because of limits applied on how many
// shards we can start per node)
List<MutableShardRouting> shardsToMove = Lists.newArrayList();
for (Iterator<MutableShardRouting> it = allocation.routingNodes().unassigned().iterator(); it.hasNext(); ) {
MutableShardRouting shardRouting = it.next();
if (shardRouting.shardId().equals(failedShard.shardId())) {
it.remove();
shardsToMove.add(shardRouting);
for (Iterator<MutableShardRouting> unassignedIt = allocation.routingNodes().unassigned().iterator(); unassignedIt.hasNext(); ) {
MutableShardRouting unassignedShardRouting = unassignedIt.next();
if (unassignedShardRouting.shardId().equals(failedShard.shardId())) {
unassignedIt.remove();
shardsToMove.add(unassignedShardRouting);
}
}
if (!shardsToMove.isEmpty()) {
allocation.routingNodes().unassigned().addAll(shardsToMove);
}
// add the failed shard to the unassigned shards
allocation.routingNodes().unassigned().add(new MutableShardRouting(failedShard.index(), failedShard.id(),
null, failedShard.primary(), ShardRoutingState.UNASSIGNED, failedShard.version() + 1));
return true;
break;
}
}
}
return dirty;
}
}
}

View File

@ -71,6 +71,8 @@ public class RoutingAllocation {
private Map<ShardId, String> ignoredShardToNodes = null;
private Map<ShardId, String> ignoreDisable = null;
public RoutingAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, DiscoveryNodes nodes) {
this.deciders = deciders;
this.routingNodes = routingNodes;
@ -101,6 +103,17 @@ public class RoutingAllocation {
return explanation;
}
public void addIgnoreDisable(ShardId shardId, String nodeId) {
if (ignoreDisable == null) {
ignoreDisable = new HashMap<ShardId, String>();
}
ignoreDisable.put(shardId, nodeId);
}
public boolean shouldIgnoreDisable(ShardId shardId, String nodeId) {
return ignoreDisable != null && nodeId.equals(ignoreDisable.get(shardId));
}
public void addIgnoreShardForNode(ShardId shardId, String nodeId) {
if (ignoredShardToNodes == null) {
ignoredShardToNodes = new HashMap<ShardId, String>();

View File

@ -0,0 +1,86 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation.command;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.index.shard.ShardId;
import java.util.Iterator;
/**
* Allocates an unassigned shard to a specific node. Note, primary allocation will "force"
* allocation which might mean one will loose data if using local gateway..., use with care
* with the <tt>allowPrimary</tt> flag.
*/
public class AllocateAllocationCommand implements AllocationCommand {
private final ShardId shardId;
private final String nodeId;
private final boolean allowPrimary;
public AllocateAllocationCommand(ShardId shardId, String nodeId, boolean allowPrimary) {
this.shardId = shardId;
this.nodeId = nodeId;
this.allowPrimary = allowPrimary;
}
@Override
public void execute(RoutingAllocation allocation) throws ElasticSearchException {
DiscoveryNode node = allocation.nodes().resolveNode(nodeId);
MutableShardRouting shardRouting = null;
for (MutableShardRouting routing : allocation.routingNodes().unassigned()) {
if (routing.shardId().equals(shardId)) {
// prefer primaries first to allocate
if (shardRouting == null || routing.primary()) {
shardRouting = routing;
}
}
}
if (shardRouting == null) {
throw new ElasticSearchIllegalArgumentException("[allocate] failed to find " + shardId + " on the list of unassigned shards");
}
if (shardRouting.primary() && !allowPrimary) {
throw new ElasticSearchIllegalArgumentException("[allocate] trying to allocate a primary shard " + shardId + "], which is disabled");
}
RoutingNode routingNode = allocation.routingNodes().node(node.id());
allocation.addIgnoreDisable(shardRouting.shardId(), routingNode.nodeId());
if (!allocation.deciders().canAllocate(shardRouting, routingNode, allocation).allowed()) {
throw new ElasticSearchIllegalArgumentException("[allocate] allocation of " + shardId + " on node " + node + " is not allowed");
}
// go over and remove it from the unassigned
for (Iterator<MutableShardRouting> it = allocation.routingNodes().unassigned().iterator(); it.hasNext(); ) {
if (it.next() != shardRouting) {
continue;
}
it.remove();
routingNode.add(shardRouting);
break;
}
}
}

View File

@ -0,0 +1,30 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation.command;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
/**
*/
public interface AllocationCommand {
void execute(RoutingAllocation allocation) throws ElasticSearchException;
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation.command;
import com.google.common.collect.Lists;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import java.util.Arrays;
import java.util.List;
/**
*/
public class AllocationCommands {
private final List<AllocationCommand> commands = Lists.newArrayList();
public AllocationCommands(AllocationCommand... commands) {
if (commands != null) {
this.commands.addAll(Arrays.asList(commands));
}
}
public AllocationCommands add(AllocationCommand... commands) {
if (commands != null) {
this.commands.addAll(Arrays.asList(commands));
}
return this;
}
public void execute(RoutingAllocation allocation) throws ElasticSearchException {
for (AllocationCommand command : commands) {
command.execute(allocation);
}
}
}

View File

@ -0,0 +1,114 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation.command;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.index.shard.ShardId;
import java.util.Iterator;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
/**
* A command that cancel relocation, or recovery of a given shard on a node.
*/
public class CancelAllocationCommand implements AllocationCommand {
private final ShardId shardId;
private final String nodeId;
public CancelAllocationCommand(ShardId shardId, String node) {
this.shardId = shardId;
this.nodeId = node;
}
@Override
public void execute(RoutingAllocation allocation) throws ElasticSearchException {
DiscoveryNode node = allocation.nodes().resolveNode(nodeId);
boolean found = false;
for (Iterator<MutableShardRouting> it = allocation.routingNodes().node(node.id()).iterator(); it.hasNext(); ) {
MutableShardRouting shardRouting = it.next();
if (!shardRouting.shardId().equals(shardId)) {
continue;
}
found = true;
if (shardRouting.relocatingNodeId() != null) {
if (shardRouting.initializing()) {
// the shard is initializing and recovering from another node, simply cancel the recovery
it.remove();
shardRouting.deassignNode();
// and cancel the relocating state from the shard its being relocated from
RoutingNode relocatingFromNode = allocation.routingNodes().node(shardRouting.relocatingNodeId());
if (relocatingFromNode != null) {
for (MutableShardRouting fromShardRouting : relocatingFromNode) {
if (fromShardRouting.shardId().equals(shardRouting.shardId()) && shardRouting.state() == RELOCATING) {
fromShardRouting.cancelRelocation();
break;
}
}
}
} else if (shardRouting.relocating()) {
// the shard is relocating to another node, cancel the recovery on the other node, and deallocate this one
if (shardRouting.primary()) {
// can't cancel a primary shard being initialized
throw new ElasticSearchIllegalArgumentException("[cancel_allocation] can't cancel " + shardId + " on node " + node + ", shard is primary and initializing its state");
}
it.remove();
allocation.routingNodes().unassigned().add(new MutableShardRouting(shardRouting.index(), shardRouting.id(),
null, shardRouting.primary(), ShardRoutingState.UNASSIGNED, shardRouting.version() + 1));
// now, go and find the shard that is initializing on the target node, and cancel it as well...
RoutingNode initializingNode = allocation.routingNodes().node(shardRouting.relocatingNodeId());
if (initializingNode != null) {
for (Iterator<MutableShardRouting> itX = initializingNode.iterator(); itX.hasNext(); ) {
MutableShardRouting initializingShardRouting = itX.next();
if (initializingShardRouting.shardId().equals(shardRouting.shardId()) && initializingShardRouting.state() == INITIALIZING) {
shardRouting.deassignNode();
itX.remove();
}
}
}
}
} else {
// the shard is not relocating, its either started, or initializing, just cancel it and move on...
if (shardRouting.primary()) {
// can't cancel a primary shard being initialized
throw new ElasticSearchIllegalArgumentException("[cancel_allocation] can't cancel " + shardId + " on node " + node + ", shard is primary and initializing its state");
}
it.remove();
allocation.routingNodes().unassigned().add(new MutableShardRouting(shardRouting.index(), shardRouting.id(),
null, shardRouting.primary(), ShardRoutingState.UNASSIGNED, shardRouting.version() + 1));
}
}
if (!found) {
throw new ElasticSearchIllegalArgumentException("[cancel_allocation] can't cancel " + shardId + ", failed to find it on node " + node);
}
}
}

View File

@ -0,0 +1,87 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation.command;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.index.shard.ShardId;
/**
* A command that moves a shard from a specific node to another node. Note, the shards
* need to be in "started" state in order to be moved if from is specified.
*/
public class MoveAllocationCommand implements AllocationCommand {
private final ShardId shardId;
@Nullable
private final String fromNode;
private final String toNode;
public MoveAllocationCommand(ShardId shardId, @Nullable String fromNode, String toNode) {
this.shardId = shardId;
this.fromNode = fromNode;
this.toNode = toNode;
}
@Override
public void execute(RoutingAllocation allocation) throws ElasticSearchException {
DiscoveryNode from = allocation.nodes().resolveNode(fromNode);
DiscoveryNode to = allocation.nodes().resolveNode(toNode);
boolean found = false;
for (MutableShardRouting shardRouting : allocation.routingNodes().node(from.id())) {
if (!shardRouting.shardId().equals(shardId)) {
continue;
}
found = true;
// TODO we can possibly support also relocating cases, where we cancel relocation and move...
if (!shardRouting.started()) {
throw new ElasticSearchIllegalArgumentException("[move_allocation] can't move " + shardId + ", shard is not started (state = " + shardRouting.state() + "]");
}
RoutingNode toRoutingNode = allocation.routingNodes().node(to.id());
AllocationDecider.Decision decision = allocation.deciders().canAllocate(shardRouting, toRoutingNode, allocation);
if (!decision.allowed()) {
throw new ElasticSearchIllegalArgumentException("[move_allocation] can't move " + shardId + ", from " + from + ", to " + to + ", since its not allowed");
}
if (!decision.allocate()) {
// its being throttled, maybe have a flag to take it into account and fail? for now, just do it since the "user" wants it...
}
toRoutingNode.add(new MutableShardRouting(shardRouting.index(), shardRouting.id(),
toRoutingNode.nodeId(), shardRouting.currentNodeId(),
shardRouting.primary(), ShardRoutingState.INITIALIZING, shardRouting.version() + 1));
shardRouting.relocate(toRoutingNode.nodeId());
}
if (!found) {
throw new ElasticSearchIllegalArgumentException("[move_allocation] can't move " + shardId + ", failed to find it on node " + from);
}
}
}

View File

@ -27,8 +27,6 @@ import org.elasticsearch.common.settings.Settings;
/**
* A pluggable logic allowing to control if allocation of a shard is allowed on a specific node.
*
*
*/
public abstract class AllocationDecider extends AbstractComponent {
@ -38,21 +36,46 @@ public abstract class AllocationDecider extends AbstractComponent {
public boolean allocate() {
return true;
}
@Override
public boolean allowed() {
return true;
}
},
NO {
@Override
public boolean allocate() {
return false;
}
@Override
public boolean allowed() {
return false;
}
},
THROTTLE {
@Override
public boolean allocate() {
return false;
}
@Override
public boolean allowed() {
return true;
}
};
/**
* It can be allocated *now* on a node. Note, it might be {@link #allowed()} to be allocated
* on a node, yet, allocate will be <tt>false</tt> since its being throttled for example.
*/
public abstract boolean allocate();
/**
* Is allocation allowed on a node. Note, this does not mean that we should allocate *now*,
* though, in extreme cases, we might "force" allocation.
*/
public abstract boolean allowed();
}
protected AllocationDecider(Settings settings) {

View File

@ -70,10 +70,10 @@ public class DisableAllocationDecider extends AllocationDecider {
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
if (disableAllocation) {
return Decision.NO;
return allocation.shouldIgnoreDisable(shardRouting.shardId(), node.nodeId()) ? Decision.YES : Decision.NO;
}
if (disableReplicaAllocation) {
return shardRouting.primary() ? Decision.YES : Decision.NO;
return shardRouting.primary() ? Decision.YES : allocation.shouldIgnoreDisable(shardRouting.shardId(), node.nodeId()) ? Decision.YES : Decision.NO;
}
return Decision.YES;
}

View File

@ -40,7 +40,8 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
/**
*
@ -180,11 +181,11 @@ public class IndexLifecycleActionTests extends AbstractNodesTests {
clusterState2 = clusterService2.state();
routingNodeEntry2 = clusterState2.readOnlyRoutingNodes().nodesToShards().get(clusterState2.nodes().localNodeId());
assertThat(routingNodeEntry2, nullValue());
assertThat(routingNodeEntry2.shards().isEmpty(), equalTo(true));
clusterState3 = clusterService3.state();
routingNodeEntry3 = clusterState3.readOnlyRoutingNodes().nodesToShards().get(clusterState3.nodes().localNodeId());
assertThat(routingNodeEntry3, nullValue());
assertThat(routingNodeEntry3.shards().isEmpty(), equalTo(true));
}
@Test
@ -314,11 +315,11 @@ public class IndexLifecycleActionTests extends AbstractNodesTests {
clusterState2 = clusterService2.state();
routingNodeEntry2 = clusterState2.readOnlyRoutingNodes().nodesToShards().get(clusterState2.nodes().localNodeId());
assertThat(routingNodeEntry2, nullValue());
assertThat(routingNodeEntry2.shards().isEmpty(), equalTo(true));
clusterState3 = clusterService3.state();
routingNodeEntry3 = clusterState3.readOnlyRoutingNodes().nodesToShards().get(clusterState3.nodes().localNodeId());
assertThat(routingNodeEntry3, nullValue());
assertThat(routingNodeEntry3.shards().isEmpty(), equalTo(true));
}
@Test

View File

@ -0,0 +1,287 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.unit.cluster.routing.allocation;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.command.AllocateAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
import org.elasticsearch.cluster.routing.allocation.command.CancelAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.index.shard.ShardId;
import org.testng.annotations.Test;
import static org.elasticsearch.cluster.ClusterState.newClusterStateBuilder;
import static org.elasticsearch.cluster.metadata.IndexMetaData.newIndexMetaDataBuilder;
import static org.elasticsearch.cluster.metadata.MetaData.newMetaDataBuilder;
import static org.elasticsearch.cluster.node.DiscoveryNodes.newNodesBuilder;
import static org.elasticsearch.cluster.routing.RoutingBuilders.indexRoutingTable;
import static org.elasticsearch.cluster.routing.RoutingBuilders.routingTable;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.unit.cluster.routing.allocation.RoutingAllocationTests.newNode;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
/**
*/
@Test
public class AllocationCommandsTests {
private final ESLogger logger = Loggers.getLogger(AllocationCommandsTests.class);
@Test
public void moveShardCommand() {
AllocationService allocation = new AllocationService(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build());
logger.info("creating an index with 1 shard, no replica");
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(0))
.build();
RoutingTable routingTable = routingTable()
.add(indexRoutingTable("test").initializeEmpty(metaData.index("test")))
.build();
ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build();
logger.info("adding two nodes and performing rerouting");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1")).put(newNode("node2"))).build();
RoutingAllocation.Result rerouteResult = allocation.reroute(clusterState);
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
logger.info("start primary shard");
rerouteResult = allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
logger.info("move the shard");
String existingNodeId = clusterState.routingTable().index("test").shard(0).primaryShard().currentNodeId();
String toNodeId;
if ("node1".equals(existingNodeId)) {
toNodeId = "node2";
} else {
toNodeId = "node1";
}
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new MoveAllocationCommand(new ShardId("test", 0), existingNodeId, toNodeId)));
assertThat(rerouteResult.changed(), equalTo(true));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(clusterState.routingNodes().node(existingNodeId).shards().get(0).state(), equalTo(ShardRoutingState.RELOCATING));
assertThat(clusterState.routingNodes().node(toNodeId).shards().get(0).state(), equalTo(ShardRoutingState.INITIALIZING));
logger.info("finish moving the shard");
rerouteResult = allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(clusterState.routingNodes().node(existingNodeId).shards().isEmpty(), equalTo(true));
assertThat(clusterState.routingNodes().node(toNodeId).shards().get(0).state(), equalTo(ShardRoutingState.STARTED));
}
@Test
public void allocateCommand() {
AllocationService allocation = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.disable_allocation", true)
.build());
logger.info("--> building initial routing table");
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(1))
.build();
RoutingTable routingTable = routingTable()
.add(indexRoutingTable("test").initializeEmpty(metaData.index("test")))
.build();
ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build();
logger.info("--> adding 3 nodes on same rack and do rerouting");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder()
.put(newNode("node1"))
.put(newNode("node2"))
.put(newNode("node3"))
).build();
RoutingAllocation.Result rerouteResult = allocation.reroute(clusterState);
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(0));
logger.info("--> allocating with primary flag set to false, should fail");
try {
allocation.reroute(clusterState, new AllocationCommands(new AllocateAllocationCommand(new ShardId("test", 0), "node1", false)));
assert false;
} catch (ElasticSearchIllegalArgumentException e) {
}
logger.info("--> allocating with primary flag set to true");
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateAllocationCommand(new ShardId("test", 0), "node1", true)));
assertThat(rerouteResult.changed(), equalTo(true));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(clusterState.routingNodes().node("node1").shards().size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node1").shardsWithState(INITIALIZING).size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node2").shards().size(), equalTo(0));
logger.info("--> start the primary shard");
rerouteResult = allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(clusterState.routingNodes().node("node1").shards().size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node1").shardsWithState(STARTED).size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node2").shards().size(), equalTo(0));
logger.info("--> allocate the replica shard on the primary shard node, should fail");
try {
allocation.reroute(clusterState, new AllocationCommands(new AllocateAllocationCommand(new ShardId("test", 0), "node1", false)));
assert false;
} catch (ElasticSearchIllegalArgumentException e) {
}
logger.info("--> allocate the replica shard on on the second node");
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateAllocationCommand(new ShardId("test", 0), "node2", false)));
assertThat(rerouteResult.changed(), equalTo(true));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(clusterState.routingNodes().node("node1").shards().size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node1").shardsWithState(STARTED).size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node2").shards().size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node2").shardsWithState(INITIALIZING).size(), equalTo(1));
logger.info("--> start the replica shard");
rerouteResult = allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(clusterState.routingNodes().node("node1").shards().size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node1").shardsWithState(STARTED).size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node2").shards().size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node2").shardsWithState(STARTED).size(), equalTo(1));
logger.info("--> verify that we fail when there are no unassigned shards");
try {
allocation.reroute(clusterState, new AllocationCommands(new AllocateAllocationCommand(new ShardId("test", 0), "node3", false)));
assert false;
} catch (ElasticSearchIllegalArgumentException e) {
}
}
@Test
public void cacnelCommand() {
AllocationService allocation = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.disable_allocation", true)
.build());
logger.info("--> building initial routing table");
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(1))
.build();
RoutingTable routingTable = routingTable()
.add(indexRoutingTable("test").initializeEmpty(metaData.index("test")))
.build();
ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build();
logger.info("--> adding 3 nodes");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder()
.put(newNode("node1"))
.put(newNode("node2"))
.put(newNode("node3"))
).build();
RoutingAllocation.Result rerouteResult = allocation.reroute(clusterState);
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(0));
logger.info("--> allocating with primary flag set to true");
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateAllocationCommand(new ShardId("test", 0), "node1", true)));
assertThat(rerouteResult.changed(), equalTo(true));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(clusterState.routingNodes().node("node1").shards().size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node1").shardsWithState(INITIALIZING).size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node2").shards().size(), equalTo(0));
logger.info("--> cancel primary allocation, make sure it fails...");
try {
allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand(new ShardId("test", 0), "node1")));
assert false;
} catch (ElasticSearchIllegalArgumentException e) {
}
logger.info("--> start the primary shard");
rerouteResult = allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(clusterState.routingNodes().node("node1").shards().size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node1").shardsWithState(STARTED).size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node2").shards().size(), equalTo(0));
logger.info("--> cancel primary allocation, make sure it fails...");
try {
allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand(new ShardId("test", 0), "node1")));
assert false;
} catch (ElasticSearchIllegalArgumentException e) {
}
logger.info("--> allocate the replica shard on on the second node");
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateAllocationCommand(new ShardId("test", 0), "node2", false)));
assertThat(rerouteResult.changed(), equalTo(true));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(clusterState.routingNodes().node("node1").shards().size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node1").shardsWithState(STARTED).size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node2").shards().size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node2").shardsWithState(INITIALIZING).size(), equalTo(1));
logger.info("--> cancel the relocation allocation");
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand(new ShardId("test", 0), "node2")));
assertThat(rerouteResult.changed(), equalTo(true));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(clusterState.routingNodes().node("node1").shards().size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node1").shardsWithState(STARTED).size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node2").shards().size(), equalTo(0));
assertThat(clusterState.routingNodes().node("node3").shards().size(), equalTo(0));
logger.info("--> allocate the replica shard on on the second node");
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateAllocationCommand(new ShardId("test", 0), "node2", false)));
assertThat(rerouteResult.changed(), equalTo(true));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(clusterState.routingNodes().node("node1").shards().size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node1").shardsWithState(STARTED).size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node2").shards().size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node2").shardsWithState(INITIALIZING).size(), equalTo(1));
logger.info("--> cancel the primary being relocated, make sure it fails");
try {
allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand(new ShardId("test", 0), "node1")));
assert false;
} catch (ElasticSearchIllegalArgumentException e) {
}
logger.info("--> start the replica shard");
rerouteResult = allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(clusterState.routingNodes().node("node1").shards().size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node1").shardsWithState(STARTED).size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node2").shards().size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node2").shardsWithState(STARTED).size(), equalTo(1));
logger.info("--> cancel allocation of the replica shard");
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand(new ShardId("test", 0), "node2")));
assertThat(rerouteResult.changed(), equalTo(true));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(clusterState.routingNodes().node("node1").shards().size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node1").shardsWithState(STARTED).size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node2").shards().size(), equalTo(0));
assertThat(clusterState.routingNodes().node("node3").shards().size(), equalTo(0));
}
}

View File

@ -36,11 +36,10 @@ import static org.elasticsearch.cluster.node.DiscoveryNodes.newNodesBuilder;
import static org.elasticsearch.cluster.routing.RoutingBuilders.indexRoutingTable;
import static org.elasticsearch.cluster.routing.RoutingBuilders.routingTable;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.elasticsearch.test.unit.cluster.routing.allocation.RoutingAllocationTests.newNode;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.unit.cluster.routing.allocation.RoutingAllocationTests.newNode;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
@Test
public class ClusterRebalanceRoutingTests {
@ -318,7 +317,7 @@ public class ClusterRebalanceRoutingTests {
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
assertThat(routingNodes.node("node3"), nullValue());
assertThat(routingNodes.node("node3").shards().isEmpty(), equalTo(true));
}
@Test
@ -525,7 +524,7 @@ public class ClusterRebalanceRoutingTests {
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
assertThat(routingNodes.node("node3"), nullValue());
assertThat(routingNodes.node("node3").shards().isEmpty(), equalTo(true));
}
@Test
@ -628,6 +627,6 @@ public class ClusterRebalanceRoutingTests {
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
assertThat(routingNodes.node("node3"), nullValue());
assertThat(routingNodes.node("node3").shards().isEmpty(), equalTo(true));
}
}

View File

@ -0,0 +1,253 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.unit.cluster.routing.allocation;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.testng.annotations.Test;
import static org.elasticsearch.cluster.ClusterState.newClusterStateBuilder;
import static org.elasticsearch.cluster.metadata.IndexMetaData.newIndexMetaDataBuilder;
import static org.elasticsearch.cluster.metadata.MetaData.newMetaDataBuilder;
import static org.elasticsearch.cluster.node.DiscoveryNodes.newNodesBuilder;
import static org.elasticsearch.cluster.routing.RoutingBuilders.indexRoutingTable;
import static org.elasticsearch.cluster.routing.RoutingBuilders.routingTable;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.unit.cluster.routing.allocation.RoutingAllocationTests.newNode;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
/**
*/
@Test
public class DeadNodesAllocationTests {
private final ESLogger logger = Loggers.getLogger(DeadNodesAllocationTests.class);
@Test
public void simpleDeadNodeOnStartedPrimaryShard() {
AllocationService allocation = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.build());
logger.info("--> building initial routing table");
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(1))
.build();
RoutingTable routingTable = routingTable()
.add(indexRoutingTable("test").initializeEmpty(metaData.index("test")))
.build();
ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build();
logger.info("--> adding 2 nodes on same rack and do rerouting");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder()
.put(newNode("node1"))
.put(newNode("node2"))
).build();
RoutingAllocation.Result rerouteResult = allocation.reroute(clusterState);
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
// starting primaries
rerouteResult = allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
// starting replicas
rerouteResult = allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
logger.info("--> verifying all is allocated");
assertThat(clusterState.routingNodes().node("node1").shards().size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node1").shards().get(0).state(), equalTo(STARTED));
assertThat(clusterState.routingNodes().node("node2").shards().size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node2").shards().get(0).state(), equalTo(STARTED));
logger.info("--> fail node with primary");
String nodeIdToFail = clusterState.routingTable().index("test").shard(0).primaryShard().currentNodeId();
String nodeIdRemaining = nodeIdToFail.equals("node1") ? "node2" : "node1";
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder()
.put(newNode(nodeIdRemaining))
).build();
rerouteResult = allocation.reroute(clusterState);
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(clusterState.routingNodes().node(nodeIdRemaining).shards().get(0).primary(), equalTo(true));
assertThat(clusterState.routingNodes().node(nodeIdRemaining).shards().get(0).state(), equalTo(STARTED));
}
@Test
public void deadNodeWhileRelocatingOnToNode() {
AllocationService allocation = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.build());
logger.info("--> building initial routing table");
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(1))
.build();
RoutingTable routingTable = routingTable()
.add(indexRoutingTable("test").initializeEmpty(metaData.index("test")))
.build();
ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build();
logger.info("--> adding 2 nodes on same rack and do rerouting");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder()
.put(newNode("node1"))
.put(newNode("node2"))
).build();
RoutingAllocation.Result rerouteResult = allocation.reroute(clusterState);
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
// starting primaries
rerouteResult = allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
// starting replicas
rerouteResult = allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
logger.info("--> verifying all is allocated");
assertThat(clusterState.routingNodes().node("node1").shards().size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node1").shards().get(0).state(), equalTo(STARTED));
assertThat(clusterState.routingNodes().node("node2").shards().size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node2").shards().get(0).state(), equalTo(STARTED));
logger.info("--> adding additional node");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes())
.put(newNode("node3"))
).build();
rerouteResult = allocation.reroute(clusterState);
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(clusterState.routingNodes().node("node1").shards().size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node1").shards().get(0).state(), equalTo(STARTED));
assertThat(clusterState.routingNodes().node("node2").shards().size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node2").shards().get(0).state(), equalTo(STARTED));
assertThat(clusterState.routingNodes().node("node3").shards().size(), equalTo(0));
String origPrimaryNodeId = clusterState.routingTable().index("test").shard(0).primaryShard().currentNodeId();
String origReplicaNodeId = clusterState.routingTable().index("test").shard(0).replicaShards().get(0).currentNodeId();
logger.info("--> moving primary shard to node3");
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(
new MoveAllocationCommand(clusterState.routingTable().index("test").shard(0).primaryShard().shardId(), clusterState.routingTable().index("test").shard(0).primaryShard().currentNodeId(), "node3"))
);
assertThat(rerouteResult.changed(), equalTo(true));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(clusterState.routingNodes().node(origPrimaryNodeId).shards().get(0).state(), equalTo(RELOCATING));
assertThat(clusterState.routingNodes().node("node3").shards().get(0).state(), equalTo(INITIALIZING));
logger.info("--> fail primary shard recovering instance on node3 being initialized by killing node3");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder()
.put(newNode(origPrimaryNodeId))
.put(newNode(origReplicaNodeId))
).build();
rerouteResult = allocation.reroute(clusterState);
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(clusterState.routingNodes().node(origPrimaryNodeId).shards().get(0).state(), equalTo(STARTED));
assertThat(clusterState.routingNodes().node(origReplicaNodeId).shards().get(0).state(), equalTo(STARTED));
}
@Test
public void deadNodeWhileRelocatingOnFromNode() {
AllocationService allocation = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.build());
logger.info("--> building initial routing table");
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(1))
.build();
RoutingTable routingTable = routingTable()
.add(indexRoutingTable("test").initializeEmpty(metaData.index("test")))
.build();
ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build();
logger.info("--> adding 2 nodes on same rack and do rerouting");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder()
.put(newNode("node1"))
.put(newNode("node2"))
).build();
RoutingAllocation.Result rerouteResult = allocation.reroute(clusterState);
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
// starting primaries
rerouteResult = allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
// starting replicas
rerouteResult = allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
logger.info("--> verifying all is allocated");
assertThat(clusterState.routingNodes().node("node1").shards().size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node1").shards().get(0).state(), equalTo(STARTED));
assertThat(clusterState.routingNodes().node("node2").shards().size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node2").shards().get(0).state(), equalTo(STARTED));
logger.info("--> adding additional node");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes())
.put(newNode("node3"))
).build();
rerouteResult = allocation.reroute(clusterState);
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(clusterState.routingNodes().node("node1").shards().size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node1").shards().get(0).state(), equalTo(STARTED));
assertThat(clusterState.routingNodes().node("node2").shards().size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node2").shards().get(0).state(), equalTo(STARTED));
assertThat(clusterState.routingNodes().node("node3").shards().size(), equalTo(0));
String origPrimaryNodeId = clusterState.routingTable().index("test").shard(0).primaryShard().currentNodeId();
String origReplicaNodeId = clusterState.routingTable().index("test").shard(0).replicaShards().get(0).currentNodeId();
logger.info("--> moving primary shard to node3");
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(
new MoveAllocationCommand(clusterState.routingTable().index("test").shard(0).primaryShard().shardId(), clusterState.routingTable().index("test").shard(0).primaryShard().currentNodeId(), "node3"))
);
assertThat(rerouteResult.changed(), equalTo(true));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(clusterState.routingNodes().node(origPrimaryNodeId).shards().get(0).state(), equalTo(RELOCATING));
assertThat(clusterState.routingNodes().node("node3").shards().get(0).state(), equalTo(INITIALIZING));
logger.info("--> fail primary shard recovering instance on 'origPrimaryNodeId' being relocated");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder()
.put(newNode("node3"))
.put(newNode(origReplicaNodeId))
).build();
rerouteResult = allocation.reroute(clusterState);
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(clusterState.routingNodes().node(origReplicaNodeId).shards().get(0).state(), equalTo(STARTED));
assertThat(clusterState.routingNodes().node("node3").shards().get(0).state(), equalTo(INITIALIZING));
}
}

View File

@ -26,6 +26,9 @@ import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.testng.annotations.Test;
@ -37,8 +40,8 @@ import static org.elasticsearch.cluster.node.DiscoveryNodes.newNodesBuilder;
import static org.elasticsearch.cluster.routing.RoutingBuilders.indexRoutingTable;
import static org.elasticsearch.cluster.routing.RoutingBuilders.routingTable;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.elasticsearch.test.unit.cluster.routing.allocation.RoutingAllocationTests.newNode;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.unit.cluster.routing.allocation.RoutingAllocationTests.newNode;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
@ -50,6 +53,95 @@ public class FailedShardsRoutingTests {
private final ESLogger logger = Loggers.getLogger(FailedShardsRoutingTests.class);
@Test
public void testFailedShardPrimaryRelocatingToAndFrom() {
AllocationService allocation = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.build());
logger.info("--> building initial routing table");
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(1))
.build();
RoutingTable routingTable = routingTable()
.add(indexRoutingTable("test").initializeEmpty(metaData.index("test")))
.build();
ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build();
logger.info("--> adding 2 nodes on same rack and do rerouting");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder()
.put(newNode("node1"))
.put(newNode("node2"))
).build();
RoutingAllocation.Result rerouteResult = allocation.reroute(clusterState);
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
// starting primaries
rerouteResult = allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
// starting replicas
rerouteResult = allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
logger.info("--> verifying all is allocated");
assertThat(clusterState.routingNodes().node("node1").shards().size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node1").shards().get(0).state(), equalTo(STARTED));
assertThat(clusterState.routingNodes().node("node2").shards().size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node2").shards().get(0).state(), equalTo(STARTED));
logger.info("--> adding additional node");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes())
.put(newNode("node3"))
).build();
rerouteResult = allocation.reroute(clusterState);
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(clusterState.routingNodes().node("node1").shards().size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node1").shards().get(0).state(), equalTo(STARTED));
assertThat(clusterState.routingNodes().node("node2").shards().size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node2").shards().get(0).state(), equalTo(STARTED));
assertThat(clusterState.routingNodes().node("node3").shards().size(), equalTo(0));
String origPrimaryNodeId = clusterState.routingTable().index("test").shard(0).primaryShard().currentNodeId();
String origReplicaNodeId = clusterState.routingTable().index("test").shard(0).replicaShards().get(0).currentNodeId();
logger.info("--> moving primary shard to node3");
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(
new MoveAllocationCommand(clusterState.routingTable().index("test").shard(0).primaryShard().shardId(), clusterState.routingTable().index("test").shard(0).primaryShard().currentNodeId(), "node3"))
);
assertThat(rerouteResult.changed(), equalTo(true));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(clusterState.routingNodes().node(origPrimaryNodeId).shards().get(0).state(), equalTo(RELOCATING));
assertThat(clusterState.routingNodes().node("node3").shards().get(0).state(), equalTo(INITIALIZING));
logger.info("--> fail primary shard recovering instance on node3 being initialized");
rerouteResult = allocation.applyFailedShard(clusterState, new ImmutableShardRouting(clusterState.routingNodes().node("node3").shards().get(0)));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(clusterState.routingNodes().node(origPrimaryNodeId).shards().get(0).state(), equalTo(STARTED));
assertThat(clusterState.routingNodes().node("node3").shards().size(), equalTo(0));
logger.info("--> moving primary shard to node3");
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(
new MoveAllocationCommand(clusterState.routingTable().index("test").shard(0).primaryShard().shardId(), clusterState.routingTable().index("test").shard(0).primaryShard().currentNodeId(), "node3"))
);
assertThat(rerouteResult.changed(), equalTo(true));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(clusterState.routingNodes().node(origPrimaryNodeId).shards().get(0).state(), equalTo(RELOCATING));
assertThat(clusterState.routingNodes().node("node3").shards().get(0).state(), equalTo(INITIALIZING));
logger.info("--> fail primary shard recovering instance on node1 being relocated");
rerouteResult = allocation.applyFailedShard(clusterState, new ImmutableShardRouting(clusterState.routingNodes().node(origPrimaryNodeId).shards().get(0)));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
// check promotion of replica to primary
assertThat(clusterState.routingNodes().node(origReplicaNodeId).shards().get(0).state(), equalTo(STARTED));
assertThat(clusterState.routingTable().index("test").shard(0).primaryShard().currentNodeId(), equalTo(origReplicaNodeId));
assertThat(clusterState.routingTable().index("test").shard(0).replicaShards().get(0).currentNodeId(), anyOf(equalTo(origPrimaryNodeId), equalTo("node3")));
}
@Test
public void failPrimaryStartedCheckReplicaElected() {
AllocationService strategy = new AllocationService(settingsBuilder()