diff --git a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index 36ae62e437a..41ddb49bb65 100644 --- a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -145,17 +145,12 @@ public class ShardStateAction extends AbstractComponent { return currentState; } - final MetaData metaData = currentState.getMetaData(); - - List shardRoutingsToBeApplied = new ArrayList<>(shardRoutingEntries.size()); - for (ShardRoutingEntry entry : extractShardsToBeApplied(shardRoutingEntries, "failed", metaData, logger)) { - shardRoutingsToBeApplied.add(new FailedRerouteAllocation.FailedShard(entry.shardRouting, entry.message, entry.failure)); - } // mark all entries as processed for (ShardRoutingEntry entry : shardRoutingEntries) { entry.processed = true; + shardRoutingsToBeApplied.add(new FailedRerouteAllocation.FailedShard(entry.shardRouting, entry.message, entry.failure)); } RoutingAllocation.Result routingResult = allocationService.applyFailedShards(currentState, shardRoutingsToBeApplied); @@ -180,31 +175,6 @@ public class ShardStateAction extends AbstractComponent { }); } - static List extractShardsToBeApplied(List shardRoutingEntries, String type, MetaData metaData, ESLogger logger) { - List shardRoutingsToBeApplied = new ArrayList<>(shardRoutingEntries.size()); - for (int i = 0; i < shardRoutingEntries.size(); i++) { - ShardRoutingEntry shardRoutingEntry = shardRoutingEntries.get(i); - ShardRouting shardRouting = shardRoutingEntry.shardRouting; - IndexMetaData indexMetaData = metaData.index(shardRouting.index()); - // if there is no metadata or the current index is not of the right uuid, the index has been deleted while it was being allocated - // which is fine, we should just ignore this - if (indexMetaData == null) { - logger.debug("{} ignoring shard {}, unknown index in {}", shardRouting.shardId(), type, shardRoutingEntry); - continue; - } - if (!indexMetaData.isSameUUID(shardRoutingEntry.indexUUID)) { - logger.debug("{} ignoring shard {}, different index uuid, current {}, got {}", shardRouting.shardId(), type, indexMetaData.getIndexUUID(), shardRoutingEntry); - continue; - } - - // more debug info will be logged by the allocation service - logger.trace("{} will apply shard {} {}", shardRouting.shardId(), type, shardRoutingEntry); - shardRoutingsToBeApplied.add(shardRoutingEntry); - } - return shardRoutingsToBeApplied; - - } - private void shardStartedOnMaster(final ShardRoutingEntry shardRoutingEntry) { logger.debug("received shard started for {}", shardRoutingEntry); // buffer shard started requests, and the state update tasks will simply drain it @@ -230,18 +200,12 @@ public class ShardStateAction extends AbstractComponent { return currentState; } - RoutingTable routingTable = currentState.routingTable(); - MetaData metaData = currentState.getMetaData(); - - List shardRoutingToBeApplied = new ArrayList<>(shardRoutingEntries.size()); - for (ShardRoutingEntry entry : extractShardsToBeApplied(shardRoutingEntries, "started", metaData, logger)) { - shardRoutingToBeApplied.add(entry.shardRouting); - } // mark all entries as processed for (ShardRoutingEntry entry : shardRoutingEntries) { entry.processed = true; + shardRoutingToBeApplied.add(entry.shardRouting); } if (shardRoutingToBeApplied.isEmpty()) { diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/AllocationId.java b/core/src/main/java/org/elasticsearch/cluster/routing/AllocationId.java index 55e7ca729b7..ffcf9f1e80c 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/AllocationId.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/AllocationId.java @@ -22,6 +22,8 @@ package org.elasticsearch.cluster.routing; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; import java.io.IOException; @@ -34,7 +36,7 @@ import java.io.IOException; * relocationId. Once relocation is done, the new allocation id is set to the relocationId. This is similar * behavior to how ShardRouting#currentNodeId is used. */ -public class AllocationId { +public class AllocationId implements ToXContent { private final String id; private final String relocationId; @@ -67,7 +69,7 @@ public class AllocationId { */ public static AllocationId newTargetRelocation(AllocationId allocationId) { assert allocationId.getRelocationId() != null; - return new AllocationId(allocationId.getRelocationId(), null); + return new AllocationId(allocationId.getRelocationId(), allocationId.getId()); } /** @@ -81,19 +83,24 @@ public class AllocationId { /** * Creates a new allocation id representing a cancelled relocation. - */ + * + * Note that this is expected to be called on the allocation id + * of the *source* shard + * */ public static AllocationId cancelRelocation(AllocationId allocationId) { assert allocationId.getRelocationId() != null; return new AllocationId(allocationId.getId(), null); } /** - * Creates a new allocation id finalizing a relocation, moving the transient - * relocation id to be the actual id. + * Creates a new allocation id finalizing a relocation. + * + * Note that this is expected to be called on the allocation id + * of the *target* shard and thus it only needs to clear the relocating id. */ public static AllocationId finishRelocation(AllocationId allocationId) { assert allocationId.getRelocationId() != null; - return new AllocationId(allocationId.getRelocationId(), null); + return new AllocationId(allocationId.getId(), null); } /** @@ -126,4 +133,20 @@ public class AllocationId { result = 31 * result + (relocationId != null ? relocationId.hashCode() : 0); return result; } + + @Override + public String toString() { + return "[id=" + id + (relocationId == null ? "" : ", rId=" + relocationId) + "]"; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject("allocation_id"); + builder.field("id", id); + if (relocationId != null) { + builder.field("relocation_id", relocationId); + } + builder.endObject(); + return builder; + } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java index efe3f676f78..b348afbc192 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java @@ -394,15 +394,14 @@ public class RoutingNodes implements Iterable { * Mark a shard as started and adjusts internal statistics. */ public void started(ShardRouting shard) { - if (!shard.active() && shard.relocatingNodeId() == null) { + assert !shard.active() : "expected an intializing shard " + shard; + if (shard.relocatingNodeId() == null) { + // if this is not a target shard for relocation, we need to update statistics inactiveShardCount--; if (shard.primary()) { inactivePrimaryCount--; } - } else if (shard.relocating()) { - relocatingShards--; } - assert !shard.started(); shard.moveToStarted(); } @@ -757,6 +756,7 @@ public class RoutingNodes implements Iterable { private final RoutingNode iterable; private ShardRouting shard; private final Iterator delegate; + private boolean removed = false; public RoutingNodeIterator(RoutingNode iterable) { this.delegate = iterable.mutableIterator(); @@ -770,6 +770,7 @@ public class RoutingNodes implements Iterable { @Override public ShardRouting next() { + removed = false; return shard = delegate.next(); } @@ -777,6 +778,13 @@ public class RoutingNodes implements Iterable { public void remove() { delegate.remove(); RoutingNodes.this.remove(shard); + removed = true; + } + + + /** returns true if {@link #remove()} or {@link #moveToUnassigned(UnassignedInfo)} were called on the current shard */ + public boolean isRemoved() { + return removed; } @Override @@ -785,10 +793,16 @@ public class RoutingNodes implements Iterable { } public void moveToUnassigned(UnassignedInfo unassignedInfo) { - remove(); + if (isRemoved() == false) { + remove(); + } ShardRouting unassigned = new ShardRouting(shard); // protective copy of the mutable shard unassigned.moveToUnassigned(unassignedInfo); unassigned().add(unassigned); } + + public ShardRouting current() { + return shard; + } } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java b/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java index 7165fda8a4d..1cbacda02ee 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java @@ -441,11 +441,12 @@ public final class ShardRouting implements Streamable, ToXContent { void moveToStarted() { ensureNotFrozen(); version++; - assert state == ShardRoutingState.INITIALIZING || state == ShardRoutingState.RELOCATING : this; + assert state == ShardRoutingState.INITIALIZING : "expected an initializing shard " + this; relocatingNodeId = null; restoreSource = null; unassignedInfo = null; // we keep the unassigned data until the shard is started - if (state == ShardRoutingState.RELOCATING) { + if (allocationId.getRelocationId() != null) { + // target relocation allocationId = AllocationId.finishRelocation(allocationId); } state = ShardRoutingState.STARTED; @@ -502,6 +503,9 @@ public final class ShardRouting implements Streamable, ToXContent { if (relocatingNodeId != null ? !relocatingNodeId.equals(that.relocatingNodeId) : that.relocatingNodeId != null) { return false; } + if (allocationId != null ? !allocationId.equals(that.allocationId) : that.allocationId != null) { + return false; + } if (state != that.state) { return false; } @@ -526,6 +530,7 @@ public final class ShardRouting implements Streamable, ToXContent { result = 31 * result + (primary ? 1 : 0); result = 31 * result + (state != null ? state.hashCode() : 0); result = 31 * result + (restoreSource != null ? restoreSource.hashCode() : 0); + result = 31 * result + (allocationId != null ? allocationId.hashCode() : 0); return hashCode = result; } @@ -549,10 +554,14 @@ public final class ShardRouting implements Streamable, ToXContent { } else { sb.append("[R]"); } + sb.append(", v[").append(version).append("]"); if (this.restoreSource != null) { sb.append(", restoring[" + restoreSource + "]"); } sb.append(", s[").append(state).append("]"); + if (allocationId != null) { + sb.append(", a").append(allocationId); + } if (this.unassignedInfo != null) { sb.append(", ").append(unassignedInfo.toString()); } @@ -567,11 +576,16 @@ public final class ShardRouting implements Streamable, ToXContent { .field("node", currentNodeId()) .field("relocating_node", relocatingNodeId()) .field("shard", shardId().id()) - .field("index", shardId().index().name()); + .field("index", shardId().index().name()) + .field("version", version); + if (restoreSource() != null) { builder.field("restore_source"); restoreSource().toXContent(builder, params); } + if (allocationId != null) { + allocationId.toXContent(builder, params); + } if (unassignedInfo != null) { unassignedInfo.toXContent(builder, params); } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java b/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java index 080a8340daf..6e9c5c88914 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java @@ -257,19 +257,22 @@ public class UnassignedInfo implements ToXContent, Writeable { return nextDelay == Long.MAX_VALUE ? 0l : nextDelay; } - @Override - public String toString() { + public String shortSummary() { StringBuilder sb = new StringBuilder(); - sb.append("unassigned_info[[reason=").append(reason).append("]"); + sb.append("[reason=").append(reason).append("]"); sb.append(", at[").append(DATE_TIME_FORMATTER.printer().print(timestamp)).append("]"); String details = getDetails(); if (details != null) { sb.append(", details[").append(details).append("]"); } - sb.append("]"); return sb.toString(); } + @Override + public String toString() { + return "unassigned_info[" + shortSummary() + "]"; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject("unassigned_info"); diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index f17171d1796..7a560642ec0 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -22,7 +22,6 @@ package org.elasticsearch.cluster.routing.allocation; import com.carrotsearch.hppc.cursors.ObjectCursor; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -325,47 +324,50 @@ public class AllocationService extends AbstractComponent { for (ShardRouting startedShard : startedShardEntries) { assert startedShard.initializing(); - // retrieve the relocating node id before calling startedShard(). - String relocatingNodeId = null; + // validate index still exists. strictly speaking this is not needed but it gives clearer logs + if (routingNodes.routingTable().index(startedShard.index()) == null) { + logger.debug("{} ignoring shard started, unknown index (routing: {})", startedShard.shardId(), startedShard); + continue; + } + RoutingNodes.RoutingNodeIterator currentRoutingNode = routingNodes.routingNodeIter(startedShard.currentNodeId()); - if (currentRoutingNode != null) { - for (ShardRouting shard : currentRoutingNode) { - if (shard.shardId().equals(startedShard.shardId())) { - if (shard.equals(startedShard)) { - relocatingNodeId = shard.relocatingNodeId(); - dirty = true; - routingNodes.started(shard); - logger.trace("{} marked as started", shard); - } else { - logger.debug("failed to find shard [{}] in order to start it [no matching shard on node], ignoring", startedShard); - } - break; - } - } - } else { - logger.debug("failed to find shard [{}] in order to start it [failed to find node], ignoring", startedShard); + if (currentRoutingNode == null) { + logger.debug("{} failed to find shard in order to start it [failed to find node], ignoring (routing: {})", startedShard.shardId(), startedShard); + continue; + } + for (ShardRouting shard : currentRoutingNode) { + if (shard.allocationId().getId().equals(startedShard.allocationId().getId())) { + if (shard.active()) { + logger.trace("{} shard is already started, ignoring (routing: {})", startedShard.shardId(), startedShard); + } else { + dirty = true; + // override started shard with the latest copy. Capture it now , before starting the shard destroys it... + startedShard = new ShardRouting(shard); + routingNodes.started(shard); + logger.trace("{} marked shard as started (routing: {})", startedShard.shardId(), startedShard); + } + break; + } } // startedShard is the current state of the shard (post relocation for example) // this means that after relocation, the state will be started and the currentNodeId will be // the node we relocated to - - if (relocatingNodeId == null) { + if (startedShard.relocatingNodeId() == null) { continue; } - RoutingNodes.RoutingNodeIterator sourceRoutingNode = routingNodes.routingNodeIter(relocatingNodeId); + RoutingNodes.RoutingNodeIterator sourceRoutingNode = routingNodes.routingNodeIter(startedShard.relocatingNodeId()); if (sourceRoutingNode != null) { while (sourceRoutingNode.hasNext()) { ShardRouting shard = sourceRoutingNode.next(); - if (shard.shardId().equals(startedShard.shardId())) { - if (shard.relocating()) { - dirty = true; - sourceRoutingNode.remove(); - break; - } + if (shard.allocationId().getId().equals(startedShard.allocationId().getRelocationId())) { + assert shard.relocating() : "source shard for relocation is not marked as relocating. source " + shard + ", started target " + startedShard; + dirty = true; + sourceRoutingNode.remove(); + break; } } } @@ -378,133 +380,105 @@ public class AllocationService extends AbstractComponent { * require relocation. */ private boolean applyFailedShard(RoutingAllocation allocation, ShardRouting failedShard, boolean addToIgnoreList, UnassignedInfo unassignedInfo) { - // create a copy of the failed shard, since we assume we can change possible references to it without - // changing the state of failed shard - failedShard = new ShardRouting(failedShard); - IndexRoutingTable indexRoutingTable = allocation.routingTable().index(failedShard.index()); if (indexRoutingTable == null) { + logger.debug("{} ignoring shard failure, unknown index in {} ({})", failedShard.shardId(), failedShard, unassignedInfo.shortSummary()); return false; } RoutingNodes routingNodes = allocation.routingNodes(); - boolean dirty = false; - if (failedShard.relocatingNodeId() != null) { - // the shard is relocating, either in initializing (recovery from another node) or relocating (moving to another node) - if (failedShard.initializing()) { - // the shard is initializing and recovering from another node - // first, we need to cancel the current node that is being initialized - RoutingNodes.RoutingNodeIterator initializingNode = routingNodes.routingNodeIter(failedShard.currentNodeId()); - if (initializingNode != null) { - while (initializingNode.hasNext()) { - ShardRouting shardRouting = initializingNode.next(); - if (shardRouting.equals(failedShard)) { - dirty = true; - initializingNode.remove(); - if (addToIgnoreList) { - // make sure we ignore this shard on the relevant node - allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId()); - } - break; - } - } - } - if (dirty) { - logger.debug("failed shard {} found in routingNodes, failing it", failedShard); - // now, find the node that we are relocating *from*, and cancel its relocation - RoutingNode relocatingFromNode = routingNodes.node(failedShard.relocatingNodeId()); - if (relocatingFromNode != null) { - for (ShardRouting shardRouting : relocatingFromNode) { - if (shardRouting.shardId().equals(failedShard.shardId()) && shardRouting.relocating()) { - dirty = true; - routingNodes.cancelRelocation(shardRouting); - break; - } - } - } - } else { - logger.debug("failed shard {} not found in routingNodes, ignoring it", failedShard); - } - return dirty; - } else if (failedShard.relocating()) { - // the shard is relocating, meaning its the source the shard is relocating from - // first, we need to cancel the current relocation from the current node - // now, find the node that we are recovering from, cancel the relocation, remove it from the node - // and add it to the unassigned shards list... - RoutingNodes.RoutingNodeIterator relocatingFromNode = routingNodes.routingNodeIter(failedShard.currentNodeId()); - if (relocatingFromNode != null) { - while (relocatingFromNode.hasNext()) { - ShardRouting shardRouting = relocatingFromNode.next(); - if (shardRouting.equals(failedShard)) { - dirty = true; - if (addToIgnoreList) { - // make sure we ignore this shard on the relevant node - allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId()); - } - relocatingFromNode.moveToUnassigned(unassignedInfo); - break; - } - } - } - if (dirty) { - logger.debug("failed shard {} found in routingNodes, failing it", failedShard); - // next, we need to find the target initializing shard that is recovering from, and remove it... - RoutingNodes.RoutingNodeIterator initializingNode = routingNodes.routingNodeIter(failedShard.relocatingNodeId()); - if (initializingNode != null) { - while (initializingNode.hasNext()) { - ShardRouting shardRouting = initializingNode.next(); - if (shardRouting.shardId().equals(failedShard.shardId()) && shardRouting.initializing()) { - dirty = true; - initializingNode.remove(); - } - } - } - } else { - logger.debug("failed shard {} not found in routingNodes, ignoring it", failedShard); - } - } else { - throw new IllegalStateException("illegal state for a failed shard, relocating node id is set, but state does not match: " + failedShard); + RoutingNodes.RoutingNodeIterator matchedNode = routingNodes.routingNodeIter(failedShard.currentNodeId()); + if (matchedNode == null) { + logger.debug("{} ignoring shard failure, unknown node in {} ({})", failedShard.shardId(), failedShard, unassignedInfo.shortSummary()); + return false; + } + + boolean matchedShard = false; + while (matchedNode.hasNext()) { + ShardRouting routing = matchedNode.next(); + if (routing.allocationId().getId().equals(failedShard.allocationId().getId())) { + matchedShard = true; + logger.debug("{} failed shard {} found in routingNodes, failing it ({})", failedShard.shardId(), failedShard, unassignedInfo.shortSummary()); + break; } - } else { - // the shard is not relocating, its either started, or initializing, just cancel it and move on... - RoutingNodes.RoutingNodeIterator node = routingNodes.routingNodeIter(failedShard.currentNodeId()); - if (node != null) { - while (node.hasNext()) { - ShardRouting shardRouting = node.next(); - if (shardRouting.equals(failedShard)) { - dirty = true; - if (addToIgnoreList) { - // make sure we ignore this shard on the relevant node - allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId()); - } - // move all the shards matching the failed shard to the end of the unassigned list - // so we give a chance for other allocations and won't create poison failed allocations - // that can keep other shards from being allocated (because of limits applied on how many - // shards we can start per node) - List shardsToMove = Lists.newArrayList(); - for (Iterator unassignedIt = routingNodes.unassigned().iterator(); unassignedIt.hasNext(); ) { - ShardRouting unassignedShardRouting = unassignedIt.next(); - if (unassignedShardRouting.shardId().equals(failedShard.shardId())) { - unassignedIt.remove(); - shardsToMove.add(unassignedShardRouting); - } - } - if (!shardsToMove.isEmpty()) { - routingNodes.unassigned().addAll(shardsToMove); - } + } - node.moveToUnassigned(unassignedInfo); + if (matchedShard == false) { + logger.debug("{} ignoring shard failure, unknown allocation id in {} ({})", failedShard.shardId(), failedShard, unassignedInfo.shortSummary()); + return false; + } + + // replace incoming instance to make sure we work on the latest one. Copy it to maintain information during modifications. + failedShard = new ShardRouting(matchedNode.current()); + + // remove the current copy of the shard + matchedNode.remove(); + + if (addToIgnoreList) { + // make sure we ignore this shard on the relevant node + allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId()); + } + + if (failedShard.relocatingNodeId() != null && failedShard.initializing()) { + // The shard is a target of a relocating shard. In that case we only + // need to remove the target shard and cancel the source relocation. + // No shard is left unassigned + logger.trace("{} is a relocation target, resolving source to cancel relocation ({})", failedShard, unassignedInfo.shortSummary()); + RoutingNode relocatingFromNode = routingNodes.node(failedShard.relocatingNodeId()); + if (relocatingFromNode != null) { + for (ShardRouting shardRouting : relocatingFromNode) { + if (shardRouting.allocationId().getId().equals(failedShard.allocationId().getRelocationId())) { + logger.trace("{}, resolved source to [{}]. canceling relocation ... ({})", failedShard.shardId(), shardRouting, unassignedInfo.shortSummary()); + routingNodes.cancelRelocation(shardRouting); break; } } } - if (dirty) { - logger.debug("failed shard {} found in routingNodes and failed", failedShard); - } else { - logger.debug("failed shard {} not found in routingNodes, ignoring it", failedShard); + } else { + // The fail shard is the main copy of the current shard routing. Any + // relocation will be cancelled (and the target shard removed as well) + // and the shard copy needs to be marked as unassigned + + if (failedShard.relocatingNodeId() != null) { + // handle relocation source shards. we need to find the target initializing shard that is recovering from, and remove it... + assert failedShard.initializing() == false; // should have been dealt with and returned + assert failedShard.relocating(); + + RoutingNodes.RoutingNodeIterator initializingNode = routingNodes.routingNodeIter(failedShard.relocatingNodeId()); + if (initializingNode != null) { + while (initializingNode.hasNext()) { + ShardRouting shardRouting = initializingNode.next(); + if (shardRouting.allocationId().getId().equals(failedShard.allocationId().getRelocationId())) { + assert shardRouting.initializing() : shardRouting; + assert failedShard.allocationId().getId().equals(shardRouting.allocationId().getRelocationId()) + : "found target shard's allocation relocation id is different than source"; + logger.trace("{} is removed due to the failure of the source shard", shardRouting); + initializingNode.remove(); + } + } + } } + + // move all the shards matching the failed shard to the end of the unassigned list + // so we give a chance for other allocations and won't create poison failed allocations + // that can keep other shards from being allocated (because of limits applied on how many + // shards we can start per node) + List shardsToMove = Lists.newArrayList(); + for (Iterator unassignedIt = routingNodes.unassigned().iterator(); unassignedIt.hasNext(); ) { + ShardRouting unassignedShardRouting = unassignedIt.next(); + if (unassignedShardRouting.shardId().equals(failedShard.shardId())) { + unassignedIt.remove(); + shardsToMove.add(unassignedShardRouting); + } + } + if (!shardsToMove.isEmpty()) { + routingNodes.unassigned().addAll(shardsToMove); + } + + matchedNode.moveToUnassigned(unassignedInfo); } - return dirty; + assert matchedNode.isRemoved() : "failedShard " + failedShard + " was matched but wasn't removed"; + return true; } } diff --git a/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java b/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java index 6381532c997..32c0382e4c4 100644 --- a/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java +++ b/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java @@ -192,17 +192,18 @@ public class ExceptionSerializationTests extends ElasticsearchTestCase { } public void testIllegalShardRoutingStateException() throws IOException { - ShardRouting routing = TestShardRouting.newShardRouting("test", 0, "xyz", "def", false, ShardRoutingState.STARTED, 0); + final ShardRouting routing = TestShardRouting.newShardRouting("test", 0, "xyz", "def", false, ShardRoutingState.STARTED, 0); + final String routingAsString = routing.toString(); IllegalShardRoutingStateException serialize = serialize(new IllegalShardRoutingStateException(routing, "foo", new NullPointerException())); assertNotNull(serialize.shard()); assertEquals(routing, serialize.shard()); - assertEquals("[test][0], node[xyz], relocating [def], [R], s[STARTED]: foo", serialize.getMessage()); + assertEquals(routingAsString + ": foo", serialize.getMessage()); assertTrue(serialize.getCause() instanceof NullPointerException); serialize = serialize(new IllegalShardRoutingStateException(routing, "bar", null)); assertNotNull(serialize.shard()); assertEquals(routing, serialize.shard()); - assertEquals("[test][0], node[xyz], relocating [def], [R], s[STARTED]: bar", serialize.getMessage()); + assertEquals(routingAsString + ": bar", serialize.getMessage()); assertNull(serialize.getCause()); } diff --git a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTest.java b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTest.java deleted file mode 100644 index de81e852385..00000000000 --- a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTest.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.cluster.action.shard; - -import org.elasticsearch.Version; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.*; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.DummyTransportAddress; -import org.elasticsearch.test.ElasticsearchTestCase; - -import java.util.ArrayList; -import java.util.List; - -import static org.hamcrest.Matchers.equalTo; - - -public class ShardStateActionTest extends ElasticsearchTestCase { - - public void testShardFiltering() { - final IndexMetaData indexMetaData = IndexMetaData.builder("test") - .settings(Settings.builder() - .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) - .put(IndexMetaData.SETTING_INDEX_UUID, "test_uuid")) - .numberOfShards(2).numberOfReplicas(0) - .build(); - ClusterState.Builder stateBuilder = ClusterState.builder(ClusterName.DEFAULT) - .nodes(DiscoveryNodes.builder() - .put(new DiscoveryNode("node1", DummyTransportAddress.INSTANCE, Version.CURRENT)).masterNodeId("node1") - .put(new DiscoveryNode("node2", DummyTransportAddress.INSTANCE, Version.CURRENT)) - ) - .metaData(MetaData.builder().put(indexMetaData, false)); - - final ShardRouting initShard = TestShardRouting.newShardRouting("test", 0, "node1", randomBoolean(), ShardRoutingState.INITIALIZING, 1); - final ShardRouting startedShard = TestShardRouting.newShardRouting("test", 1, "node2", randomBoolean(), ShardRoutingState.STARTED, 1); - final ShardRouting relocatingShard = TestShardRouting.newShardRouting("test", 2, "node1", "node2", randomBoolean(), ShardRoutingState.RELOCATING, 1); - stateBuilder.routingTable(RoutingTable.builder().add(IndexRoutingTable.builder("test") - .addIndexShard(new IndexShardRoutingTable.Builder(initShard.shardId(), true).addShard(initShard).build()) - .addIndexShard(new IndexShardRoutingTable.Builder(startedShard.shardId(), true).addShard(startedShard).build()) - .addIndexShard(new IndexShardRoutingTable.Builder(relocatingShard.shardId(), true).addShard(relocatingShard).build()))); - - ClusterState state = stateBuilder.build(); - - ArrayList listToFilter = new ArrayList<>(); - ArrayList expectedToBeApplied = new ArrayList<>(); - - listToFilter.add(new ShardStateAction.ShardRoutingEntry(initShard, indexMetaData.indexUUID() + "_suffix", "wrong_uuid", null)); - - listToFilter.add(new ShardStateAction.ShardRoutingEntry(relocatingShard.buildTargetRelocatingShard(), indexMetaData.indexUUID(), "relocating_to_node", null)); - expectedToBeApplied.add(listToFilter.get(listToFilter.size() - 1)); - - listToFilter.add(new ShardStateAction.ShardRoutingEntry(startedShard, indexMetaData.indexUUID(), "started shard", null)); - expectedToBeApplied.add(listToFilter.get(listToFilter.size() - 1)); - - listToFilter.add(new ShardStateAction.ShardRoutingEntry(TestShardRouting.newShardRouting(initShard.index() + "_NA", initShard.id(), - initShard.currentNodeId(), initShard.primary(), initShard.state(), initShard.version()), indexMetaData.indexUUID(), "wrong_uuid", null)); - - List toBeApplied = ShardStateAction.extractShardsToBeApplied(listToFilter, "for testing", state.metaData(), logger); - if (toBeApplied.size() != expectedToBeApplied.size()) { - fail("size mismatch.\n Got: \n [" + toBeApplied + "], \n expected: \n [" + expectedToBeApplied + "]"); - } - for (int i = 0; i < toBeApplied.size(); i++) { - final ShardStateAction.ShardRoutingEntry found = toBeApplied.get(i); - final ShardStateAction.ShardRoutingEntry expected = expectedToBeApplied.get(i); - assertThat(found, equalTo(expected)); - } - } -} \ No newline at end of file diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/AllocationIdTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/AllocationIdTests.java index 74a8b2fde46..2f64f39f059 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/AllocationIdTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/AllocationIdTests.java @@ -62,16 +62,15 @@ public class AllocationIdTests extends ElasticsearchTestCase { assertThat(shard.allocationId(), not(equalTo(allocationId))); assertThat(shard.allocationId().getId(), equalTo(allocationId.getId())); assertThat(shard.allocationId().getRelocationId(), notNullValue()); - allocationId = shard.allocationId(); ShardRouting target = shard.buildTargetRelocatingShard(); assertThat(target.allocationId().getId(), equalTo(shard.allocationId().getRelocationId())); - assertThat(target.allocationId().getRelocationId(), nullValue()); + assertThat(target.allocationId().getRelocationId(), equalTo(shard.allocationId().getId())); logger.info("-- finalize the relocation"); - shard.moveToStarted(); - assertThat(shard.allocationId().getId(), equalTo(target.allocationId().getId())); - assertThat(shard.allocationId().getRelocationId(), nullValue()); + target.moveToStarted(); + assertThat(target.allocationId().getId(), equalTo(shard.allocationId().getRelocationId())); + assertThat(target.allocationId().getRelocationId(), nullValue()); } @Test diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/TestShardRouting.java b/core/src/test/java/org/elasticsearch/cluster/routing/TestShardRouting.java index 99d3d8b3a0b..82d3afc6e91 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/TestShardRouting.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/TestShardRouting.java @@ -33,6 +33,10 @@ public class TestShardRouting { return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, null, primary, state, version, null, buildAllocationId(state), true); } + public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, String relocatingNodeId, boolean primary, ShardRoutingState state, AllocationId allocationId, long version) { + return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, null, primary, state, version, null, allocationId, true); + } + public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, String relocatingNodeId, RestoreSource restoreSource, boolean primary, ShardRoutingState state, long version) { return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, restoreSource, primary, state, version, null, buildAllocationId(state), true); } diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java index f6fc184bb1f..569018c431d 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java @@ -256,9 +256,9 @@ public class FailedShardsRoutingTests extends ElasticsearchAllocationTestCase { logger.info("fail the first shard, will have no place to be rerouted to (single node), so stays unassigned"); prevRoutingTable = routingTable; - routingTable = strategy.applyFailedShard(clusterState, TestShardRouting.newShardRouting("test", 0, "node1", true, INITIALIZING, 0)).routingTable(); + ShardRouting firstShard = clusterState.routingNodes().node("node1").get(0); + routingTable = strategy.applyFailedShard(clusterState, firstShard).routingTable(); clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); - RoutingNodes routingNodes = clusterState.routingNodes(); assertThat(prevRoutingTable != routingTable, equalTo(true)); assertThat(routingTable.index("test").shards().size(), equalTo(1)); @@ -272,7 +272,7 @@ public class FailedShardsRoutingTests extends ElasticsearchAllocationTestCase { } logger.info("fail the shard again, see that nothing happens"); - assertThat(strategy.applyFailedShard(clusterState, TestShardRouting.newShardRouting("test", 0, "node1", true, INITIALIZING, 0)).changed(), equalTo(false)); + assertThat(strategy.applyFailedShard(clusterState, firstShard).changed(), equalTo(false)); } @Test @@ -371,9 +371,9 @@ public class FailedShardsRoutingTests extends ElasticsearchAllocationTestCase { logger.info("fail the first shard, will start INITIALIZING on the second node"); prevRoutingTable = routingTable; - routingTable = strategy.applyFailedShard(clusterState, TestShardRouting.newShardRouting("test", 0, nodeHoldingPrimary, true, INITIALIZING, 0)).routingTable(); + final ShardRouting firstShard = clusterState.routingNodes().node("node1").get(0); + routingTable = strategy.applyFailedShard(clusterState, firstShard).routingTable(); clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); - RoutingNodes routingNodes = clusterState.routingNodes(); assertThat(prevRoutingTable != routingTable, equalTo(true)); assertThat(routingTable.index("test").shards().size(), equalTo(1)); @@ -387,7 +387,7 @@ public class FailedShardsRoutingTests extends ElasticsearchAllocationTestCase { } logger.info("fail the shard again, see that nothing happens"); - assertThat(strategy.applyFailedShard(clusterState, TestShardRouting.newShardRouting("test", 0, nodeHoldingPrimary, true, INITIALIZING, 0)).changed(), equalTo(false)); + assertThat(strategy.applyFailedShard(clusterState, firstShard).changed(), equalTo(false)); } @Test diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/StartedShardsRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/StartedShardsRoutingTests.java index 9a36ba3d3a0..506d3d29e0f 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/StartedShardsRoutingTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/StartedShardsRoutingTests.java @@ -64,68 +64,57 @@ public class StartedShardsRoutingTests extends ElasticsearchAllocationTestCase { RoutingAllocation.Result result = allocation.applyStartedShards(state, Arrays.asList( TestShardRouting.newShardRouting(initShard.index(), initShard.id(), initShard.currentNodeId(), initShard.relocatingNodeId(), initShard.primary(), - ShardRoutingState.INITIALIZING, randomInt())), false); + ShardRoutingState.INITIALIZING, initShard.allocationId(), randomInt())), false); assertTrue("failed to start " + initShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed()); assertTrue(initShard + "isn't started \ncurrent routing table:" + result.routingTable().prettyPrint(), result.routingTable().index("test").shard(initShard.id()).allShardsStarted()); - logger.info("--> testing shard variants that shouldn't match the started shard"); + logger.info("--> testing shard variants that shouldn't match the initializing shard"); result = allocation.applyStartedShards(state, Arrays.asList( - TestShardRouting.newShardRouting(initShard.index(), initShard.id(), initShard.currentNodeId(), initShard.relocatingNodeId(), !initShard.primary(), + TestShardRouting.newShardRouting(initShard.index(), initShard.id(), initShard.currentNodeId(), initShard.relocatingNodeId(), initShard.primary(), ShardRoutingState.INITIALIZING, 1)), false); - assertFalse("wrong primary flag shouldn't start shard " + initShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed()); + assertFalse("wrong allocation id flag shouldn't start shard " + initShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed()); result = allocation.applyStartedShards(state, Arrays.asList( TestShardRouting.newShardRouting(initShard.index(), initShard.id(), "some_node", initShard.currentNodeId(), initShard.primary(), - ShardRoutingState.INITIALIZING, 1)), false); + ShardRoutingState.INITIALIZING, AllocationId.newTargetRelocation(AllocationId.newRelocation(initShard.allocationId())) + , 1)), false); assertFalse("relocating shard from node shouldn't start shard " + initShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed()); - result = allocation.applyStartedShards(state, Arrays.asList( - TestShardRouting.newShardRouting(initShard.index(), initShard.id(), initShard.currentNodeId(), "some_node", initShard.primary(), - ShardRoutingState.INITIALIZING, 1)), false); - assertFalse("relocating shard to node shouldn't start shard " + initShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed()); logger.info("--> testing double starting"); result = allocation.applyStartedShards(state, Arrays.asList( TestShardRouting.newShardRouting(startedShard.index(), startedShard.id(), startedShard.currentNodeId(), startedShard.relocatingNodeId(), startedShard.primary(), - ShardRoutingState.INITIALIZING, 1)), false); + ShardRoutingState.INITIALIZING, startedShard.allocationId(), 1)), false); assertFalse("duplicate starting of the same shard should be ignored \ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed()); logger.info("--> testing starting of relocating shards"); + final AllocationId targetAllocationId = AllocationId.newTargetRelocation(relocatingShard.allocationId()); result = allocation.applyStartedShards(state, Arrays.asList( TestShardRouting.newShardRouting(relocatingShard.index(), relocatingShard.id(), relocatingShard.relocatingNodeId(), relocatingShard.currentNodeId(), relocatingShard.primary(), - ShardRoutingState.INITIALIZING, randomInt())), false); + ShardRoutingState.INITIALIZING, targetAllocationId, randomInt())), false); + assertTrue("failed to start " + relocatingShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed()); ShardRouting shardRouting = result.routingTable().index("test").shard(relocatingShard.id()).getShards().get(0); assertThat(shardRouting.state(), equalTo(ShardRoutingState.STARTED)); assertThat(shardRouting.currentNodeId(), equalTo("node2")); assertThat(shardRouting.relocatingNodeId(), nullValue()); - logger.info("--> testing shard variants that shouldn't match the relocating shard"); + logger.info("--> testing shard variants that shouldn't match the initializing relocating shard"); result = allocation.applyStartedShards(state, Arrays.asList( - TestShardRouting.newShardRouting(relocatingShard.index(), relocatingShard.id(), relocatingShard.relocatingNodeId(), relocatingShard.currentNodeId(), !relocatingShard.primary(), - ShardRoutingState.INITIALIZING, 1)), false); - assertFalse("wrong primary flag shouldn't start shard " + relocatingShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed()); + TestShardRouting.newShardRouting(relocatingShard.index(), relocatingShard.id(), relocatingShard.relocatingNodeId(), relocatingShard.currentNodeId(), relocatingShard.primary(), + ShardRoutingState.INITIALIZING, relocatingShard.version()))); + assertFalse("wrong allocation id shouldn't start shard" + relocatingShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed()); result = allocation.applyStartedShards(state, Arrays.asList( - TestShardRouting.newShardRouting(relocatingShard.index(), relocatingShard.id(), "some_node", relocatingShard.currentNodeId(), relocatingShard.primary(), - ShardRoutingState.INITIALIZING, 1)), false); - assertFalse("relocating shard to a different node shouldn't start shard " + relocatingShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed()); - - result = allocation.applyStartedShards(state, Arrays.asList( - TestShardRouting.newShardRouting(relocatingShard.index(), relocatingShard.id(), relocatingShard.relocatingNodeId(), "some_node", relocatingShard.primary(), - ShardRoutingState.INITIALIZING, 1)), false); - assertFalse("relocating shard from a different node shouldn't start shard " + relocatingShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed()); - - result = allocation.applyStartedShards(state, Arrays.asList( - TestShardRouting.newShardRouting(relocatingShard.index(), relocatingShard.id(), relocatingShard.relocatingNodeId(), relocatingShard.primary(), - ShardRoutingState.INITIALIZING, 1)), false); - assertFalse("non-relocating shard shouldn't start shard" + relocatingShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed()); + TestShardRouting.newShardRouting(relocatingShard.index(), relocatingShard.id(), relocatingShard.relocatingNodeId(), relocatingShard.currentNodeId(), relocatingShard.primary(), + ShardRoutingState.INITIALIZING, relocatingShard.allocationId(), randomInt())), false); + assertFalse("wrong allocation id shouldn't start shard even if relocatingId==shard.id" + relocatingShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed()); } }