Simplify RoutingNodes interface (#19870)
Slims the public interface of RoutingNodes down to 4 methods to update routing entries:
- initializeShard() -> initializes an unassigned shard
- startShard() -> starts an initializing shard / completes relocation of a shard
- relocateShard() -> starts relocation of a started shard
- failShard() -> fails/cancels an assigned shard

In the spirit of PR #19743, where deassociateDeadNodes was moved to its own public method to be called only when nodes have actually left the cluster and not on every reroute step, this commit also removes electPrimariesAndUnassignedDanglingReplicas from AllocationService and folds it into the shard failure logic. This means that an active replica is promoted to primary in the same method where the primary was failed. Previously we would scan in each reroute iteration for active replicas to be promoted to primary.
This commit is contained in:
parent
1760e00489
commit
6abcd42a05
|
@ -23,12 +23,14 @@ import com.carrotsearch.hppc.ObjectIntHashMap;
|
|||
import com.carrotsearch.hppc.cursors.ObjectCursor;
|
||||
import org.apache.lucene.util.CollectionUtil;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.Randomness;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
|
||||
|
@ -48,8 +50,17 @@ import java.util.Set;
|
|||
import java.util.function.Predicate;
|
||||
|
||||
/**
|
||||
* {@link RoutingNodes} represents a copy of the routing information contained in
|
||||
* the {@link ClusterState cluster state}.
|
||||
* {@link RoutingNodes} represents a copy of the routing information contained in the {@link ClusterState cluster state}.
|
||||
* It can be either initialized as mutable or immutable (see {@link #RoutingNodes(ClusterState, boolean)}), allowing
|
||||
* or disallowing changes to its elements.
|
||||
*
|
||||
* The main methods used to update routing entries are:
|
||||
* <ul>
|
||||
* <li> {@link #initializeShard} initializes an unassigned shard.
|
||||
* <li> {@link #startShard} starts an initializing shard / completes relocation of a shard.
|
||||
* <li> {@link #relocateShard} starts relocation of a started shard.
|
||||
* <li> {@link #failShard} fails/cancels an assigned shard.
|
||||
* </ul>
|
||||
*/
|
||||
public class RoutingNodes implements Iterable<RoutingNode> {
|
||||
|
||||
|
@ -212,6 +223,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|||
}
|
||||
|
||||
public Iterator<RoutingNode> mutableIterator() {
|
||||
ensureMutable();
|
||||
return nodesToShards.values().iterator();
|
||||
}
|
||||
|
||||
|
@ -396,10 +408,11 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|||
* @param existingAllocationId allocation id to use. If null, a fresh allocation id is generated.
|
||||
* @return the initialized shard
|
||||
*/
|
||||
public ShardRouting initialize(ShardRouting shard, String nodeId, @Nullable String existingAllocationId, long expectedSize) {
|
||||
public ShardRouting initializeShard(ShardRouting unassignedShard, String nodeId, @Nullable String existingAllocationId,
|
||||
long expectedSize) {
|
||||
ensureMutable();
|
||||
assert shard.unassigned() : "expected an unassigned shard " + shard;
|
||||
ShardRouting initializedShard = shard.initialize(nodeId, existingAllocationId, expectedSize);
|
||||
assert unassignedShard.unassigned() : "expected an unassigned shard " + unassignedShard;
|
||||
ShardRouting initializedShard = unassignedShard.initialize(nodeId, existingAllocationId, expectedSize);
|
||||
node(nodeId).add(initializedShard);
|
||||
inactiveShardCount++;
|
||||
if (initializedShard.primary()) {
|
||||
|
@ -416,25 +429,164 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|||
*
|
||||
* @return pair of source relocating and target initializing shards.
|
||||
*/
|
||||
public Tuple<ShardRouting,ShardRouting> relocate(ShardRouting shard, String nodeId, long expectedShardSize) {
|
||||
public Tuple<ShardRouting,ShardRouting> relocateShard(ShardRouting startedShard, String nodeId, long expectedShardSize) {
|
||||
ensureMutable();
|
||||
relocatingShards++;
|
||||
ShardRouting source = shard.relocate(nodeId, expectedShardSize);
|
||||
ShardRouting source = startedShard.relocate(nodeId, expectedShardSize);
|
||||
ShardRouting target = source.getTargetRelocatingShard();
|
||||
updateAssigned(shard, source);
|
||||
updateAssigned(startedShard, source);
|
||||
node(target.currentNodeId()).add(target);
|
||||
assignedShardsAdd(target);
|
||||
addRecovery(target);
|
||||
return Tuple.tuple(source, target);
|
||||
}
|
||||
|
||||
/**
|
||||
* Applies the relevant logic to start an initializing shard.
|
||||
*
|
||||
* Moves the initializing shard to started. If the shard is a relocation target, also removes the relocation source.
|
||||
*
|
||||
* @return the started shard
|
||||
*/
|
||||
public ShardRouting startShard(ESLogger logger, ShardRouting initializingShard) {
|
||||
ensureMutable();
|
||||
ShardRouting startedShard = started(initializingShard);
|
||||
logger.trace("{} marked shard as started (routing: {})", initializingShard.shardId(), initializingShard);
|
||||
|
||||
if (initializingShard.relocatingNodeId() != null) {
|
||||
// relocation target has been started, remove relocation source
|
||||
RoutingNode relocationSourceNode = node(initializingShard.relocatingNodeId());
|
||||
ShardRouting relocationSourceShard = relocationSourceNode.getByShardId(initializingShard.shardId());
|
||||
assert relocationSourceShard.isRelocationSourceOf(initializingShard);
|
||||
assert relocationSourceShard.getTargetRelocatingShard() == initializingShard : "relocation target mismatch, expected: "
|
||||
+ initializingShard + " but was: " + relocationSourceShard.getTargetRelocatingShard();
|
||||
remove(relocationSourceShard);
|
||||
}
|
||||
return startedShard;
|
||||
}
|
||||
|
||||
/**
|
||||
* Applies the relevant logic to handle a cancelled or failed shard.
|
||||
*
|
||||
* Moves the shard to unassigned or completely removes the shard (if relocation target).
|
||||
*
|
||||
* - If shard is a primary, this also fails initializing replicas.
|
||||
* - If shard is an active primary, this also promotes an active replica to primary (if such a replica exists).
|
||||
* - If shard is a relocating primary, this also removes the primary relocation target shard.
|
||||
* - If shard is a relocating replica, this promotes the replica relocation target to a full initializing replica, removing the
|
||||
* relocation source information. This is possible as peer recovery is always done from the primary.
|
||||
* - If shard is a (primary or replica) relocation target, this also clears the relocation information on the source shard.
|
||||
*
|
||||
*/
|
||||
public void failShard(ESLogger logger, ShardRouting failedShard, UnassignedInfo unassignedInfo, IndexMetaData indexMetaData) {
|
||||
ensureMutable();
|
||||
assert failedShard.assignedToNode() : "only assigned shards can be failed";
|
||||
assert indexMetaData.getIndex().equals(failedShard.index()) :
|
||||
"shard failed for unknown index (shard entry: " + failedShard + ")";
|
||||
assert getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId()) == failedShard :
|
||||
"shard routing to fail does not exist in routing table, expected: " + failedShard + " but was: " +
|
||||
getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId());
|
||||
|
||||
logger.debug("{} failing shard {} with unassigned info ({})", failedShard.shardId(), failedShard, unassignedInfo.shortSummary());
|
||||
|
||||
// if this is a primary, fail initializing replicas first (otherwise we move RoutingNodes into an inconsistent state)
|
||||
if (failedShard.primary()) {
|
||||
List<ShardRouting> assignedShards = assignedShards(failedShard.shardId());
|
||||
if (assignedShards.isEmpty() == false) {
|
||||
// copy list to prevent ConcurrentModificationException
|
||||
for (ShardRouting routing : new ArrayList<>(assignedShards)) {
|
||||
if (!routing.primary() && routing.initializing()) {
|
||||
// re-resolve replica as earlier iteration could have changed source/target of replica relocation
|
||||
ShardRouting replicaShard = getByAllocationId(routing.shardId(), routing.allocationId().getId());
|
||||
assert replicaShard != null : "failed to re-resolve " + routing + " when failing replicas";
|
||||
UnassignedInfo primaryFailedUnassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.PRIMARY_FAILED,
|
||||
"primary failed while replica initializing", null, 0, unassignedInfo.getUnassignedTimeInNanos(),
|
||||
unassignedInfo.getUnassignedTimeInMillis(), false, AllocationStatus.NO_ATTEMPT);
|
||||
failShard(logger, replicaShard, primaryFailedUnassignedInfo, indexMetaData);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (failedShard.relocating()) {
|
||||
// find the shard that is initializing on the target node
|
||||
ShardRouting targetShard = getByAllocationId(failedShard.shardId(), failedShard.allocationId().getRelocationId());
|
||||
assert targetShard.isRelocationTargetOf(failedShard);
|
||||
if (failedShard.primary()) {
|
||||
logger.trace("{} is removed due to the failure/cancellation of the source shard", targetShard);
|
||||
// cancel and remove target shard
|
||||
remove(targetShard);
|
||||
} else {
|
||||
logger.trace("{}, relocation source failed / cancelled, mark as initializing without relocation source", targetShard);
|
||||
// promote to initializing shard without relocation source and ensure that removed relocation source
|
||||
// is not added back as unassigned shard
|
||||
removeRelocationSource(targetShard);
|
||||
}
|
||||
}
|
||||
|
||||
// fail actual shard
|
||||
if (failedShard.initializing()) {
|
||||
if (failedShard.relocatingNodeId() == null) {
|
||||
// initializing shard that is not relocation target, just move to unassigned
|
||||
moveToUnassigned(failedShard, unassignedInfo);
|
||||
} else {
|
||||
// The shard is a target of a relocating shard. In that case we only need to remove the target shard and cancel the source
|
||||
// relocation. No shard is left unassigned
|
||||
logger.trace("{} is a relocation target, resolving source to cancel relocation ({})", failedShard,
|
||||
unassignedInfo.shortSummary());
|
||||
ShardRouting sourceShard = getByAllocationId(failedShard.shardId(),
|
||||
failedShard.allocationId().getRelocationId());
|
||||
assert sourceShard.isRelocationSourceOf(failedShard);
|
||||
logger.trace("{}, resolved source to [{}]. canceling relocation ... ({})", failedShard.shardId(), sourceShard,
|
||||
unassignedInfo.shortSummary());
|
||||
cancelRelocation(sourceShard);
|
||||
remove(failedShard);
|
||||
}
|
||||
} else {
|
||||
assert failedShard.active();
|
||||
if (failedShard.primary()) {
|
||||
// promote active replica to primary if active replica exists
|
||||
ShardRouting candidate = activeReplica(failedShard.shardId());
|
||||
if (candidate == null) {
|
||||
moveToUnassigned(failedShard, unassignedInfo);
|
||||
} else {
|
||||
movePrimaryToUnassignedAndDemoteToReplica(failedShard, unassignedInfo);
|
||||
ShardRouting primarySwappedCandidate = promoteAssignedReplicaShardToPrimary(candidate);
|
||||
if (primarySwappedCandidate.relocatingNodeId() != null) {
|
||||
// its also relocating, make sure to move the other routing to primary
|
||||
RoutingNode node = node(primarySwappedCandidate.relocatingNodeId());
|
||||
if (node != null) {
|
||||
for (ShardRouting shardRouting : node) {
|
||||
if (shardRouting.shardId().equals(primarySwappedCandidate.shardId()) && !shardRouting.primary()) {
|
||||
promoteAssignedReplicaShardToPrimary(shardRouting);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (IndexMetaData.isIndexUsingShadowReplicas(indexMetaData.getSettings())) {
|
||||
reinitShadowPrimary(primarySwappedCandidate);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
assert failedShard.primary() == false;
|
||||
if (failedShard.relocating()) {
|
||||
remove(failedShard);
|
||||
} else {
|
||||
moveToUnassigned(failedShard, unassignedInfo);
|
||||
}
|
||||
}
|
||||
}
|
||||
assert node(failedShard.currentNodeId()).getByShardId(failedShard.shardId()) == null : "failedShard " + failedShard +
|
||||
" was matched but wasn't removed";
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark a shard as started and adjusts internal statistics.
|
||||
*
|
||||
* @return the started shard
|
||||
*/
|
||||
public ShardRouting started(ShardRouting shard) {
|
||||
ensureMutable();
|
||||
private ShardRouting started(ShardRouting shard) {
|
||||
assert shard.initializing() : "expected an initializing shard " + shard;
|
||||
if (shard.relocatingNodeId() == null) {
|
||||
// if this is not a target shard for relocation, we need to update statistics
|
||||
|
@ -456,8 +608,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|||
*
|
||||
* @return the shard after cancelling relocation
|
||||
*/
|
||||
public ShardRouting cancelRelocation(ShardRouting shard) {
|
||||
ensureMutable();
|
||||
private ShardRouting cancelRelocation(ShardRouting shard) {
|
||||
relocatingShards--;
|
||||
ShardRouting cancelledShard = shard.cancelRelocation();
|
||||
updateAssigned(shard, cancelledShard);
|
||||
|
@ -470,8 +621,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|||
* @param replicaShard the replica shard to be promoted to primary
|
||||
* @return the resulting primary shard
|
||||
*/
|
||||
public ShardRouting promoteAssignedReplicaShardToPrimary(ShardRouting replicaShard) {
|
||||
ensureMutable();
|
||||
private ShardRouting promoteAssignedReplicaShardToPrimary(ShardRouting replicaShard) {
|
||||
assert replicaShard.unassigned() == false : "unassigned shard cannot be promoted to primary: " + replicaShard;
|
||||
assert replicaShard.primary() == false : "primary shard cannot be promoted to primary: " + replicaShard;
|
||||
ShardRouting primaryShard = replicaShard.moveToPrimary();
|
||||
|
@ -485,8 +635,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|||
* Cancels the given shard from the Routing nodes internal statistics and cancels
|
||||
* the relocation if the shard is relocating.
|
||||
*/
|
||||
public void remove(ShardRouting shard) {
|
||||
ensureMutable();
|
||||
private void remove(ShardRouting shard) {
|
||||
assert shard.unassigned() == false : "only assigned shards can be removed here (" + shard + ")";
|
||||
node(shard.currentNodeId()).remove(shard);
|
||||
if (shard.initializing() && shard.relocatingNodeId() == null) {
|
||||
|
@ -508,9 +657,8 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|||
* Removes relocation source of an initializing non-primary shard. This allows the replica shard to continue recovery from
|
||||
* the primary even though its non-primary relocation source has failed.
|
||||
*/
|
||||
public ShardRouting removeRelocationSource(ShardRouting shard) {
|
||||
private ShardRouting removeRelocationSource(ShardRouting shard) {
|
||||
assert shard.isRelocationTarget() : "only relocation target shards can have their relocation source removed (" + shard + ")";
|
||||
ensureMutable();
|
||||
ShardRouting relocationMarkerRemoved = shard.removeRelocationSource();
|
||||
updateAssigned(shard, relocationMarkerRemoved);
|
||||
inactiveShardCount++; // relocation targets are not counted as inactive shards whereas initializing shards are
|
||||
|
@ -532,7 +680,6 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|||
}
|
||||
|
||||
private void assignedShardsRemove(ShardRouting shard) {
|
||||
ensureMutable();
|
||||
final List<ShardRouting> replicaSet = assignedShards.get(shard.shardId());
|
||||
if (replicaSet != null) {
|
||||
final Iterator<ShardRouting> iterator = replicaSet.iterator();
|
||||
|
@ -547,8 +694,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|||
assert false : "No shard found to remove";
|
||||
}
|
||||
|
||||
public ShardRouting reinitShadowPrimary(ShardRouting candidate) {
|
||||
ensureMutable();
|
||||
private ShardRouting reinitShadowPrimary(ShardRouting candidate) {
|
||||
if (candidate.relocating()) {
|
||||
cancelRelocation(candidate);
|
||||
}
|
||||
|
@ -573,8 +719,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|||
shardsWithMatchingShardId.set(previousShardIndex, newShard);
|
||||
}
|
||||
|
||||
public ShardRouting moveToUnassigned(ShardRouting shard, UnassignedInfo unassignedInfo) {
|
||||
ensureMutable();
|
||||
private ShardRouting moveToUnassigned(ShardRouting shard, UnassignedInfo unassignedInfo) {
|
||||
assert shard.unassigned() == false : "only assigned shards can be moved to unassigned (" + shard + ")";
|
||||
remove(shard);
|
||||
ShardRouting unassigned = shard.moveToUnassigned(unassignedInfo);
|
||||
|
@ -582,6 +727,19 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|||
return unassigned;
|
||||
}
|
||||
|
||||
/**
|
||||
* Moves assigned primary to unassigned and demotes it to a replica.
|
||||
* Used in conjunction with {@link #promoteAssignedReplicaShardToPrimary} when an active replica is promoted to primary.
|
||||
*/
|
||||
private ShardRouting movePrimaryToUnassignedAndDemoteToReplica(ShardRouting shard, UnassignedInfo unassignedInfo) {
|
||||
assert shard.unassigned() == false : "only assigned shards can be moved to unassigned (" + shard + ")";
|
||||
assert shard.primary() : "only primary can be demoted to replica (" + shard + ")";
|
||||
remove(shard);
|
||||
ShardRouting unassigned = shard.moveToUnassigned(unassignedInfo).moveFromPrimary();
|
||||
unassignedShards.add(unassigned);
|
||||
return unassigned;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of routing nodes
|
||||
*/
|
||||
|
@ -612,6 +770,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|||
}
|
||||
|
||||
public void sort(Comparator<ShardRouting> comparator) {
|
||||
nodes.ensureMutable();
|
||||
CollectionUtil.timSort(unassigned, comparator);
|
||||
}
|
||||
|
||||
|
@ -661,6 +820,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|||
* @return true iff the decision caused a change to the unassigned info
|
||||
*/
|
||||
public boolean ignoreShard(ShardRouting shard, AllocationStatus allocationStatus) {
|
||||
nodes.ensureMutable();
|
||||
boolean changed = false;
|
||||
if (shard.primary()) {
|
||||
ignoredPrimaries++;
|
||||
|
@ -704,8 +864,9 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|||
* @param existingAllocationId allocation id to use. If null, a fresh allocation id is generated.
|
||||
*/
|
||||
public ShardRouting initialize(String nodeId, @Nullable String existingAllocationId, long expectedShardSize) {
|
||||
nodes.ensureMutable();
|
||||
innerRemove();
|
||||
return nodes.initialize(current, nodeId, existingAllocationId, expectedShardSize);
|
||||
return nodes.initializeShard(current, nodeId, existingAllocationId, expectedShardSize);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -718,6 +879,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|||
* @return true iff the decision caused an update to the unassigned info
|
||||
*/
|
||||
public boolean removeAndIgnore(AllocationStatus attempt) {
|
||||
nodes.ensureMutable();
|
||||
innerRemove();
|
||||
return ignoreShard(current, attempt);
|
||||
}
|
||||
|
@ -734,23 +896,12 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|||
* @return the shard with unassigned info updated
|
||||
*/
|
||||
public ShardRouting updateUnassignedInfo(UnassignedInfo unassignedInfo) {
|
||||
nodes.ensureMutable();
|
||||
ShardRouting updatedShardRouting = current.updateUnassignedInfo(unassignedInfo);
|
||||
updateShardRouting(updatedShardRouting);
|
||||
return updatedShardRouting;
|
||||
}
|
||||
|
||||
/**
|
||||
* marks the current primary shard as replica
|
||||
*
|
||||
* @return the shard with primary status swapped
|
||||
*/
|
||||
public ShardRouting demotePrimaryToReplicaShard() {
|
||||
assert current.primary() : "non-primary shard " + current + " cannot be demoted";
|
||||
updateShardRouting(current.moveFromPrimary());
|
||||
primaries--;
|
||||
return current;
|
||||
}
|
||||
|
||||
/**
|
||||
* Unsupported operation, just there for the interface. Use {@link #removeAndIgnore(AllocationStatus)} or
|
||||
* {@link #initialize(String, String, long)}.
|
||||
|
@ -761,7 +912,6 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|||
}
|
||||
|
||||
private void innerRemove() {
|
||||
nodes.ensureMutable();
|
||||
iterator.remove();
|
||||
if (current.primary()) {
|
||||
primaries--;
|
||||
|
@ -786,6 +936,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|||
}
|
||||
|
||||
public void shuffle() {
|
||||
nodes.ensureMutable();
|
||||
Randomness.shuffle(unassigned);
|
||||
}
|
||||
|
||||
|
@ -794,6 +945,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|||
* This method will not drain ignored shards.
|
||||
*/
|
||||
public ShardRouting[] drain() {
|
||||
nodes.ensureMutable();
|
||||
ShardRouting[] mutableShardRoutings = unassigned.toArray(new ShardRouting[unassigned.size()]);
|
||||
unassigned.clear();
|
||||
primaries = 0;
|
||||
|
|
|
@ -41,14 +41,11 @@ import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
|
|||
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.gateway.GatewayAllocator;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
@ -237,6 +234,7 @@ public class AllocationService extends AbstractComponent {
|
|||
|
||||
for (FailedRerouteAllocation.FailedShard failedShardEntry : failedShards) {
|
||||
ShardRouting shardToFail = failedShardEntry.routingEntry;
|
||||
IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(shardToFail.shardId().getIndex());
|
||||
allocation.addIgnoreShardForNode(shardToFail.shardId(), shardToFail.currentNodeId());
|
||||
// failing a primary also fails initializing replica shards, re-resolve ShardRouting
|
||||
ShardRouting failedShard = routingNodes.getByAllocationId(shardToFail.shardId(), shardToFail.allocationId().getId());
|
||||
|
@ -249,7 +247,7 @@ public class AllocationService extends AbstractComponent {
|
|||
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, failedShardEntry.message,
|
||||
failedShardEntry.failure, failedAllocations + 1, currentNanoTime, System.currentTimeMillis(), false,
|
||||
AllocationStatus.NO_ATTEMPT);
|
||||
applyFailedShard(allocation, failedShard, unassignedInfo);
|
||||
routingNodes.failShard(logger, failedShard, unassignedInfo, indexMetaData);
|
||||
} else {
|
||||
logger.trace("{} shard routing failed in an earlier iteration (routing: {})", shardToFail.shardId(), shardToFail);
|
||||
}
|
||||
|
@ -385,8 +383,7 @@ public class AllocationService extends AbstractComponent {
|
|||
private boolean reroute(RoutingAllocation allocation) {
|
||||
assert deassociateDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See deassociateDeadNodes";
|
||||
|
||||
boolean changed = electPrimariesAndUnassignedDanglingReplicas(allocation);
|
||||
|
||||
boolean changed = false;
|
||||
// now allocate all the unassigned to available nodes
|
||||
if (allocation.routingNodes().unassigned().size() > 0) {
|
||||
changed |= removeDelayMarkers(allocation);
|
||||
|
@ -398,49 +395,6 @@ public class AllocationService extends AbstractComponent {
|
|||
return changed;
|
||||
}
|
||||
|
||||
private boolean electPrimariesAndUnassignedDanglingReplicas(RoutingAllocation allocation) {
|
||||
boolean changed = false;
|
||||
final RoutingNodes routingNodes = allocation.routingNodes();
|
||||
if (routingNodes.unassigned().getNumPrimaries() == 0) {
|
||||
// move out if we don't have unassigned primaries
|
||||
return changed;
|
||||
}
|
||||
// now, go over and elect a new primary if possible; note, from this code block on, if one is elected,
|
||||
// routingNodes.hasUnassignedPrimaries() will potentially be false
|
||||
final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = routingNodes.unassigned().iterator();
|
||||
while (unassignedIterator.hasNext()) {
|
||||
ShardRouting shardEntry = unassignedIterator.next();
|
||||
if (shardEntry.primary()) {
|
||||
// remove dangling replicas that are initializing for primary shards
|
||||
changed |= failReplicasForUnassignedPrimary(allocation, shardEntry);
|
||||
ShardRouting candidate = allocation.routingNodes().activeReplica(shardEntry.shardId());
|
||||
if (candidate != null) {
|
||||
shardEntry = unassignedIterator.demotePrimaryToReplicaShard();
|
||||
ShardRouting primarySwappedCandidate = routingNodes.promoteAssignedReplicaShardToPrimary(candidate);
|
||||
changed = true;
|
||||
if (primarySwappedCandidate.relocatingNodeId() != null) {
|
||||
// its also relocating, make sure to move the other routing to primary
|
||||
RoutingNode node = routingNodes.node(primarySwappedCandidate.relocatingNodeId());
|
||||
if (node != null) {
|
||||
for (ShardRouting shardRouting : node) {
|
||||
if (shardRouting.shardId().equals(primarySwappedCandidate.shardId()) && !shardRouting.primary()) {
|
||||
routingNodes.promoteAssignedReplicaShardToPrimary(shardRouting);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
IndexMetaData index = allocation.metaData().getIndexSafe(primarySwappedCandidate.index());
|
||||
if (IndexMetaData.isIndexUsingShadowReplicas(index.getSettings())) {
|
||||
routingNodes.reinitShadowPrimary(primarySwappedCandidate);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return changed;
|
||||
}
|
||||
|
||||
private boolean deassociateDeadNodes(RoutingAllocation allocation) {
|
||||
boolean changed = false;
|
||||
for (Iterator<RoutingNode> it = allocation.routingNodes().mutableIterator(); it.hasNext(); ) {
|
||||
|
@ -456,7 +410,7 @@ public class AllocationService extends AbstractComponent {
|
|||
boolean delayed = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexMetaData.getSettings()).nanos() > 0;
|
||||
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left[" + node.nodeId() + "]",
|
||||
null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), delayed, AllocationStatus.NO_ATTEMPT);
|
||||
applyFailedShard(allocation, shardRouting, unassignedInfo);
|
||||
allocation.routingNodes().failShard(logger, shardRouting, unassignedInfo, indexMetaData);
|
||||
}
|
||||
// its a dead node, remove it, note, its important to remove it *after* we apply failed shard
|
||||
// since it relies on the fact that the RoutingNode exists in the list of nodes
|
||||
|
@ -465,23 +419,6 @@ public class AllocationService extends AbstractComponent {
|
|||
return changed;
|
||||
}
|
||||
|
||||
private boolean failReplicasForUnassignedPrimary(RoutingAllocation allocation, ShardRouting failedPrimary) {
|
||||
assert failedPrimary.primary() : "can only fail replicas for primary shard: " + failedPrimary;
|
||||
List<ShardRouting> replicas = new ArrayList<>();
|
||||
for (ShardRouting routing : allocation.routingNodes().assignedShards(failedPrimary.shardId())) {
|
||||
if (!routing.primary() && routing.initializing()) {
|
||||
replicas.add(routing);
|
||||
}
|
||||
}
|
||||
for (ShardRouting failedReplica : replicas) {
|
||||
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.PRIMARY_FAILED,
|
||||
"primary failed while replica initializing", null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), false,
|
||||
AllocationStatus.NO_ATTEMPT);
|
||||
applyFailedShard(allocation, failedReplica, unassignedInfo);
|
||||
}
|
||||
return replicas.isEmpty() == false;
|
||||
}
|
||||
|
||||
private void applyStartedShards(RoutingAllocation routingAllocation, List<ShardRouting> startedShardEntries) {
|
||||
assert startedShardEntries.isEmpty() == false : "non-empty list of started shard entries expected";
|
||||
RoutingNodes routingNodes = routingAllocation.routingNodes();
|
||||
|
@ -493,81 +430,7 @@ public class AllocationService extends AbstractComponent {
|
|||
"shard routing to start does not exist in routing table, expected: " + startedShard + " but was: " +
|
||||
routingNodes.getByAllocationId(startedShard.shardId(), startedShard.allocationId().getId());
|
||||
|
||||
routingNodes.started(startedShard);
|
||||
logger.trace("{} marked shard as started (routing: {})", startedShard.shardId(), startedShard);
|
||||
|
||||
if (startedShard.relocatingNodeId() != null) {
|
||||
// relocation target has been started, remove relocation source
|
||||
RoutingNode relocationSourceNode = routingNodes.node(startedShard.relocatingNodeId());
|
||||
ShardRouting relocationSourceShard = relocationSourceNode.getByShardId(startedShard.shardId());
|
||||
assert relocationSourceShard.isRelocationSourceOf(startedShard);
|
||||
assert relocationSourceShard.getTargetRelocatingShard() == startedShard : "relocation target mismatch, expected: "
|
||||
+ startedShard + " but was: " + relocationSourceShard.getTargetRelocatingShard();
|
||||
routingNodes.remove(relocationSourceShard);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Applies the relevant logic to handle a failed shard.
|
||||
*/
|
||||
private void applyFailedShard(RoutingAllocation allocation, ShardRouting failedShard, UnassignedInfo unassignedInfo) {
|
||||
RoutingNodes routingNodes = allocation.routingNodes();
|
||||
assert failedShard.assignedToNode() : "only assigned shards can be failed";
|
||||
assert allocation.metaData().index(failedShard.shardId().getIndex()) != null :
|
||||
"shard failed for unknown index (shard entry: " + failedShard + ")";
|
||||
assert routingNodes.getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId()) == failedShard :
|
||||
"shard routing to fail does not exist in routing table, expected: " + failedShard + " but was: " +
|
||||
routingNodes.getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId());
|
||||
|
||||
logger.debug("{} failing shard {} with unassigned info ({})", failedShard.shardId(), failedShard, unassignedInfo.shortSummary());
|
||||
if (failedShard.primary()) {
|
||||
// fail replicas first otherwise we move RoutingNodes into an inconsistent state
|
||||
failReplicasForUnassignedPrimary(allocation, failedShard);
|
||||
}
|
||||
|
||||
cancelShard(logger, failedShard, unassignedInfo, routingNodes);
|
||||
assert routingNodes.node(failedShard.currentNodeId()).getByShardId(failedShard.shardId()) == null : "failedShard " + failedShard +
|
||||
" was matched but wasn't removed";
|
||||
}
|
||||
|
||||
/**
 * Cancels the given shard in {@code routingNodes}; what happens depends on its relocation state.
 * <ul>
 * <li>No relocating node id: the shard is moved to unassigned with the given info.</li>
 * <li>Initializing with a relocating node id (a relocation target): the relocation of the source
 *     shard is cancelled and the target shard is removed.</li>
 * <li>Otherwise (asserted to be relocating): for a primary, the target shard is removed and the
 *     cancelled shard is moved to unassigned; for a replica, the relocation source is removed from
 *     the target shard and the cancelled shard is removed.</li>
 * </ul>
 *
 * @param logger         logger used for the trace messages emitted while resolving relocation peers
 * @param cancelledShard the shard to cancel
 * @param unassignedInfo info used when the shard is moved to unassigned, and in trace output
 * @param routingNodes   the routing nodes that are modified
 */
public static void cancelShard(ESLogger logger, ShardRouting cancelledShard, UnassignedInfo unassignedInfo, RoutingNodes routingNodes) {
|
||||
if (cancelledShard.relocatingNodeId() == null) {
|
||||
routingNodes.moveToUnassigned(cancelledShard, unassignedInfo);
|
||||
} else {
|
||||
if (cancelledShard.initializing()) {
|
||||
// The shard is a target of a relocating shard. In that case we only
|
||||
// need to remove the target shard and cancel the source relocation.
|
||||
// No shard is left unassigned
|
||||
logger.trace("{} is a relocation target, resolving source to cancel relocation ({})", cancelledShard,
|
||||
unassignedInfo.shortSummary());
|
||||
RoutingNode sourceNode = routingNodes.node(cancelledShard.relocatingNodeId());
|
||||
ShardRouting sourceShard = sourceNode.getByShardId(cancelledShard.shardId());
|
||||
assert sourceShard.isRelocationSourceOf(cancelledShard);
|
||||
logger.trace("{}, resolved source to [{}]. canceling relocation ... ({})", cancelledShard.shardId(), sourceShard,
|
||||
unassignedInfo.shortSummary());
|
||||
routingNodes.cancelRelocation(sourceShard);
|
||||
routingNodes.remove(cancelledShard);
|
||||
} else {
|
||||
assert cancelledShard.relocating();
|
||||
// The cancelled shard is the main copy of the current shard routing.
|
||||
// now, find the shard that is initializing on the target node
|
||||
RoutingNode targetNode = routingNodes.node(cancelledShard.relocatingNodeId());
|
||||
ShardRouting targetShard = targetNode.getByShardId(cancelledShard.shardId());
|
||||
assert targetShard.isRelocationTargetOf(cancelledShard);
|
||||
if (cancelledShard.primary()) {
|
||||
logger.trace("{} is removed due to the failure/cancellation of the source shard", targetShard);
|
||||
// cancel and remove target shard
|
||||
routingNodes.remove(targetShard);
|
||||
routingNodes.moveToUnassigned(cancelledShard, unassignedInfo);
|
||||
} else {
|
||||
logger.trace("{}, relocation source failed / cancelled, mark as initializing without relocation source", targetShard);
|
||||
// promote to initializing shard without relocation source and ensure that removed relocation source
|
||||
// is not added back as unassigned shard
|
||||
routingNodes.removeRelocationSource(targetShard);
|
||||
routingNodes.remove(cancelledShard);
|
||||
}
|
||||
}
|
||||
// NOTE(review): `startedShard` is not declared anywhere in this method; the statement below
|
||||
// looks like a line from a different hunk of the diff that ended up interleaved here.
|
||||
// Confirm against the original file before relying on it.
routingNodes.startShard(logger, startedShard);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -553,7 +553,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|||
Decision allocationDecision = allocation.deciders().canAllocate(shardRouting, target, allocation);
|
||||
if (allocationDecision.type() == Type.YES) { // TODO maybe we can respect throttling here too?
|
||||
sourceNode.removeShard(shardRouting);
|
||||
Tuple<ShardRouting, ShardRouting> relocatingShards = routingNodes.relocate(shardRouting, target.nodeId(), allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
|
||||
Tuple<ShardRouting, ShardRouting> relocatingShards = routingNodes.relocateShard(shardRouting, target.nodeId(), allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
|
||||
currentNode.addShard(relocatingShards.v2());
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Moved shard [{}] to node [{}]", shardRouting, routingNode.node());
|
||||
|
@ -728,7 +728,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|||
logger.trace("Assigned shard [{}] to [{}]", shard, minNode.getNodeId());
|
||||
}
|
||||
|
||||
shard = routingNodes.initialize(shard, minNode.getNodeId(), null, shardSize);
|
||||
shard = routingNodes.initializeShard(shard, minNode.getNodeId(), null, shardSize);
|
||||
minNode.addShard(shard);
|
||||
changed = true;
|
||||
continue; // don't add to ignoreUnassigned
|
||||
|
@ -820,7 +820,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|||
minNode.getNodeId());
|
||||
}
|
||||
/* now allocate on the cluster */
|
||||
minNode.addShard(routingNodes.relocate(candidate, minNode.getNodeId(), shardSize).v1());
|
||||
minNode.addShard(routingNodes.relocateShard(candidate, minNode.getNodeId(), shardSize).v1());
|
||||
return true;
|
||||
} else {
|
||||
assert decision.type() == Type.THROTTLE;
|
||||
|
|
|
@ -127,8 +127,9 @@ public class CancelAllocationCommand implements AllocationCommand {
|
|||
ShardRouting shardRouting = null;
|
||||
RoutingNodes routingNodes = allocation.routingNodes();
|
||||
RoutingNode routingNode = routingNodes.node(discoNode.getId());
|
||||
IndexMetaData indexMetaData = null;
|
||||
if (routingNode != null) {
|
||||
IndexMetaData indexMetaData = allocation.metaData().index(index());
|
||||
indexMetaData = allocation.metaData().index(index());
|
||||
if (indexMetaData == null) {
|
||||
throw new IndexNotFoundException(index());
|
||||
}
|
||||
|
@ -154,8 +155,8 @@ public class CancelAllocationCommand implements AllocationCommand {
|
|||
discoNode + ", shard is primary and " + shardRouting.state().name().toLowerCase(Locale.ROOT));
|
||||
}
|
||||
}
|
||||
AllocationService.cancelShard(Loggers.getLogger(CancelAllocationCommand.class), shardRouting,
|
||||
new UnassignedInfo(UnassignedInfo.Reason.REROUTE_CANCELLED, null), routingNodes);
|
||||
routingNodes.failShard(Loggers.getLogger(CancelAllocationCommand.class), shardRouting,
|
||||
new UnassignedInfo(UnassignedInfo.Reason.REROUTE_CANCELLED, null), indexMetaData);
|
||||
return new RerouteExplanation(this, allocation.decision(Decision.YES, "cancel_allocation_command",
|
||||
"shard " + shardId + " on node " + discoNode + " can be cancelled"));
|
||||
}
|
||||
|
|
|
@ -132,7 +132,7 @@ public class MoveAllocationCommand implements AllocationCommand {
|
|||
if (decision.type() == Decision.Type.THROTTLE) {
|
||||
// it's being throttled, maybe have a flag to take it into account and fail? for now, just do it since the "user" wants it...
|
||||
}
|
||||
allocation.routingNodes().relocate(shardRouting, toRoutingNode.nodeId(), allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
|
||||
allocation.routingNodes().relocateShard(shardRouting, toRoutingNode.nodeId(), allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
|
||||
}
|
||||
|
||||
if (!found) {
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.elasticsearch.cluster.routing.RoutingNodes;
|
|||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
||||
import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
|
@ -63,7 +64,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
|
|||
boolean changed = false;
|
||||
MetaData metaData = allocation.metaData();
|
||||
RoutingNodes routingNodes = allocation.routingNodes();
|
||||
List<Tuple<ShardRouting, UnassignedInfo>> recoveriesToCancel = new ArrayList<>();
|
||||
List<Runnable> shardCancellationActions = new ArrayList<>();
|
||||
for (RoutingNode routingNode : routingNodes) {
|
||||
for (ShardRouting shard : routingNode) {
|
||||
if (shard.primary() == true) {
|
||||
|
@ -120,14 +121,14 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
|
|||
"existing allocation of replica to [" + currentNode + "] cancelled, sync id match found on node ["+ nodeWithHighestMatch + "]",
|
||||
null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), false, UnassignedInfo.AllocationStatus.NO_ATTEMPT);
|
||||
// don't cancel shard in the loop as it will cause a ConcurrentModificationException
|
||||
recoveriesToCancel.add(new Tuple<>(shard, unassignedInfo));
|
||||
shardCancellationActions.add(() -> routingNodes.failShard(logger, shard, unassignedInfo, indexMetaData));
|
||||
changed = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
for (Tuple<ShardRouting, UnassignedInfo> cancellation : recoveriesToCancel) {
|
||||
routingNodes.moveToUnassigned(cancellation.v1(), cancellation.v2());
|
||||
for (Runnable action : shardCancellationActions) {
|
||||
action.run();
|
||||
}
|
||||
return changed;
|
||||
}
|
||||
|
|
|
@ -335,37 +335,37 @@ public class BalanceConfigurationTests extends ESAllocationTestCase {
|
|||
switch (sr.id()) {
|
||||
case 0:
|
||||
if (sr.primary()) {
|
||||
allocation.routingNodes().initialize(sr, "node1", null, -1);
|
||||
allocation.routingNodes().initializeShard(sr, "node1", null, -1);
|
||||
} else {
|
||||
allocation.routingNodes().initialize(sr, "node0", null, -1);
|
||||
allocation.routingNodes().initializeShard(sr, "node0", null, -1);
|
||||
}
|
||||
break;
|
||||
case 1:
|
||||
if (sr.primary()) {
|
||||
allocation.routingNodes().initialize(sr, "node1", null, -1);
|
||||
allocation.routingNodes().initializeShard(sr, "node1", null, -1);
|
||||
} else {
|
||||
allocation.routingNodes().initialize(sr, "node2", null, -1);
|
||||
allocation.routingNodes().initializeShard(sr, "node2", null, -1);
|
||||
}
|
||||
break;
|
||||
case 2:
|
||||
if (sr.primary()) {
|
||||
allocation.routingNodes().initialize(sr, "node3", null, -1);
|
||||
allocation.routingNodes().initializeShard(sr, "node3", null, -1);
|
||||
} else {
|
||||
allocation.routingNodes().initialize(sr, "node2", null, -1);
|
||||
allocation.routingNodes().initializeShard(sr, "node2", null, -1);
|
||||
}
|
||||
break;
|
||||
case 3:
|
||||
if (sr.primary()) {
|
||||
allocation.routingNodes().initialize(sr, "node3", null, -1);
|
||||
allocation.routingNodes().initializeShard(sr, "node3", null, -1);
|
||||
} else {
|
||||
allocation.routingNodes().initialize(sr, "node1", null, -1);
|
||||
allocation.routingNodes().initializeShard(sr, "node1", null, -1);
|
||||
}
|
||||
break;
|
||||
case 4:
|
||||
if (sr.primary()) {
|
||||
allocation.routingNodes().initialize(sr, "node2", null, -1);
|
||||
allocation.routingNodes().initializeShard(sr, "node2", null, -1);
|
||||
} else {
|
||||
allocation.routingNodes().initialize(sr, "node0", null, -1);
|
||||
allocation.routingNodes().initializeShard(sr, "node0", null, -1);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -35,10 +35,12 @@ import java.util.List;
|
|||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
public class PriorityComparatorTests extends ESTestCase {
|
||||
|
||||
public void testPreferNewIndices() {
|
||||
RoutingNodes.UnassignedShards shards = new RoutingNodes.UnassignedShards(null);
|
||||
RoutingNodes.UnassignedShards shards = new RoutingNodes.UnassignedShards(mock(RoutingNodes.class));
|
||||
List<ShardRouting> shardRoutings = Arrays.asList(TestShardRouting.newShardRouting("oldest", 0, null, null, null,
|
||||
randomBoolean(), ShardRoutingState.UNASSIGNED, new UnassignedInfo(randomFrom(UnassignedInfo.Reason.values()), "foobar")), TestShardRouting.newShardRouting("newest", 0, null, null, null,
|
||||
randomBoolean(), ShardRoutingState.UNASSIGNED, new UnassignedInfo(randomFrom(UnassignedInfo.Reason.values()), "foobar")));
|
||||
|
@ -68,7 +70,7 @@ public class PriorityComparatorTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testPreferPriorityIndices() {
|
||||
RoutingNodes.UnassignedShards shards = new RoutingNodes.UnassignedShards((RoutingNodes) null);
|
||||
RoutingNodes.UnassignedShards shards = new RoutingNodes.UnassignedShards(mock(RoutingNodes.class));
|
||||
List<ShardRouting> shardRoutings = Arrays.asList(TestShardRouting.newShardRouting("oldest", 0, null, null, null,
|
||||
randomBoolean(), ShardRoutingState.UNASSIGNED, new UnassignedInfo(randomFrom(UnassignedInfo.Reason.values()), "foobar")), TestShardRouting.newShardRouting("newest", 0, null, null, null,
|
||||
randomBoolean(), ShardRoutingState.UNASSIGNED, new UnassignedInfo(randomFrom(UnassignedInfo.Reason.values()), "foobar")));
|
||||
|
@ -98,7 +100,7 @@ public class PriorityComparatorTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testPriorityComparatorSort() {
|
||||
RoutingNodes.UnassignedShards shards = new RoutingNodes.UnassignedShards((RoutingNodes) null);
|
||||
RoutingNodes.UnassignedShards shards = new RoutingNodes.UnassignedShards(mock(RoutingNodes.class));
|
||||
int numIndices = randomIntBetween(3, 99);
|
||||
IndexMeta[] indices = new IndexMeta[numIndices];
|
||||
final Map<String, IndexMeta> map = new HashMap<>();
|
||||
|
|
Loading…
Reference in New Issue