From 275848eb9bc51514f76ac38ffcfbd9e740f47cbd Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 16 Jul 2015 16:45:37 +0200 Subject: [PATCH] Allocation: use recently added allocation ids for shard started/failed On top of that: 1) A relocation target shards' allocation id is changed to include the allocation id of the source shard under relocatingId (similar to shard routing semantics) 2) The logic around state change for finalize shard relocation is simplified - one simple start the target shard (we previously had unused logic around relocating state) Closes #12299 --- .../action/shard/ShardStateAction.java | 40 +-- .../cluster/routing/AllocationId.java | 35 ++- .../cluster/routing/RoutingNodes.java | 24 +- .../cluster/routing/ShardRouting.java | 20 +- .../cluster/routing/UnassignedInfo.java | 11 +- .../routing/allocation/AllocationService.java | 256 ++++++++---------- .../ExceptionSerializationTests.java | 7 +- .../action/shard/ShardStateActionTest.java | 90 ------ .../cluster/routing/AllocationIdTests.java | 9 +- .../cluster/routing/TestShardRouting.java | 4 + .../allocation/FailedShardsRoutingTests.java | 12 +- .../allocation/StartedShardsRoutingTests.java | 45 ++- 12 files changed, 224 insertions(+), 329 deletions(-) delete mode 100644 core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTest.java diff --git a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index 36ae62e437a..41ddb49bb65 100644 --- a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -145,17 +145,12 @@ public class ShardStateAction extends AbstractComponent { return currentState; } - final MetaData metaData = currentState.getMetaData(); - - List shardRoutingsToBeApplied = new ArrayList<>(shardRoutingEntries.size()); - for (ShardRoutingEntry entry : extractShardsToBeApplied(shardRoutingEntries, "failed", metaData, logger)) { - shardRoutingsToBeApplied.add(new FailedRerouteAllocation.FailedShard(entry.shardRouting, entry.message, entry.failure)); - } // mark all entries as processed for (ShardRoutingEntry entry : shardRoutingEntries) { entry.processed = true; + shardRoutingsToBeApplied.add(new FailedRerouteAllocation.FailedShard(entry.shardRouting, entry.message, entry.failure)); } RoutingAllocation.Result routingResult = allocationService.applyFailedShards(currentState, shardRoutingsToBeApplied); @@ -180,31 +175,6 @@ public class ShardStateAction extends AbstractComponent { }); } - static List extractShardsToBeApplied(List shardRoutingEntries, String type, MetaData metaData, ESLogger logger) { - List shardRoutingsToBeApplied = new ArrayList<>(shardRoutingEntries.size()); - for (int i = 0; i < shardRoutingEntries.size(); i++) { - ShardRoutingEntry shardRoutingEntry = shardRoutingEntries.get(i); - ShardRouting shardRouting = shardRoutingEntry.shardRouting; - IndexMetaData indexMetaData = metaData.index(shardRouting.index()); - // if there is no metadata or the current index is not of the right uuid, the index has been deleted while it was being allocated - // which is fine, we should just ignore this - if (indexMetaData == null) { - logger.debug("{} ignoring shard {}, unknown index in {}", shardRouting.shardId(), type, shardRoutingEntry); - continue; - } - if (!indexMetaData.isSameUUID(shardRoutingEntry.indexUUID)) { - logger.debug("{} ignoring shard {}, different index uuid, current {}, got {}", shardRouting.shardId(), type, indexMetaData.getIndexUUID(), shardRoutingEntry); - continue; - } - - // more debug info will be logged by the allocation service - logger.trace("{} will apply shard {} {}", shardRouting.shardId(), type, shardRoutingEntry); - shardRoutingsToBeApplied.add(shardRoutingEntry); - } - return shardRoutingsToBeApplied; - - } - private void shardStartedOnMaster(final ShardRoutingEntry shardRoutingEntry) { logger.debug("received shard started for {}", shardRoutingEntry); // buffer shard started requests, and the state update tasks will simply drain it @@ -230,18 +200,12 @@ public class ShardStateAction extends AbstractComponent { return currentState; } - RoutingTable routingTable = currentState.routingTable(); - MetaData metaData = currentState.getMetaData(); - - List shardRoutingToBeApplied = new ArrayList<>(shardRoutingEntries.size()); - for (ShardRoutingEntry entry : extractShardsToBeApplied(shardRoutingEntries, "started", metaData, logger)) { - shardRoutingToBeApplied.add(entry.shardRouting); - } // mark all entries as processed for (ShardRoutingEntry entry : shardRoutingEntries) { entry.processed = true; + shardRoutingToBeApplied.add(entry.shardRouting); } if (shardRoutingToBeApplied.isEmpty()) { diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/AllocationId.java b/core/src/main/java/org/elasticsearch/cluster/routing/AllocationId.java index 55e7ca729b7..ffcf9f1e80c 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/AllocationId.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/AllocationId.java @@ -22,6 +22,8 @@ package org.elasticsearch.cluster.routing; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; import java.io.IOException; @@ -34,7 +36,7 @@ import java.io.IOException; * relocationId. Once relocation is done, the new allocation id is set to the relocationId. This is similar * behavior to how ShardRouting#currentNodeId is used. */ -public class AllocationId { +public class AllocationId implements ToXContent { private final String id; private final String relocationId; @@ -67,7 +69,7 @@ public class AllocationId { */ public static AllocationId newTargetRelocation(AllocationId allocationId) { assert allocationId.getRelocationId() != null; - return new AllocationId(allocationId.getRelocationId(), null); + return new AllocationId(allocationId.getRelocationId(), allocationId.getId()); } /** @@ -81,19 +83,24 @@ public class AllocationId { /** * Creates a new allocation id representing a cancelled relocation. - */ + * + * Note that this is expected to be called on the allocation id + * of the *source* shard + * */ public static AllocationId cancelRelocation(AllocationId allocationId) { assert allocationId.getRelocationId() != null; return new AllocationId(allocationId.getId(), null); } /** - * Creates a new allocation id finalizing a relocation, moving the transient - * relocation id to be the actual id. + * Creates a new allocation id finalizing a relocation. + * + * Note that this is expected to be called on the allocation id + * of the *target* shard and thus it only needs to clear the relocating id. */ public static AllocationId finishRelocation(AllocationId allocationId) { assert allocationId.getRelocationId() != null; - return new AllocationId(allocationId.getRelocationId(), null); + return new AllocationId(allocationId.getId(), null); } /** @@ -126,4 +133,20 @@ public class AllocationId { result = 31 * result + (relocationId != null ? relocationId.hashCode() : 0); return result; } + + @Override + public String toString() { + return "[id=" + id + (relocationId == null ? "" : ", rId=" + relocationId) + "]"; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject("allocation_id"); + builder.field("id", id); + if (relocationId != null) { + builder.field("relocation_id", relocationId); + } + builder.endObject(); + return builder; + } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java index efe3f676f78..b348afbc192 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java @@ -394,15 +394,14 @@ public class RoutingNodes implements Iterable { * Mark a shard as started and adjusts internal statistics. */ public void started(ShardRouting shard) { - if (!shard.active() && shard.relocatingNodeId() == null) { + assert !shard.active() : "expected an intializing shard " + shard; + if (shard.relocatingNodeId() == null) { + // if this is not a target shard for relocation, we need to update statistics inactiveShardCount--; if (shard.primary()) { inactivePrimaryCount--; } - } else if (shard.relocating()) { - relocatingShards--; } - assert !shard.started(); shard.moveToStarted(); } @@ -757,6 +756,7 @@ public class RoutingNodes implements Iterable { private final RoutingNode iterable; private ShardRouting shard; private final Iterator delegate; + private boolean removed = false; public RoutingNodeIterator(RoutingNode iterable) { this.delegate = iterable.mutableIterator(); @@ -770,6 +770,7 @@ public class RoutingNodes implements Iterable { @Override public ShardRouting next() { + removed = false; return shard = delegate.next(); } @@ -777,6 +778,13 @@ public class RoutingNodes implements Iterable { public void remove() { delegate.remove(); RoutingNodes.this.remove(shard); + removed = true; + } + + + /** returns true if {@link #remove()} or {@link #moveToUnassigned(UnassignedInfo)} were called on the current shard */ + public boolean isRemoved() { + return removed; } @Override @@ -785,10 +793,16 @@ public class RoutingNodes implements Iterable { } public void moveToUnassigned(UnassignedInfo unassignedInfo) { - remove(); + if (isRemoved() == false) { + remove(); + } ShardRouting unassigned = new ShardRouting(shard); // protective copy of the mutable shard unassigned.moveToUnassigned(unassignedInfo); unassigned().add(unassigned); } + + public ShardRouting current() { + return shard; + } } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java b/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java index 7165fda8a4d..1cbacda02ee 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java @@ -441,11 +441,12 @@ public final class ShardRouting implements Streamable, ToXContent { void moveToStarted() { ensureNotFrozen(); version++; - assert state == ShardRoutingState.INITIALIZING || state == ShardRoutingState.RELOCATING : this; + assert state == ShardRoutingState.INITIALIZING : "expected an initializing shard " + this; relocatingNodeId = null; restoreSource = null; unassignedInfo = null; // we keep the unassigned data until the shard is started - if (state == ShardRoutingState.RELOCATING) { + if (allocationId.getRelocationId() != null) { + // target relocation allocationId = AllocationId.finishRelocation(allocationId); } state = ShardRoutingState.STARTED; @@ -502,6 +503,9 @@ public final class ShardRouting implements Streamable, ToXContent { if (relocatingNodeId != null ? !relocatingNodeId.equals(that.relocatingNodeId) : that.relocatingNodeId != null) { return false; } + if (allocationId != null ? !allocationId.equals(that.allocationId) : that.allocationId != null) { + return false; + } if (state != that.state) { return false; } @@ -526,6 +530,7 @@ public final class ShardRouting implements Streamable, ToXContent { result = 31 * result + (primary ? 1 : 0); result = 31 * result + (state != null ? state.hashCode() : 0); result = 31 * result + (restoreSource != null ? restoreSource.hashCode() : 0); + result = 31 * result + (allocationId != null ? allocationId.hashCode() : 0); return hashCode = result; } @@ -549,10 +554,14 @@ public final class ShardRouting implements Streamable, ToXContent { } else { sb.append("[R]"); } + sb.append(", v[").append(version).append("]"); if (this.restoreSource != null) { sb.append(", restoring[" + restoreSource + "]"); } sb.append(", s[").append(state).append("]"); + if (allocationId != null) { + sb.append(", a").append(allocationId); + } if (this.unassignedInfo != null) { sb.append(", ").append(unassignedInfo.toString()); } @@ -567,11 +576,16 @@ public final class ShardRouting implements Streamable, ToXContent { .field("node", currentNodeId()) .field("relocating_node", relocatingNodeId()) .field("shard", shardId().id()) - .field("index", shardId().index().name()); + .field("index", shardId().index().name()) + .field("version", version); + if (restoreSource() != null) { builder.field("restore_source"); restoreSource().toXContent(builder, params); } + if (allocationId != null) { + allocationId.toXContent(builder, params); + } if (unassignedInfo != null) { unassignedInfo.toXContent(builder, params); } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java b/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java index 080a8340daf..6e9c5c88914 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java @@ -257,19 +257,22 @@ public class UnassignedInfo implements ToXContent, Writeable { return nextDelay == Long.MAX_VALUE ? 0l : nextDelay; } - @Override - public String toString() { + public String shortSummary() { StringBuilder sb = new StringBuilder(); - sb.append("unassigned_info[[reason=").append(reason).append("]"); + sb.append("[reason=").append(reason).append("]"); sb.append(", at[").append(DATE_TIME_FORMATTER.printer().print(timestamp)).append("]"); String details = getDetails(); if (details != null) { sb.append(", details[").append(details).append("]"); } - sb.append("]"); return sb.toString(); } + @Override + public String toString() { + return "unassigned_info[" + shortSummary() + "]"; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject("unassigned_info"); diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index f17171d1796..7a560642ec0 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -22,7 +22,6 @@ package org.elasticsearch.cluster.routing.allocation; import com.carrotsearch.hppc.cursors.ObjectCursor; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -325,47 +324,50 @@ public class AllocationService extends AbstractComponent { for (ShardRouting startedShard : startedShardEntries) { assert startedShard.initializing(); - // retrieve the relocating node id before calling startedShard(). - String relocatingNodeId = null; + // validate index still exists. strictly speaking this is not needed but it gives clearer logs + if (routingNodes.routingTable().index(startedShard.index()) == null) { + logger.debug("{} ignoring shard started, unknown index (routing: {})", startedShard.shardId(), startedShard); + continue; + } + RoutingNodes.RoutingNodeIterator currentRoutingNode = routingNodes.routingNodeIter(startedShard.currentNodeId()); - if (currentRoutingNode != null) { - for (ShardRouting shard : currentRoutingNode) { - if (shard.shardId().equals(startedShard.shardId())) { - if (shard.equals(startedShard)) { - relocatingNodeId = shard.relocatingNodeId(); - dirty = true; - routingNodes.started(shard); - logger.trace("{} marked as started", shard); - } else { - logger.debug("failed to find shard [{}] in order to start it [no matching shard on node], ignoring", startedShard); - } - break; - } - } - } else { - logger.debug("failed to find shard [{}] in order to start it [failed to find node], ignoring", startedShard); + if (currentRoutingNode == null) { + logger.debug("{} failed to find shard in order to start it [failed to find node], ignoring (routing: {})", startedShard.shardId(), startedShard); + continue; + } + for (ShardRouting shard : currentRoutingNode) { + if (shard.allocationId().getId().equals(startedShard.allocationId().getId())) { + if (shard.active()) { + logger.trace("{} shard is already started, ignoring (routing: {})", startedShard.shardId(), startedShard); + } else { + dirty = true; + // override started shard with the latest copy. Capture it now , before starting the shard destroys it... + startedShard = new ShardRouting(shard); + routingNodes.started(shard); + logger.trace("{} marked shard as started (routing: {})", startedShard.shardId(), startedShard); + } + break; + } } // startedShard is the current state of the shard (post relocation for example) // this means that after relocation, the state will be started and the currentNodeId will be // the node we relocated to - - if (relocatingNodeId == null) { + if (startedShard.relocatingNodeId() == null) { continue; } - RoutingNodes.RoutingNodeIterator sourceRoutingNode = routingNodes.routingNodeIter(relocatingNodeId); + RoutingNodes.RoutingNodeIterator sourceRoutingNode = routingNodes.routingNodeIter(startedShard.relocatingNodeId()); if (sourceRoutingNode != null) { while (sourceRoutingNode.hasNext()) { ShardRouting shard = sourceRoutingNode.next(); - if (shard.shardId().equals(startedShard.shardId())) { - if (shard.relocating()) { - dirty = true; - sourceRoutingNode.remove(); - break; - } + if (shard.allocationId().getId().equals(startedShard.allocationId().getRelocationId())) { + assert shard.relocating() : "source shard for relocation is not marked as relocating. source " + shard + ", started target " + startedShard; + dirty = true; + sourceRoutingNode.remove(); + break; } } } @@ -378,133 +380,105 @@ public class AllocationService extends AbstractComponent { * require relocation. */ private boolean applyFailedShard(RoutingAllocation allocation, ShardRouting failedShard, boolean addToIgnoreList, UnassignedInfo unassignedInfo) { - // create a copy of the failed shard, since we assume we can change possible references to it without - // changing the state of failed shard - failedShard = new ShardRouting(failedShard); - IndexRoutingTable indexRoutingTable = allocation.routingTable().index(failedShard.index()); if (indexRoutingTable == null) { + logger.debug("{} ignoring shard failure, unknown index in {} ({})", failedShard.shardId(), failedShard, unassignedInfo.shortSummary()); return false; } RoutingNodes routingNodes = allocation.routingNodes(); - boolean dirty = false; - if (failedShard.relocatingNodeId() != null) { - // the shard is relocating, either in initializing (recovery from another node) or relocating (moving to another node) - if (failedShard.initializing()) { - // the shard is initializing and recovering from another node - // first, we need to cancel the current node that is being initialized - RoutingNodes.RoutingNodeIterator initializingNode = routingNodes.routingNodeIter(failedShard.currentNodeId()); - if (initializingNode != null) { - while (initializingNode.hasNext()) { - ShardRouting shardRouting = initializingNode.next(); - if (shardRouting.equals(failedShard)) { - dirty = true; - initializingNode.remove(); - if (addToIgnoreList) { - // make sure we ignore this shard on the relevant node - allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId()); - } - break; - } - } - } - if (dirty) { - logger.debug("failed shard {} found in routingNodes, failing it", failedShard); - // now, find the node that we are relocating *from*, and cancel its relocation - RoutingNode relocatingFromNode = routingNodes.node(failedShard.relocatingNodeId()); - if (relocatingFromNode != null) { - for (ShardRouting shardRouting : relocatingFromNode) { - if (shardRouting.shardId().equals(failedShard.shardId()) && shardRouting.relocating()) { - dirty = true; - routingNodes.cancelRelocation(shardRouting); - break; - } - } - } - } else { - logger.debug("failed shard {} not found in routingNodes, ignoring it", failedShard); - } - return dirty; - } else if (failedShard.relocating()) { - // the shard is relocating, meaning its the source the shard is relocating from - // first, we need to cancel the current relocation from the current node - // now, find the node that we are recovering from, cancel the relocation, remove it from the node - // and add it to the unassigned shards list... - RoutingNodes.RoutingNodeIterator relocatingFromNode = routingNodes.routingNodeIter(failedShard.currentNodeId()); - if (relocatingFromNode != null) { - while (relocatingFromNode.hasNext()) { - ShardRouting shardRouting = relocatingFromNode.next(); - if (shardRouting.equals(failedShard)) { - dirty = true; - if (addToIgnoreList) { - // make sure we ignore this shard on the relevant node - allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId()); - } - relocatingFromNode.moveToUnassigned(unassignedInfo); - break; - } - } - } - if (dirty) { - logger.debug("failed shard {} found in routingNodes, failing it", failedShard); - // next, we need to find the target initializing shard that is recovering from, and remove it... - RoutingNodes.RoutingNodeIterator initializingNode = routingNodes.routingNodeIter(failedShard.relocatingNodeId()); - if (initializingNode != null) { - while (initializingNode.hasNext()) { - ShardRouting shardRouting = initializingNode.next(); - if (shardRouting.shardId().equals(failedShard.shardId()) && shardRouting.initializing()) { - dirty = true; - initializingNode.remove(); - } - } - } - } else { - logger.debug("failed shard {} not found in routingNodes, ignoring it", failedShard); - } - } else { - throw new IllegalStateException("illegal state for a failed shard, relocating node id is set, but state does not match: " + failedShard); + RoutingNodes.RoutingNodeIterator matchedNode = routingNodes.routingNodeIter(failedShard.currentNodeId()); + if (matchedNode == null) { + logger.debug("{} ignoring shard failure, unknown node in {} ({})", failedShard.shardId(), failedShard, unassignedInfo.shortSummary()); + return false; + } + + boolean matchedShard = false; + while (matchedNode.hasNext()) { + ShardRouting routing = matchedNode.next(); + if (routing.allocationId().getId().equals(failedShard.allocationId().getId())) { + matchedShard = true; + logger.debug("{} failed shard {} found in routingNodes, failing it ({})", failedShard.shardId(), failedShard, unassignedInfo.shortSummary()); + break; } - } else { - // the shard is not relocating, its either started, or initializing, just cancel it and move on... - RoutingNodes.RoutingNodeIterator node = routingNodes.routingNodeIter(failedShard.currentNodeId()); - if (node != null) { - while (node.hasNext()) { - ShardRouting shardRouting = node.next(); - if (shardRouting.equals(failedShard)) { - dirty = true; - if (addToIgnoreList) { - // make sure we ignore this shard on the relevant node - allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId()); - } - // move all the shards matching the failed shard to the end of the unassigned list - // so we give a chance for other allocations and won't create poison failed allocations - // that can keep other shards from being allocated (because of limits applied on how many - // shards we can start per node) - List shardsToMove = Lists.newArrayList(); - for (Iterator unassignedIt = routingNodes.unassigned().iterator(); unassignedIt.hasNext(); ) { - ShardRouting unassignedShardRouting = unassignedIt.next(); - if (unassignedShardRouting.shardId().equals(failedShard.shardId())) { - unassignedIt.remove(); - shardsToMove.add(unassignedShardRouting); - } - } - if (!shardsToMove.isEmpty()) { - routingNodes.unassigned().addAll(shardsToMove); - } + } - node.moveToUnassigned(unassignedInfo); + if (matchedShard == false) { + logger.debug("{} ignoring shard failure, unknown allocation id in {} ({})", failedShard.shardId(), failedShard, unassignedInfo.shortSummary()); + return false; + } + + // replace incoming instance to make sure we work on the latest one. Copy it to maintain information during modifications. + failedShard = new ShardRouting(matchedNode.current()); + + // remove the current copy of the shard + matchedNode.remove(); + + if (addToIgnoreList) { + // make sure we ignore this shard on the relevant node + allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId()); + } + + if (failedShard.relocatingNodeId() != null && failedShard.initializing()) { + // The shard is a target of a relocating shard. In that case we only + // need to remove the target shard and cancel the source relocation. + // No shard is left unassigned + logger.trace("{} is a relocation target, resolving source to cancel relocation ({})", failedShard, unassignedInfo.shortSummary()); + RoutingNode relocatingFromNode = routingNodes.node(failedShard.relocatingNodeId()); + if (relocatingFromNode != null) { + for (ShardRouting shardRouting : relocatingFromNode) { + if (shardRouting.allocationId().getId().equals(failedShard.allocationId().getRelocationId())) { + logger.trace("{}, resolved source to [{}]. canceling relocation ... ({})", failedShard.shardId(), shardRouting, unassignedInfo.shortSummary()); + routingNodes.cancelRelocation(shardRouting); break; } } } - if (dirty) { - logger.debug("failed shard {} found in routingNodes and failed", failedShard); - } else { - logger.debug("failed shard {} not found in routingNodes, ignoring it", failedShard); + } else { + // The fail shard is the main copy of the current shard routing. Any + // relocation will be cancelled (and the target shard removed as well) + // and the shard copy needs to be marked as unassigned + + if (failedShard.relocatingNodeId() != null) { + // handle relocation source shards. we need to find the target initializing shard that is recovering from, and remove it... + assert failedShard.initializing() == false; // should have been dealt with and returned + assert failedShard.relocating(); + + RoutingNodes.RoutingNodeIterator initializingNode = routingNodes.routingNodeIter(failedShard.relocatingNodeId()); + if (initializingNode != null) { + while (initializingNode.hasNext()) { + ShardRouting shardRouting = initializingNode.next(); + if (shardRouting.allocationId().getId().equals(failedShard.allocationId().getRelocationId())) { + assert shardRouting.initializing() : shardRouting; + assert failedShard.allocationId().getId().equals(shardRouting.allocationId().getRelocationId()) + : "found target shard's allocation relocation id is different than source"; + logger.trace("{} is removed due to the failure of the source shard", shardRouting); + initializingNode.remove(); + } + } + } } + + // move all the shards matching the failed shard to the end of the unassigned list + // so we give a chance for other allocations and won't create poison failed allocations + // that can keep other shards from being allocated (because of limits applied on how many + // shards we can start per node) + List shardsToMove = Lists.newArrayList(); + for (Iterator unassignedIt = routingNodes.unassigned().iterator(); unassignedIt.hasNext(); ) { + ShardRouting unassignedShardRouting = unassignedIt.next(); + if (unassignedShardRouting.shardId().equals(failedShard.shardId())) { + unassignedIt.remove(); + shardsToMove.add(unassignedShardRouting); + } + } + if (!shardsToMove.isEmpty()) { + routingNodes.unassigned().addAll(shardsToMove); + } + + matchedNode.moveToUnassigned(unassignedInfo); } - return dirty; + assert matchedNode.isRemoved() : "failedShard " + failedShard + " was matched but wasn't removed"; + return true; } } diff --git a/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java b/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java index 6381532c997..32c0382e4c4 100644 --- a/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java +++ b/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java @@ -192,17 +192,18 @@ public class ExceptionSerializationTests extends ElasticsearchTestCase { } public void testIllegalShardRoutingStateException() throws IOException { - ShardRouting routing = TestShardRouting.newShardRouting("test", 0, "xyz", "def", false, ShardRoutingState.STARTED, 0); + final ShardRouting routing = TestShardRouting.newShardRouting("test", 0, "xyz", "def", false, ShardRoutingState.STARTED, 0); + final String routingAsString = routing.toString(); IllegalShardRoutingStateException serialize = serialize(new IllegalShardRoutingStateException(routing, "foo", new NullPointerException())); assertNotNull(serialize.shard()); assertEquals(routing, serialize.shard()); - assertEquals("[test][0], node[xyz], relocating [def], [R], s[STARTED]: foo", serialize.getMessage()); + assertEquals(routingAsString + ": foo", serialize.getMessage()); assertTrue(serialize.getCause() instanceof NullPointerException); serialize = serialize(new IllegalShardRoutingStateException(routing, "bar", null)); assertNotNull(serialize.shard()); assertEquals(routing, serialize.shard()); - assertEquals("[test][0], node[xyz], relocating [def], [R], s[STARTED]: bar", serialize.getMessage()); + assertEquals(routingAsString + ": bar", serialize.getMessage()); assertNull(serialize.getCause()); } diff --git a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTest.java b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTest.java deleted file mode 100644 index de81e852385..00000000000 --- a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTest.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.cluster.action.shard; - -import org.elasticsearch.Version; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.*; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.DummyTransportAddress; -import org.elasticsearch.test.ElasticsearchTestCase; - -import java.util.ArrayList; -import java.util.List; - -import static org.hamcrest.Matchers.equalTo; - - -public class ShardStateActionTest extends ElasticsearchTestCase { - - public void testShardFiltering() { - final IndexMetaData indexMetaData = IndexMetaData.builder("test") - .settings(Settings.builder() - .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) - .put(IndexMetaData.SETTING_INDEX_UUID, "test_uuid")) - .numberOfShards(2).numberOfReplicas(0) - .build(); - ClusterState.Builder stateBuilder = ClusterState.builder(ClusterName.DEFAULT) - .nodes(DiscoveryNodes.builder() - .put(new DiscoveryNode("node1", DummyTransportAddress.INSTANCE, Version.CURRENT)).masterNodeId("node1") - .put(new DiscoveryNode("node2", DummyTransportAddress.INSTANCE, Version.CURRENT)) - ) - .metaData(MetaData.builder().put(indexMetaData, false)); - - final ShardRouting initShard = TestShardRouting.newShardRouting("test", 0, "node1", randomBoolean(), ShardRoutingState.INITIALIZING, 1); - final ShardRouting startedShard = TestShardRouting.newShardRouting("test", 1, "node2", randomBoolean(), ShardRoutingState.STARTED, 1); - final ShardRouting relocatingShard = TestShardRouting.newShardRouting("test", 2, "node1", "node2", randomBoolean(), ShardRoutingState.RELOCATING, 1); - stateBuilder.routingTable(RoutingTable.builder().add(IndexRoutingTable.builder("test") - .addIndexShard(new IndexShardRoutingTable.Builder(initShard.shardId(), true).addShard(initShard).build()) - .addIndexShard(new IndexShardRoutingTable.Builder(startedShard.shardId(), true).addShard(startedShard).build()) - .addIndexShard(new IndexShardRoutingTable.Builder(relocatingShard.shardId(), true).addShard(relocatingShard).build()))); - - ClusterState state = stateBuilder.build(); - - ArrayList listToFilter = new ArrayList<>(); - ArrayList expectedToBeApplied = new ArrayList<>(); - - listToFilter.add(new ShardStateAction.ShardRoutingEntry(initShard, indexMetaData.indexUUID() + "_suffix", "wrong_uuid", null)); - - listToFilter.add(new ShardStateAction.ShardRoutingEntry(relocatingShard.buildTargetRelocatingShard(), indexMetaData.indexUUID(), "relocating_to_node", null)); - expectedToBeApplied.add(listToFilter.get(listToFilter.size() - 1)); - - listToFilter.add(new ShardStateAction.ShardRoutingEntry(startedShard, indexMetaData.indexUUID(), "started shard", null)); - expectedToBeApplied.add(listToFilter.get(listToFilter.size() - 1)); - - listToFilter.add(new ShardStateAction.ShardRoutingEntry(TestShardRouting.newShardRouting(initShard.index() + "_NA", initShard.id(), - initShard.currentNodeId(), initShard.primary(), initShard.state(), initShard.version()), indexMetaData.indexUUID(), "wrong_uuid", null)); - - List toBeApplied = ShardStateAction.extractShardsToBeApplied(listToFilter, "for testing", state.metaData(), logger); - if (toBeApplied.size() != expectedToBeApplied.size()) { - fail("size mismatch.\n Got: \n [" + toBeApplied + "], \n expected: \n [" + expectedToBeApplied + "]"); - } - for (int i = 0; i < toBeApplied.size(); i++) { - final ShardStateAction.ShardRoutingEntry found = toBeApplied.get(i); - final ShardStateAction.ShardRoutingEntry expected = expectedToBeApplied.get(i); - assertThat(found, equalTo(expected)); - } - } -} \ No newline at end of file diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/AllocationIdTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/AllocationIdTests.java index 74a8b2fde46..2f64f39f059 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/AllocationIdTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/AllocationIdTests.java @@ -62,16 +62,15 @@ public class AllocationIdTests extends ElasticsearchTestCase { assertThat(shard.allocationId(), not(equalTo(allocationId))); assertThat(shard.allocationId().getId(), equalTo(allocationId.getId())); assertThat(shard.allocationId().getRelocationId(), notNullValue()); - allocationId = shard.allocationId(); ShardRouting target = shard.buildTargetRelocatingShard(); assertThat(target.allocationId().getId(), equalTo(shard.allocationId().getRelocationId())); - assertThat(target.allocationId().getRelocationId(), nullValue()); + assertThat(target.allocationId().getRelocationId(), equalTo(shard.allocationId().getId())); logger.info("-- finalize the relocation"); - shard.moveToStarted(); - assertThat(shard.allocationId().getId(), equalTo(target.allocationId().getId())); - assertThat(shard.allocationId().getRelocationId(), nullValue()); + target.moveToStarted(); + assertThat(target.allocationId().getId(), equalTo(shard.allocationId().getRelocationId())); + assertThat(target.allocationId().getRelocationId(), nullValue()); } @Test diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/TestShardRouting.java b/core/src/test/java/org/elasticsearch/cluster/routing/TestShardRouting.java index 99d3d8b3a0b..82d3afc6e91 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/TestShardRouting.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/TestShardRouting.java @@ -33,6 +33,10 @@ public class TestShardRouting { return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, null, primary, state, version, null, buildAllocationId(state), true); } + public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, String relocatingNodeId, boolean primary, ShardRoutingState state, AllocationId allocationId, long version) { + return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, null, primary, state, version, null, allocationId, true); + } + public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, String relocatingNodeId, RestoreSource restoreSource, boolean primary, ShardRoutingState state, long version) { return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, restoreSource, primary, state, version, null, buildAllocationId(state), true); } diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java index f6fc184bb1f..569018c431d 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java @@ -256,9 +256,9 @@ public class FailedShardsRoutingTests extends ElasticsearchAllocationTestCase { logger.info("fail the first shard, will have no place to be rerouted to (single node), so stays unassigned"); prevRoutingTable = routingTable; - routingTable = strategy.applyFailedShard(clusterState, TestShardRouting.newShardRouting("test", 0, "node1", true, INITIALIZING, 0)).routingTable(); + ShardRouting firstShard = clusterState.routingNodes().node("node1").get(0); + routingTable = strategy.applyFailedShard(clusterState, firstShard).routingTable(); clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); - RoutingNodes routingNodes = clusterState.routingNodes(); assertThat(prevRoutingTable != routingTable, equalTo(true)); assertThat(routingTable.index("test").shards().size(), equalTo(1)); @@ -272,7 +272,7 @@ public class FailedShardsRoutingTests extends ElasticsearchAllocationTestCase { } logger.info("fail the shard again, see that nothing happens"); - assertThat(strategy.applyFailedShard(clusterState, TestShardRouting.newShardRouting("test", 0, "node1", true, INITIALIZING, 0)).changed(), equalTo(false)); + assertThat(strategy.applyFailedShard(clusterState, firstShard).changed(), equalTo(false)); } @Test @@ -371,9 +371,9 @@ public class FailedShardsRoutingTests extends ElasticsearchAllocationTestCase { logger.info("fail the first shard, will start INITIALIZING on the second node"); prevRoutingTable = routingTable; - routingTable = strategy.applyFailedShard(clusterState, TestShardRouting.newShardRouting("test", 0, nodeHoldingPrimary, true, INITIALIZING, 0)).routingTable(); + final ShardRouting firstShard = clusterState.routingNodes().node("node1").get(0); + routingTable = strategy.applyFailedShard(clusterState, firstShard).routingTable(); clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); - RoutingNodes routingNodes = clusterState.routingNodes(); assertThat(prevRoutingTable != routingTable, equalTo(true)); assertThat(routingTable.index("test").shards().size(), equalTo(1)); @@ -387,7 +387,7 @@ public class FailedShardsRoutingTests extends ElasticsearchAllocationTestCase { } logger.info("fail the shard again, see that nothing happens"); - assertThat(strategy.applyFailedShard(clusterState, TestShardRouting.newShardRouting("test", 0, nodeHoldingPrimary, true, INITIALIZING, 0)).changed(), equalTo(false)); + assertThat(strategy.applyFailedShard(clusterState, firstShard).changed(), equalTo(false)); } @Test diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/StartedShardsRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/StartedShardsRoutingTests.java index 9a36ba3d3a0..506d3d29e0f 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/StartedShardsRoutingTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/StartedShardsRoutingTests.java @@ -64,68 +64,57 @@ public class StartedShardsRoutingTests extends ElasticsearchAllocationTestCase { RoutingAllocation.Result result = allocation.applyStartedShards(state, Arrays.asList( TestShardRouting.newShardRouting(initShard.index(), initShard.id(), initShard.currentNodeId(), initShard.relocatingNodeId(), initShard.primary(), - ShardRoutingState.INITIALIZING, randomInt())), false); + ShardRoutingState.INITIALIZING, initShard.allocationId(), randomInt())), false); assertTrue("failed to start " + initShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed()); assertTrue(initShard + "isn't started \ncurrent routing table:" + result.routingTable().prettyPrint(), result.routingTable().index("test").shard(initShard.id()).allShardsStarted()); - logger.info("--> testing shard variants that shouldn't match the started shard"); + logger.info("--> testing shard variants that shouldn't match the initializing shard"); result = allocation.applyStartedShards(state, Arrays.asList( - TestShardRouting.newShardRouting(initShard.index(), initShard.id(), initShard.currentNodeId(), initShard.relocatingNodeId(), !initShard.primary(), + TestShardRouting.newShardRouting(initShard.index(), initShard.id(), initShard.currentNodeId(), initShard.relocatingNodeId(), initShard.primary(), ShardRoutingState.INITIALIZING, 1)), false); - assertFalse("wrong primary flag shouldn't start shard " + initShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed()); + assertFalse("wrong allocation id flag shouldn't start shard " + initShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed()); result = allocation.applyStartedShards(state, Arrays.asList( TestShardRouting.newShardRouting(initShard.index(), initShard.id(), "some_node", initShard.currentNodeId(), initShard.primary(), - ShardRoutingState.INITIALIZING, 1)), false); + ShardRoutingState.INITIALIZING, AllocationId.newTargetRelocation(AllocationId.newRelocation(initShard.allocationId())) + , 1)), false); assertFalse("relocating shard from node shouldn't start shard " + initShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed()); - result = allocation.applyStartedShards(state, Arrays.asList( - TestShardRouting.newShardRouting(initShard.index(), initShard.id(), initShard.currentNodeId(), "some_node", initShard.primary(), - ShardRoutingState.INITIALIZING, 1)), false); - assertFalse("relocating shard to node shouldn't start shard " + initShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed()); logger.info("--> testing double starting"); result = allocation.applyStartedShards(state, Arrays.asList( TestShardRouting.newShardRouting(startedShard.index(), startedShard.id(), startedShard.currentNodeId(), startedShard.relocatingNodeId(), startedShard.primary(), - ShardRoutingState.INITIALIZING, 1)), false); + ShardRoutingState.INITIALIZING, startedShard.allocationId(), 1)), false); assertFalse("duplicate starting of the same shard should be ignored \ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed()); logger.info("--> testing starting of relocating shards"); + final AllocationId targetAllocationId = AllocationId.newTargetRelocation(relocatingShard.allocationId()); result = allocation.applyStartedShards(state, Arrays.asList( TestShardRouting.newShardRouting(relocatingShard.index(), relocatingShard.id(), relocatingShard.relocatingNodeId(), relocatingShard.currentNodeId(), relocatingShard.primary(), - ShardRoutingState.INITIALIZING, randomInt())), false); + ShardRoutingState.INITIALIZING, targetAllocationId, randomInt())), false); + assertTrue("failed to start " + relocatingShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed()); ShardRouting shardRouting = result.routingTable().index("test").shard(relocatingShard.id()).getShards().get(0); assertThat(shardRouting.state(), equalTo(ShardRoutingState.STARTED)); assertThat(shardRouting.currentNodeId(), equalTo("node2")); assertThat(shardRouting.relocatingNodeId(), nullValue()); - logger.info("--> testing shard variants that shouldn't match the relocating shard"); + logger.info("--> testing shard variants that shouldn't match the initializing relocating shard"); result = allocation.applyStartedShards(state, Arrays.asList( - TestShardRouting.newShardRouting(relocatingShard.index(), relocatingShard.id(), relocatingShard.relocatingNodeId(), relocatingShard.currentNodeId(), !relocatingShard.primary(), - ShardRoutingState.INITIALIZING, 1)), false); - assertFalse("wrong primary flag shouldn't start shard " + relocatingShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed()); + TestShardRouting.newShardRouting(relocatingShard.index(), relocatingShard.id(), relocatingShard.relocatingNodeId(), relocatingShard.currentNodeId(), relocatingShard.primary(), + ShardRoutingState.INITIALIZING, relocatingShard.version()))); + assertFalse("wrong allocation id shouldn't start shard" + relocatingShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed()); result = allocation.applyStartedShards(state, Arrays.asList( - TestShardRouting.newShardRouting(relocatingShard.index(), relocatingShard.id(), "some_node", relocatingShard.currentNodeId(), relocatingShard.primary(), - ShardRoutingState.INITIALIZING, 1)), false); - assertFalse("relocating shard to a different node shouldn't start shard " + relocatingShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed()); - - result = allocation.applyStartedShards(state, Arrays.asList( - TestShardRouting.newShardRouting(relocatingShard.index(), relocatingShard.id(), relocatingShard.relocatingNodeId(), "some_node", relocatingShard.primary(), - ShardRoutingState.INITIALIZING, 1)), false); - assertFalse("relocating shard from a different node shouldn't start shard " + relocatingShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed()); - - result = allocation.applyStartedShards(state, Arrays.asList( - TestShardRouting.newShardRouting(relocatingShard.index(), relocatingShard.id(), relocatingShard.relocatingNodeId(), relocatingShard.primary(), - ShardRoutingState.INITIALIZING, 1)), false); - assertFalse("non-relocating shard shouldn't start shard" + relocatingShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed()); + TestShardRouting.newShardRouting(relocatingShard.index(), relocatingShard.id(), relocatingShard.relocatingNodeId(), relocatingShard.currentNodeId(), relocatingShard.primary(), + ShardRoutingState.INITIALIZING, relocatingShard.allocationId(), randomInt())), false); + assertFalse("wrong allocation id shouldn't start shard even if relocatingId==shard.id" + relocatingShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed()); } }