diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java
index f453f3c35ca..1b410ce781e 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java
@@ -23,12 +23,14 @@ import com.carrotsearch.hppc.ObjectIntHashMap;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.collect.Tuple;
+import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
@@ -48,8 +50,17 @@ import java.util.Set;
import java.util.function.Predicate;
/**
- * {@link RoutingNodes} represents a copy the routing information contained in
- * the {@link ClusterState cluster state}.
+ * {@link RoutingNodes} represents a copy the routing information contained in the {@link ClusterState cluster state}.
+ * It can be either initialized as mutable or immutable (see {@link #RoutingNodes(ClusterState, boolean)}), allowing
+ * or disallowing changes to its elements.
+ *
+ * The main methods used to update routing entries are:
+ *
+ * - {@link #initializeShard} initializes an unassigned shard.
+ *
- {@link #startShard} starts an initializing shard / completes relocation of a shard.
+ *
- {@link #relocateShard} starts relocation of a started shard.
+ *
- {@link #failShard} fails/cancels an assigned shard.
+ *
*/
public class RoutingNodes implements Iterable {
@@ -212,6 +223,7 @@ public class RoutingNodes implements Iterable {
}
public Iterator mutableIterator() {
+ ensureMutable();
return nodesToShards.values().iterator();
}
@@ -396,10 +408,11 @@ public class RoutingNodes implements Iterable {
* @param existingAllocationId allocation id to use. If null, a fresh allocation id is generated.
* @return the initialized shard
*/
- public ShardRouting initialize(ShardRouting shard, String nodeId, @Nullable String existingAllocationId, long expectedSize) {
+ public ShardRouting initializeShard(ShardRouting unassignedShard, String nodeId, @Nullable String existingAllocationId,
+ long expectedSize) {
ensureMutable();
- assert shard.unassigned() : "expected an unassigned shard " + shard;
- ShardRouting initializedShard = shard.initialize(nodeId, existingAllocationId, expectedSize);
+ assert unassignedShard.unassigned() : "expected an unassigned shard " + unassignedShard;
+ ShardRouting initializedShard = unassignedShard.initialize(nodeId, existingAllocationId, expectedSize);
node(nodeId).add(initializedShard);
inactiveShardCount++;
if (initializedShard.primary()) {
@@ -416,25 +429,164 @@ public class RoutingNodes implements Iterable {
*
* @return pair of source relocating and target initializing shards.
*/
- public Tuple relocate(ShardRouting shard, String nodeId, long expectedShardSize) {
+ public Tuple relocateShard(ShardRouting startedShard, String nodeId, long expectedShardSize) {
ensureMutable();
relocatingShards++;
- ShardRouting source = shard.relocate(nodeId, expectedShardSize);
+ ShardRouting source = startedShard.relocate(nodeId, expectedShardSize);
ShardRouting target = source.getTargetRelocatingShard();
- updateAssigned(shard, source);
+ updateAssigned(startedShard, source);
node(target.currentNodeId()).add(target);
assignedShardsAdd(target);
addRecovery(target);
return Tuple.tuple(source, target);
}
+ /**
+ * Applies the relevant logic to start an initializing shard.
+ *
+ * Moves the initializing shard to started. If the shard is a relocation target, also removes the relocation source.
+ *
+ * @return the started shard
+ */
+ public ShardRouting startShard(ESLogger logger, ShardRouting initializingShard) {
+ ensureMutable();
+ ShardRouting startedShard = started(initializingShard);
+ logger.trace("{} marked shard as started (routing: {})", initializingShard.shardId(), initializingShard);
+
+ if (initializingShard.relocatingNodeId() != null) {
+ // relocation target has been started, remove relocation source
+ RoutingNode relocationSourceNode = node(initializingShard.relocatingNodeId());
+ ShardRouting relocationSourceShard = relocationSourceNode.getByShardId(initializingShard.shardId());
+ assert relocationSourceShard.isRelocationSourceOf(initializingShard);
+ assert relocationSourceShard.getTargetRelocatingShard() == initializingShard : "relocation target mismatch, expected: "
+ + initializingShard + " but was: " + relocationSourceShard.getTargetRelocatingShard();
+ remove(relocationSourceShard);
+ }
+ return startedShard;
+ }
+
+ /**
+ * Applies the relevant logic to handle a cancelled or failed shard.
+ *
+ * Moves the shard to unassigned or completely removes the shard (if relocation target).
+ *
+ * - If shard is a primary, this also fails initializing replicas.
+ * - If shard is an active primary, this also promotes an active replica to primary (if such a replica exists).
+ * - If shard is a relocating primary, this also removes the primary relocation target shard.
+ * - If shard is a relocating replica, this promotes the replica relocation target to a full initializing replica, removing the
+ * relocation source information. This is possible as peer recovery is always done from the primary.
+ * - If shard is a (primary or replica) relocation target, this also clears the relocation information on the source shard.
+ *
+ */
+ public void failShard(ESLogger logger, ShardRouting failedShard, UnassignedInfo unassignedInfo, IndexMetaData indexMetaData) {
+ ensureMutable();
+ assert failedShard.assignedToNode() : "only assigned shards can be failed";
+ assert indexMetaData.getIndex().equals(failedShard.index()) :
+ "shard failed for unknown index (shard entry: " + failedShard + ")";
+ assert getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId()) == failedShard :
+ "shard routing to fail does not exist in routing table, expected: " + failedShard + " but was: " +
+ getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId());
+
+ logger.debug("{} failing shard {} with unassigned info ({})", failedShard.shardId(), failedShard, unassignedInfo.shortSummary());
+
+ // if this is a primary, fail initializing replicas first (otherwise we move RoutingNodes into an inconsistent state)
+ if (failedShard.primary()) {
+ List assignedShards = assignedShards(failedShard.shardId());
+ if (assignedShards.isEmpty() == false) {
+ // copy list to prevent ConcurrentModificationException
+ for (ShardRouting routing : new ArrayList<>(assignedShards)) {
+ if (!routing.primary() && routing.initializing()) {
+ // re-resolve replica as earlier iteration could have changed source/target of replica relocation
+ ShardRouting replicaShard = getByAllocationId(routing.shardId(), routing.allocationId().getId());
+ assert replicaShard != null : "failed to re-resolve " + routing + " when failing replicas";
+ UnassignedInfo primaryFailedUnassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.PRIMARY_FAILED,
+ "primary failed while replica initializing", null, 0, unassignedInfo.getUnassignedTimeInNanos(),
+ unassignedInfo.getUnassignedTimeInMillis(), false, AllocationStatus.NO_ATTEMPT);
+ failShard(logger, replicaShard, primaryFailedUnassignedInfo, indexMetaData);
+ }
+ }
+ }
+ }
+
+ if (failedShard.relocating()) {
+ // find the shard that is initializing on the target node
+ ShardRouting targetShard = getByAllocationId(failedShard.shardId(), failedShard.allocationId().getRelocationId());
+ assert targetShard.isRelocationTargetOf(failedShard);
+ if (failedShard.primary()) {
+ logger.trace("{} is removed due to the failure/cancellation of the source shard", targetShard);
+ // cancel and remove target shard
+ remove(targetShard);
+ } else {
+ logger.trace("{}, relocation source failed / cancelled, mark as initializing without relocation source", targetShard);
+ // promote to initializing shard without relocation source and ensure that removed relocation source
+ // is not added back as unassigned shard
+ removeRelocationSource(targetShard);
+ }
+ }
+
+ // fail actual shard
+ if (failedShard.initializing()) {
+ if (failedShard.relocatingNodeId() == null) {
+ // initializing shard that is not relocation target, just move to unassigned
+ moveToUnassigned(failedShard, unassignedInfo);
+ } else {
+ // The shard is a target of a relocating shard. In that case we only need to remove the target shard and cancel the source
+ // relocation. No shard is left unassigned
+ logger.trace("{} is a relocation target, resolving source to cancel relocation ({})", failedShard,
+ unassignedInfo.shortSummary());
+ ShardRouting sourceShard = getByAllocationId(failedShard.shardId(),
+ failedShard.allocationId().getRelocationId());
+ assert sourceShard.isRelocationSourceOf(failedShard);
+ logger.trace("{}, resolved source to [{}]. canceling relocation ... ({})", failedShard.shardId(), sourceShard,
+ unassignedInfo.shortSummary());
+ cancelRelocation(sourceShard);
+ remove(failedShard);
+ }
+ } else {
+ assert failedShard.active();
+ if (failedShard.primary()) {
+ // promote active replica to primary if active replica exists
+ ShardRouting candidate = activeReplica(failedShard.shardId());
+ if (candidate == null) {
+ moveToUnassigned(failedShard, unassignedInfo);
+ } else {
+ movePrimaryToUnassignedAndDemoteToReplica(failedShard, unassignedInfo);
+ ShardRouting primarySwappedCandidate = promoteAssignedReplicaShardToPrimary(candidate);
+ if (primarySwappedCandidate.relocatingNodeId() != null) {
+ // its also relocating, make sure to move the other routing to primary
+ RoutingNode node = node(primarySwappedCandidate.relocatingNodeId());
+ if (node != null) {
+ for (ShardRouting shardRouting : node) {
+ if (shardRouting.shardId().equals(primarySwappedCandidate.shardId()) && !shardRouting.primary()) {
+ promoteAssignedReplicaShardToPrimary(shardRouting);
+ break;
+ }
+ }
+ }
+ }
+ if (IndexMetaData.isIndexUsingShadowReplicas(indexMetaData.getSettings())) {
+ reinitShadowPrimary(primarySwappedCandidate);
+ }
+ }
+ } else {
+ assert failedShard.primary() == false;
+ if (failedShard.relocating()) {
+ remove(failedShard);
+ } else {
+ moveToUnassigned(failedShard, unassignedInfo);
+ }
+ }
+ }
+ assert node(failedShard.currentNodeId()).getByShardId(failedShard.shardId()) == null : "failedShard " + failedShard +
+ " was matched but wasn't removed";
+ }
+
/**
* Mark a shard as started and adjusts internal statistics.
*
* @return the started shard
*/
- public ShardRouting started(ShardRouting shard) {
- ensureMutable();
+ private ShardRouting started(ShardRouting shard) {
assert shard.initializing() : "expected an initializing shard " + shard;
if (shard.relocatingNodeId() == null) {
// if this is not a target shard for relocation, we need to update statistics
@@ -456,8 +608,7 @@ public class RoutingNodes implements Iterable {
*
* @return the shard after cancelling relocation
*/
- public ShardRouting cancelRelocation(ShardRouting shard) {
- ensureMutable();
+ private ShardRouting cancelRelocation(ShardRouting shard) {
relocatingShards--;
ShardRouting cancelledShard = shard.cancelRelocation();
updateAssigned(shard, cancelledShard);
@@ -470,8 +621,7 @@ public class RoutingNodes implements Iterable {
* @param replicaShard the replica shard to be promoted to primary
* @return the resulting primary shard
*/
- public ShardRouting promoteAssignedReplicaShardToPrimary(ShardRouting replicaShard) {
- ensureMutable();
+ private ShardRouting promoteAssignedReplicaShardToPrimary(ShardRouting replicaShard) {
assert replicaShard.unassigned() == false : "unassigned shard cannot be promoted to primary: " + replicaShard;
assert replicaShard.primary() == false : "primary shard cannot be promoted to primary: " + replicaShard;
ShardRouting primaryShard = replicaShard.moveToPrimary();
@@ -485,8 +635,7 @@ public class RoutingNodes implements Iterable {
* Cancels the give shard from the Routing nodes internal statistics and cancels
* the relocation if the shard is relocating.
*/
- public void remove(ShardRouting shard) {
- ensureMutable();
+ private void remove(ShardRouting shard) {
assert shard.unassigned() == false : "only assigned shards can be removed here (" + shard + ")";
node(shard.currentNodeId()).remove(shard);
if (shard.initializing() && shard.relocatingNodeId() == null) {
@@ -508,9 +657,8 @@ public class RoutingNodes implements Iterable {
* Removes relocation source of an initializing non-primary shard. This allows the replica shard to continue recovery from
* the primary even though its non-primary relocation source has failed.
*/
- public ShardRouting removeRelocationSource(ShardRouting shard) {
+ private ShardRouting removeRelocationSource(ShardRouting shard) {
assert shard.isRelocationTarget() : "only relocation target shards can have their relocation source removed (" + shard + ")";
- ensureMutable();
ShardRouting relocationMarkerRemoved = shard.removeRelocationSource();
updateAssigned(shard, relocationMarkerRemoved);
inactiveShardCount++; // relocation targets are not counted as inactive shards whereas initializing shards are
@@ -532,7 +680,6 @@ public class RoutingNodes implements Iterable {
}
private void assignedShardsRemove(ShardRouting shard) {
- ensureMutable();
final List replicaSet = assignedShards.get(shard.shardId());
if (replicaSet != null) {
final Iterator iterator = replicaSet.iterator();
@@ -547,8 +694,7 @@ public class RoutingNodes implements Iterable {
assert false : "No shard found to remove";
}
- public ShardRouting reinitShadowPrimary(ShardRouting candidate) {
- ensureMutable();
+ private ShardRouting reinitShadowPrimary(ShardRouting candidate) {
if (candidate.relocating()) {
cancelRelocation(candidate);
}
@@ -573,8 +719,7 @@ public class RoutingNodes implements Iterable {
shardsWithMatchingShardId.set(previousShardIndex, newShard);
}
- public ShardRouting moveToUnassigned(ShardRouting shard, UnassignedInfo unassignedInfo) {
- ensureMutable();
+ private ShardRouting moveToUnassigned(ShardRouting shard, UnassignedInfo unassignedInfo) {
assert shard.unassigned() == false : "only assigned shards can be moved to unassigned (" + shard + ")";
remove(shard);
ShardRouting unassigned = shard.moveToUnassigned(unassignedInfo);
@@ -582,6 +727,19 @@ public class RoutingNodes implements Iterable {
return unassigned;
}
+ /**
+ * Moves assigned primary to unassigned and demotes it to a replica.
+ * Used in conjunction with {@link #promoteAssignedReplicaShardToPrimary} when an active replica is promoted to primary.
+ */
+ private ShardRouting movePrimaryToUnassignedAndDemoteToReplica(ShardRouting shard, UnassignedInfo unassignedInfo) {
+ assert shard.unassigned() == false : "only assigned shards can be moved to unassigned (" + shard + ")";
+ assert shard.primary() : "only primary can be demoted to replica (" + shard + ")";
+ remove(shard);
+ ShardRouting unassigned = shard.moveToUnassigned(unassignedInfo).moveFromPrimary();
+ unassignedShards.add(unassigned);
+ return unassigned;
+ }
+
/**
* Returns the number of routing nodes
*/
@@ -612,6 +770,7 @@ public class RoutingNodes implements Iterable {
}
public void sort(Comparator comparator) {
+ nodes.ensureMutable();
CollectionUtil.timSort(unassigned, comparator);
}
@@ -661,6 +820,7 @@ public class RoutingNodes implements Iterable {
* @return true iff the decision caused a change to the unassigned info
*/
public boolean ignoreShard(ShardRouting shard, AllocationStatus allocationStatus) {
+ nodes.ensureMutable();
boolean changed = false;
if (shard.primary()) {
ignoredPrimaries++;
@@ -704,8 +864,9 @@ public class RoutingNodes implements Iterable {
* @param existingAllocationId allocation id to use. If null, a fresh allocation id is generated.
*/
public ShardRouting initialize(String nodeId, @Nullable String existingAllocationId, long expectedShardSize) {
+ nodes.ensureMutable();
innerRemove();
- return nodes.initialize(current, nodeId, existingAllocationId, expectedShardSize);
+ return nodes.initializeShard(current, nodeId, existingAllocationId, expectedShardSize);
}
/**
@@ -718,6 +879,7 @@ public class RoutingNodes implements Iterable {
* @return true iff the decision caused an update to the unassigned info
*/
public boolean removeAndIgnore(AllocationStatus attempt) {
+ nodes.ensureMutable();
innerRemove();
return ignoreShard(current, attempt);
}
@@ -734,23 +896,12 @@ public class RoutingNodes implements Iterable {
* @return the shard with unassigned info updated
*/
public ShardRouting updateUnassignedInfo(UnassignedInfo unassignedInfo) {
+ nodes.ensureMutable();
ShardRouting updatedShardRouting = current.updateUnassignedInfo(unassignedInfo);
updateShardRouting(updatedShardRouting);
return updatedShardRouting;
}
- /**
- * marks the current primary shard as replica
- *
- * @return the shard with primary status swapped
- */
- public ShardRouting demotePrimaryToReplicaShard() {
- assert current.primary() : "non-primary shard " + current + " cannot be demoted";
- updateShardRouting(current.moveFromPrimary());
- primaries--;
- return current;
- }
-
/**
* Unsupported operation, just there for the interface. Use {@link #removeAndIgnore(AllocationStatus)} or
* {@link #initialize(String, String, long)}.
@@ -761,7 +912,6 @@ public class RoutingNodes implements Iterable {
}
private void innerRemove() {
- nodes.ensureMutable();
iterator.remove();
if (current.primary()) {
primaries--;
@@ -786,6 +936,7 @@ public class RoutingNodes implements Iterable {
}
public void shuffle() {
+ nodes.ensureMutable();
Randomness.shuffle(unassigned);
}
@@ -794,6 +945,7 @@ public class RoutingNodes implements Iterable {
* This method will not drain ignored shards.
*/
public ShardRouting[] drain() {
+ nodes.ensureMutable();
ShardRouting[] mutableShardRoutings = unassigned.toArray(new ShardRouting[unassigned.size()]);
unassigned.clear();
primaries = 0;
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java
index 3c01694d374..8401b5f48e1 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java
@@ -41,14 +41,11 @@ import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.index.shard.ShardId;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
@@ -237,6 +234,7 @@ public class AllocationService extends AbstractComponent {
for (FailedRerouteAllocation.FailedShard failedShardEntry : failedShards) {
ShardRouting shardToFail = failedShardEntry.routingEntry;
+ IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(shardToFail.shardId().getIndex());
allocation.addIgnoreShardForNode(shardToFail.shardId(), shardToFail.currentNodeId());
// failing a primary also fails initializing replica shards, re-resolve ShardRouting
ShardRouting failedShard = routingNodes.getByAllocationId(shardToFail.shardId(), shardToFail.allocationId().getId());
@@ -249,7 +247,7 @@ public class AllocationService extends AbstractComponent {
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, failedShardEntry.message,
failedShardEntry.failure, failedAllocations + 1, currentNanoTime, System.currentTimeMillis(), false,
AllocationStatus.NO_ATTEMPT);
- applyFailedShard(allocation, failedShard, unassignedInfo);
+ routingNodes.failShard(logger, failedShard, unassignedInfo, indexMetaData);
} else {
logger.trace("{} shard routing failed in an earlier iteration (routing: {})", shardToFail.shardId(), shardToFail);
}
@@ -385,8 +383,7 @@ public class AllocationService extends AbstractComponent {
private boolean reroute(RoutingAllocation allocation) {
assert deassociateDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See deassociateDeadNodes";
- boolean changed = electPrimariesAndUnassignedDanglingReplicas(allocation);
-
+ boolean changed = false;
// now allocate all the unassigned to available nodes
if (allocation.routingNodes().unassigned().size() > 0) {
changed |= removeDelayMarkers(allocation);
@@ -398,49 +395,6 @@ public class AllocationService extends AbstractComponent {
return changed;
}
- private boolean electPrimariesAndUnassignedDanglingReplicas(RoutingAllocation allocation) {
- boolean changed = false;
- final RoutingNodes routingNodes = allocation.routingNodes();
- if (routingNodes.unassigned().getNumPrimaries() == 0) {
- // move out if we don't have unassigned primaries
- return changed;
- }
- // now, go over and elect a new primary if possible, not, from this code block on, if one is elected,
- // routingNodes.hasUnassignedPrimaries() will potentially be false
- final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = routingNodes.unassigned().iterator();
- while (unassignedIterator.hasNext()) {
- ShardRouting shardEntry = unassignedIterator.next();
- if (shardEntry.primary()) {
- // remove dangling replicas that are initializing for primary shards
- changed |= failReplicasForUnassignedPrimary(allocation, shardEntry);
- ShardRouting candidate = allocation.routingNodes().activeReplica(shardEntry.shardId());
- if (candidate != null) {
- shardEntry = unassignedIterator.demotePrimaryToReplicaShard();
- ShardRouting primarySwappedCandidate = routingNodes.promoteAssignedReplicaShardToPrimary(candidate);
- changed = true;
- if (primarySwappedCandidate.relocatingNodeId() != null) {
- // its also relocating, make sure to move the other routing to primary
- RoutingNode node = routingNodes.node(primarySwappedCandidate.relocatingNodeId());
- if (node != null) {
- for (ShardRouting shardRouting : node) {
- if (shardRouting.shardId().equals(primarySwappedCandidate.shardId()) && !shardRouting.primary()) {
- routingNodes.promoteAssignedReplicaShardToPrimary(shardRouting);
- break;
- }
- }
- }
- }
- IndexMetaData index = allocation.metaData().getIndexSafe(primarySwappedCandidate.index());
- if (IndexMetaData.isIndexUsingShadowReplicas(index.getSettings())) {
- routingNodes.reinitShadowPrimary(primarySwappedCandidate);
- }
- }
- }
- }
-
- return changed;
- }
-
private boolean deassociateDeadNodes(RoutingAllocation allocation) {
boolean changed = false;
for (Iterator it = allocation.routingNodes().mutableIterator(); it.hasNext(); ) {
@@ -456,7 +410,7 @@ public class AllocationService extends AbstractComponent {
boolean delayed = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexMetaData.getSettings()).nanos() > 0;
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left[" + node.nodeId() + "]",
null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), delayed, AllocationStatus.NO_ATTEMPT);
- applyFailedShard(allocation, shardRouting, unassignedInfo);
+ allocation.routingNodes().failShard(logger, shardRouting, unassignedInfo, indexMetaData);
}
// its a dead node, remove it, note, its important to remove it *after* we apply failed shard
// since it relies on the fact that the RoutingNode exists in the list of nodes
@@ -465,23 +419,6 @@ public class AllocationService extends AbstractComponent {
return changed;
}
- private boolean failReplicasForUnassignedPrimary(RoutingAllocation allocation, ShardRouting failedPrimary) {
- assert failedPrimary.primary() : "can only fail replicas for primary shard: " + failedPrimary;
- List replicas = new ArrayList<>();
- for (ShardRouting routing : allocation.routingNodes().assignedShards(failedPrimary.shardId())) {
- if (!routing.primary() && routing.initializing()) {
- replicas.add(routing);
- }
- }
- for (ShardRouting failedReplica : replicas) {
- UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.PRIMARY_FAILED,
- "primary failed while replica initializing", null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), false,
- AllocationStatus.NO_ATTEMPT);
- applyFailedShard(allocation, failedReplica, unassignedInfo);
- }
- return replicas.isEmpty() == false;
- }
-
private void applyStartedShards(RoutingAllocation routingAllocation, List startedShardEntries) {
assert startedShardEntries.isEmpty() == false : "non-empty list of started shard entries expected";
RoutingNodes routingNodes = routingAllocation.routingNodes();
@@ -493,81 +430,7 @@ public class AllocationService extends AbstractComponent {
"shard routing to start does not exist in routing table, expected: " + startedShard + " but was: " +
routingNodes.getByAllocationId(startedShard.shardId(), startedShard.allocationId().getId());
- routingNodes.started(startedShard);
- logger.trace("{} marked shard as started (routing: {})", startedShard.shardId(), startedShard);
-
- if (startedShard.relocatingNodeId() != null) {
- // relocation target has been started, remove relocation source
- RoutingNode relocationSourceNode = routingNodes.node(startedShard.relocatingNodeId());
- ShardRouting relocationSourceShard = relocationSourceNode.getByShardId(startedShard.shardId());
- assert relocationSourceShard.isRelocationSourceOf(startedShard);
- assert relocationSourceShard.getTargetRelocatingShard() == startedShard : "relocation target mismatch, expected: "
- + startedShard + " but was: " + relocationSourceShard.getTargetRelocatingShard();
- routingNodes.remove(relocationSourceShard);
- }
- }
- }
-
- /**
- * Applies the relevant logic to handle a failed shard.
- */
- private void applyFailedShard(RoutingAllocation allocation, ShardRouting failedShard, UnassignedInfo unassignedInfo) {
- RoutingNodes routingNodes = allocation.routingNodes();
- assert failedShard.assignedToNode() : "only assigned shards can be failed";
- assert allocation.metaData().index(failedShard.shardId().getIndex()) != null :
- "shard failed for unknown index (shard entry: " + failedShard + ")";
- assert routingNodes.getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId()) == failedShard :
- "shard routing to fail does not exist in routing table, expected: " + failedShard + " but was: " +
- routingNodes.getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId());
-
- logger.debug("{} failing shard {} with unassigned info ({})", failedShard.shardId(), failedShard, unassignedInfo.shortSummary());
- if (failedShard.primary()) {
- // fail replicas first otherwise we move RoutingNodes into an inconsistent state
- failReplicasForUnassignedPrimary(allocation, failedShard);
- }
-
- cancelShard(logger, failedShard, unassignedInfo, routingNodes);
- assert routingNodes.node(failedShard.currentNodeId()).getByShardId(failedShard.shardId()) == null : "failedShard " + failedShard +
- " was matched but wasn't removed";
- }
-
- public static void cancelShard(ESLogger logger, ShardRouting cancelledShard, UnassignedInfo unassignedInfo, RoutingNodes routingNodes) {
- if (cancelledShard.relocatingNodeId() == null) {
- routingNodes.moveToUnassigned(cancelledShard, unassignedInfo);
- } else {
- if (cancelledShard.initializing()) {
- // The shard is a target of a relocating shard. In that case we only
- // need to remove the target shard and cancel the source relocation.
- // No shard is left unassigned
- logger.trace("{} is a relocation target, resolving source to cancel relocation ({})", cancelledShard,
- unassignedInfo.shortSummary());
- RoutingNode sourceNode = routingNodes.node(cancelledShard.relocatingNodeId());
- ShardRouting sourceShard = sourceNode.getByShardId(cancelledShard.shardId());
- assert sourceShard.isRelocationSourceOf(cancelledShard);
- logger.trace("{}, resolved source to [{}]. canceling relocation ... ({})", cancelledShard.shardId(), sourceShard,
- unassignedInfo.shortSummary());
- routingNodes.cancelRelocation(sourceShard);
- routingNodes.remove(cancelledShard);
- } else {
- assert cancelledShard.relocating();
- // The cancelled shard is the main copy of the current shard routing.
- // now, find the shard that is initializing on the target node
- RoutingNode targetNode = routingNodes.node(cancelledShard.relocatingNodeId());
- ShardRouting targetShard = targetNode.getByShardId(cancelledShard.shardId());
- assert targetShard.isRelocationTargetOf(cancelledShard);
- if (cancelledShard.primary()) {
- logger.trace("{} is removed due to the failure/cancellation of the source shard", targetShard);
- // cancel and remove target shard
- routingNodes.remove(targetShard);
- routingNodes.moveToUnassigned(cancelledShard, unassignedInfo);
- } else {
- logger.trace("{}, relocation source failed / cancelled, mark as initializing without relocation source", targetShard);
- // promote to initializing shard without relocation source and ensure that removed relocation source
- // is not added back as unassigned shard
- routingNodes.removeRelocationSource(targetShard);
- routingNodes.remove(cancelledShard);
- }
- }
+ routingNodes.startShard(logger, startedShard);
}
}
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java
index d0e2ddf8abe..ed856f44e68 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java
@@ -553,7 +553,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
Decision allocationDecision = allocation.deciders().canAllocate(shardRouting, target, allocation);
if (allocationDecision.type() == Type.YES) { // TODO maybe we can respect throttling here too?
sourceNode.removeShard(shardRouting);
- Tuple relocatingShards = routingNodes.relocate(shardRouting, target.nodeId(), allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
+ Tuple relocatingShards = routingNodes.relocateShard(shardRouting, target.nodeId(), allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
currentNode.addShard(relocatingShards.v2());
if (logger.isTraceEnabled()) {
logger.trace("Moved shard [{}] to node [{}]", shardRouting, routingNode.node());
@@ -728,7 +728,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
logger.trace("Assigned shard [{}] to [{}]", shard, minNode.getNodeId());
}
- shard = routingNodes.initialize(shard, minNode.getNodeId(), null, shardSize);
+ shard = routingNodes.initializeShard(shard, minNode.getNodeId(), null, shardSize);
minNode.addShard(shard);
changed = true;
continue; // don't add to ignoreUnassigned
@@ -820,7 +820,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
minNode.getNodeId());
}
/* now allocate on the cluster */
- minNode.addShard(routingNodes.relocate(candidate, minNode.getNodeId(), shardSize).v1());
+ minNode.addShard(routingNodes.relocateShard(candidate, minNode.getNodeId(), shardSize).v1());
return true;
} else {
assert decision.type() == Type.THROTTLE;
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/CancelAllocationCommand.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/CancelAllocationCommand.java
index 230e7929cc8..349df2d7af7 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/CancelAllocationCommand.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/CancelAllocationCommand.java
@@ -127,8 +127,9 @@ public class CancelAllocationCommand implements AllocationCommand {
ShardRouting shardRouting = null;
RoutingNodes routingNodes = allocation.routingNodes();
RoutingNode routingNode = routingNodes.node(discoNode.getId());
+ IndexMetaData indexMetaData = null;
if (routingNode != null) {
- IndexMetaData indexMetaData = allocation.metaData().index(index());
+ indexMetaData = allocation.metaData().index(index());
if (indexMetaData == null) {
throw new IndexNotFoundException(index());
}
@@ -154,8 +155,8 @@ public class CancelAllocationCommand implements AllocationCommand {
discoNode + ", shard is primary and " + shardRouting.state().name().toLowerCase(Locale.ROOT));
}
}
- AllocationService.cancelShard(Loggers.getLogger(CancelAllocationCommand.class), shardRouting,
- new UnassignedInfo(UnassignedInfo.Reason.REROUTE_CANCELLED, null), routingNodes);
+ routingNodes.failShard(Loggers.getLogger(CancelAllocationCommand.class), shardRouting,
+ new UnassignedInfo(UnassignedInfo.Reason.REROUTE_CANCELLED, null), indexMetaData);
return new RerouteExplanation(this, allocation.decision(Decision.YES, "cancel_allocation_command",
"shard " + shardId + " on node " + discoNode + " can be cancelled"));
}
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/MoveAllocationCommand.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/MoveAllocationCommand.java
index 69bd8f0eeca..dbd345a81a0 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/MoveAllocationCommand.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/MoveAllocationCommand.java
@@ -132,7 +132,7 @@ public class MoveAllocationCommand implements AllocationCommand {
if (decision.type() == Decision.Type.THROTTLE) {
// its being throttled, maybe have a flag to take it into account and fail? for now, just do it since the "user" wants it...
}
- allocation.routingNodes().relocate(shardRouting, toRoutingNode.nodeId(), allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
+ allocation.routingNodes().relocateShard(shardRouting, toRoutingNode.nodeId(), allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
}
if (!found) {
diff --git a/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java b/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java
index 2776cc17684..1cb555347cc 100644
--- a/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java
+++ b/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java
@@ -31,6 +31,7 @@ import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus;
+import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.Nullable;
@@ -63,7 +64,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
boolean changed = false;
MetaData metaData = allocation.metaData();
RoutingNodes routingNodes = allocation.routingNodes();
- List> recoveriesToCancel = new ArrayList<>();
+ List shardCancellationActions = new ArrayList<>();
for (RoutingNode routingNode : routingNodes) {
for (ShardRouting shard : routingNode) {
if (shard.primary() == true) {
@@ -120,14 +121,14 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
"existing allocation of replica to [" + currentNode + "] cancelled, sync id match found on node ["+ nodeWithHighestMatch + "]",
null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), false, UnassignedInfo.AllocationStatus.NO_ATTEMPT);
// don't cancel shard in the loop as it will cause a ConcurrentModificationException
- recoveriesToCancel.add(new Tuple<>(shard, unassignedInfo));
+ shardCancellationActions.add(() -> routingNodes.failShard(logger, shard, unassignedInfo, indexMetaData));
changed = true;
}
}
}
}
- for (Tuple cancellation : recoveriesToCancel) {
- routingNodes.moveToUnassigned(cancellation.v1(), cancellation.v2());
+ for (Runnable action : shardCancellationActions) {
+ action.run();
}
return changed;
}
diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java
index 2e2eabf5063..46376ac3afa 100644
--- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java
+++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java
@@ -335,37 +335,37 @@ public class BalanceConfigurationTests extends ESAllocationTestCase {
switch (sr.id()) {
case 0:
if (sr.primary()) {
- allocation.routingNodes().initialize(sr, "node1", null, -1);
+ allocation.routingNodes().initializeShard(sr, "node1", null, -1);
} else {
- allocation.routingNodes().initialize(sr, "node0", null, -1);
+ allocation.routingNodes().initializeShard(sr, "node0", null, -1);
}
break;
case 1:
if (sr.primary()) {
- allocation.routingNodes().initialize(sr, "node1", null, -1);
+ allocation.routingNodes().initializeShard(sr, "node1", null, -1);
} else {
- allocation.routingNodes().initialize(sr, "node2", null, -1);
+ allocation.routingNodes().initializeShard(sr, "node2", null, -1);
}
break;
case 2:
if (sr.primary()) {
- allocation.routingNodes().initialize(sr, "node3", null, -1);
+ allocation.routingNodes().initializeShard(sr, "node3", null, -1);
} else {
- allocation.routingNodes().initialize(sr, "node2", null, -1);
+ allocation.routingNodes().initializeShard(sr, "node2", null, -1);
}
break;
case 3:
if (sr.primary()) {
- allocation.routingNodes().initialize(sr, "node3", null, -1);
+ allocation.routingNodes().initializeShard(sr, "node3", null, -1);
} else {
- allocation.routingNodes().initialize(sr, "node1", null, -1);
+ allocation.routingNodes().initializeShard(sr, "node1", null, -1);
}
break;
case 4:
if (sr.primary()) {
- allocation.routingNodes().initialize(sr, "node2", null, -1);
+ allocation.routingNodes().initializeShard(sr, "node2", null, -1);
} else {
- allocation.routingNodes().initialize(sr, "node0", null, -1);
+ allocation.routingNodes().initializeShard(sr, "node0", null, -1);
}
break;
}
diff --git a/core/src/test/java/org/elasticsearch/gateway/PriorityComparatorTests.java b/core/src/test/java/org/elasticsearch/gateway/PriorityComparatorTests.java
index bb5a6ff748e..6c441a7c878 100644
--- a/core/src/test/java/org/elasticsearch/gateway/PriorityComparatorTests.java
+++ b/core/src/test/java/org/elasticsearch/gateway/PriorityComparatorTests.java
@@ -35,10 +35,12 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
+import static org.mockito.Mockito.mock;
+
public class PriorityComparatorTests extends ESTestCase {
public void testPreferNewIndices() {
- RoutingNodes.UnassignedShards shards = new RoutingNodes.UnassignedShards(null);
+ RoutingNodes.UnassignedShards shards = new RoutingNodes.UnassignedShards(mock(RoutingNodes.class));
List shardRoutings = Arrays.asList(TestShardRouting.newShardRouting("oldest", 0, null, null, null,
randomBoolean(), ShardRoutingState.UNASSIGNED, new UnassignedInfo(randomFrom(UnassignedInfo.Reason.values()), "foobar")), TestShardRouting.newShardRouting("newest", 0, null, null, null,
randomBoolean(), ShardRoutingState.UNASSIGNED, new UnassignedInfo(randomFrom(UnassignedInfo.Reason.values()), "foobar")));
@@ -68,7 +70,7 @@ public class PriorityComparatorTests extends ESTestCase {
}
public void testPreferPriorityIndices() {
- RoutingNodes.UnassignedShards shards = new RoutingNodes.UnassignedShards((RoutingNodes) null);
+ RoutingNodes.UnassignedShards shards = new RoutingNodes.UnassignedShards(mock(RoutingNodes.class));
List shardRoutings = Arrays.asList(TestShardRouting.newShardRouting("oldest", 0, null, null, null,
randomBoolean(), ShardRoutingState.UNASSIGNED, new UnassignedInfo(randomFrom(UnassignedInfo.Reason.values()), "foobar")), TestShardRouting.newShardRouting("newest", 0, null, null, null,
randomBoolean(), ShardRoutingState.UNASSIGNED, new UnassignedInfo(randomFrom(UnassignedInfo.Reason.values()), "foobar")));
@@ -98,7 +100,7 @@ public class PriorityComparatorTests extends ESTestCase {
}
public void testPriorityComparatorSort() {
- RoutingNodes.UnassignedShards shards = new RoutingNodes.UnassignedShards((RoutingNodes) null);
+ RoutingNodes.UnassignedShards shards = new RoutingNodes.UnassignedShards(mock(RoutingNodes.class));
int numIndices = randomIntBetween(3, 99);
IndexMeta[] indices = new IndexMeta[numIndices];
final Map map = new HashMap<>();