From 965efa51cc1108f51088dbf25ddb75b7df80a758 Mon Sep 17 00:00:00 2001
From: Nhat Nguyen
Date: Sat, 3 Feb 2018 09:41:53 -0500
Subject: [PATCH] Allows failing shards without marking as stale (#28054)

Currently, when failing a shard, we also mark it as stale (i.e. remove
its allocationId from the in-sync set). However, in some cases we need
to be able to fail shards but keep them in the in-sync set. This commit
adds that capability. This is a preparatory change to make the
primary-replica resync less lenient.

Relates #24841
---
 .../replication/TransportWriteAction.java     |   4 +-
 .../action/shard/ShardStateAction.java        | 212 +++++++++++-------
 .../routing/allocation/AllocationService.java |   7 +-
 .../routing/allocation/FailedShard.java       |  13 +-
 .../allocation/IndexMetaDataUpdater.java      |  23 +-
 .../routing/allocation/RoutingAllocation.java |   8 +
 .../command/CancelAllocationCommand.java      |   2 +
 .../cluster/reroute/ClusterRerouteTests.java  |   2 +-
 .../TransportWriteActionTests.java            |   2 +-
 ...rdFailedClusterStateTaskExecutorTests.java | 105 ++++++---
 .../action/shard/ShardStateActionTests.java   |  55 ++++-
 .../cluster/routing/PrimaryTermsTests.java    |   2 +-
 .../cluster/routing/UnassignedInfoTests.java  |   2 +-
 .../allocation/FailedNodeRoutingTests.java    |   2 +-
 .../allocation/FailedShardsRoutingTests.java  |  22 +-
 .../allocation/InSyncAllocationIdTests.java   |  20 +-
 .../MaxRetryAllocationDeciderTests.java       |  14 +-
 .../SingleShardNoReplicasRoutingTests.java    |   2 +-
 .../decider/FilterAllocationDeciderTests.java |   4 +-
 .../indices/cluster/ClusterStateChanges.java  |  13 +-
 ...ClusterStateServiceRandomUpdatesTests.java |   2 +-
 21 files changed, 331 insertions(+), 185 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java
index ec3dcd94d30..28b8f0826cd 100644
--- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java
+++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java
@@ -387,14 +387,14 @@ public abstract class TransportWriteAction<
             logger.warn((org.apache.logging.log4j.util.Supplier<?>)
                     () -> new ParameterizedMessage("[{}] {}", replica.shardId(), message), exception);
-            shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, message, exception,
+            shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, true, message, exception,
                 createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
         }
 
         @Override
         public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess,
                                                  Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
-            shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, "mark copy as stale", null,
+            shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null,
                 createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
         }
 
diff --git a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java
index 543118a172f..f6e0fa116d6 100644
--- a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java
+++ b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java
@@ -24,6 +24,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
org.apache.logging.log4j.util.Supplier; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterStateTaskExecutor; import org.elasticsearch.cluster.ClusterState; @@ -88,21 +89,21 @@ public class ShardStateAction extends AbstractComponent { this.clusterService = clusterService; this.threadPool = threadPool; - transportService.registerRequestHandler(SHARD_STARTED_ACTION_NAME, ShardEntry::new, ThreadPool.Names.SAME, new ShardStartedTransportHandler(clusterService, new ShardStartedClusterStateTaskExecutor(allocationService, logger), logger)); - transportService.registerRequestHandler(SHARD_FAILED_ACTION_NAME, ShardEntry::new, ThreadPool.Names.SAME, new ShardFailedTransportHandler(clusterService, new ShardFailedClusterStateTaskExecutor(allocationService, routingService, logger), logger)); + transportService.registerRequestHandler(SHARD_STARTED_ACTION_NAME, ThreadPool.Names.SAME, StartedShardEntry::new, new ShardStartedTransportHandler(clusterService, new ShardStartedClusterStateTaskExecutor(allocationService, logger), logger)); + transportService.registerRequestHandler(SHARD_FAILED_ACTION_NAME, ThreadPool.Names.SAME, FailedShardEntry::new, new ShardFailedTransportHandler(clusterService, new ShardFailedClusterStateTaskExecutor(allocationService, routingService, logger), logger)); } - private void sendShardAction(final String actionName, final ClusterState currentState, final ShardEntry shardEntry, final Listener listener) { + private void sendShardAction(final String actionName, final ClusterState currentState, final TransportRequest request, final Listener listener) { ClusterStateObserver observer = new ClusterStateObserver(currentState, clusterService, null, logger, threadPool.getThreadContext()); DiscoveryNode masterNode = currentState.nodes().getMasterNode(); Predicate changePredicate = MasterNodeChangePredicate.build(currentState); if (masterNode == null) { - logger.warn("{} no master known for action [{}] for shard entry [{}]", shardEntry.shardId, actionName, shardEntry); - waitForNewMasterAndRetry(actionName, observer, shardEntry, listener, changePredicate); + logger.warn("no master known for action [{}] for shard entry [{}]", actionName, request); + waitForNewMasterAndRetry(actionName, observer, request, listener, changePredicate); } else { - logger.debug("{} sending [{}] to [{}] for shard entry [{}]", shardEntry.shardId, actionName, masterNode.getId(), shardEntry); + logger.debug("sending [{}] to [{}] for shard entry [{}]", actionName, masterNode.getId(), request); transportService.sendRequest(masterNode, - actionName, shardEntry, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { + actionName, request, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { @Override public void handleResponse(TransportResponse.Empty response) { listener.onSuccess(); @@ -111,9 +112,9 @@ public class ShardStateAction extends AbstractComponent { @Override public void handleException(TransportException exp) { if (isMasterChannelException(exp)) { - waitForNewMasterAndRetry(actionName, observer, shardEntry, listener, changePredicate); + waitForNewMasterAndRetry(actionName, observer, request, listener, changePredicate); } else { - logger.warn((Supplier) () -> new ParameterizedMessage("{} unexpected failure while sending request [{}] to [{}] for shard entry [{}]", shardEntry.shardId, actionName, masterNode, shardEntry), exp); + 
logger.warn(new ParameterizedMessage("unexpected failure while sending request [{}] to [{}] for shard entry [{}]", actionName, masterNode, request), exp); listener.onFailure(exp instanceof RemoteTransportException ? (Exception) (exp.getCause() instanceof Exception ? exp.getCause() : new ElasticsearchException(exp.getCause())) : exp); } } @@ -139,13 +140,15 @@ public class ShardStateAction extends AbstractComponent { * @param shardId shard id of the shard to fail * @param allocationId allocation id of the shard to fail * @param primaryTerm the primary term associated with the primary shard that is failing the shard. Must be strictly positive. + * @param markAsStale whether or not to mark a failing shard as stale (eg. removing from in-sync set) when failing the shard. * @param message the reason for the failure * @param failure the underlying cause of the failure * @param listener callback upon completion of the request */ - public void remoteShardFailed(final ShardId shardId, String allocationId, long primaryTerm, final String message, @Nullable final Exception failure, Listener listener) { + public void remoteShardFailed(final ShardId shardId, String allocationId, long primaryTerm, boolean markAsStale, final String message, @Nullable final Exception failure, Listener listener) { assert primaryTerm > 0L : "primary term should be strictly positive"; - shardFailed(shardId, allocationId, primaryTerm, message, failure, listener, clusterService.state()); + FailedShardEntry shardEntry = new FailedShardEntry(shardId, allocationId, primaryTerm, message, failure, markAsStale); + sendShardAction(SHARD_FAILED_ACTION_NAME, clusterService.state(), shardEntry, listener); } /** @@ -160,29 +163,24 @@ public class ShardStateAction extends AbstractComponent { */ public void localShardFailed(final ShardRouting shardRouting, final String message, @Nullable final Exception failure, Listener listener, final ClusterState currentState) { - shardFailed(shardRouting.shardId(), shardRouting.allocationId().getId(), 0L, message, failure, listener, currentState); - } - - private void shardFailed(final ShardId shardId, String allocationId, long primaryTerm, final String message, - @Nullable final Exception failure, Listener listener, ClusterState currentState) { - ShardEntry shardEntry = new ShardEntry(shardId, allocationId, primaryTerm, message, failure); + FailedShardEntry shardEntry = new FailedShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(), 0L, message, failure, true); sendShardAction(SHARD_FAILED_ACTION_NAME, currentState, shardEntry, listener); } // visible for testing - protected void waitForNewMasterAndRetry(String actionName, ClusterStateObserver observer, ShardEntry shardEntry, Listener listener, Predicate changePredicate) { + protected void waitForNewMasterAndRetry(String actionName, ClusterStateObserver observer, TransportRequest request, Listener listener, Predicate changePredicate) { observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override public void onNewClusterState(ClusterState state) { if (logger.isTraceEnabled()) { - logger.trace("new cluster state [{}] after waiting for master election to fail shard entry [{}]", state, shardEntry); + logger.trace("new cluster state [{}] after waiting for master election for shard entry [{}]", state, request); } - sendShardAction(actionName, state, shardEntry, listener); + sendShardAction(actionName, state, request, listener); } @Override public void onClusterServiceClose() { - logger.warn((Supplier) () -> new 
ParameterizedMessage("{} node closed while execution action [{}] for shard entry [{}]", shardEntry.shardId, actionName, shardEntry), shardEntry.failure); + logger.warn("node closed while execution action [{}] for shard entry [{}]", actionName, request); listener.onFailure(new NodeClosedException(clusterService.localNode())); } @@ -194,7 +192,7 @@ public class ShardStateAction extends AbstractComponent { }, changePredicate); } - private static class ShardFailedTransportHandler implements TransportRequestHandler { + private static class ShardFailedTransportHandler implements TransportRequestHandler { private final ClusterService clusterService; private final ShardFailedClusterStateTaskExecutor shardFailedClusterStateTaskExecutor; private final Logger logger; @@ -206,7 +204,7 @@ public class ShardStateAction extends AbstractComponent { } @Override - public void messageReceived(ShardEntry request, TransportChannel channel) throws Exception { + public void messageReceived(FailedShardEntry request, TransportChannel channel) throws Exception { logger.warn((Supplier) () -> new ParameterizedMessage("{} received shard failed for {}", request.shardId, request), request.failure); clusterService.submitStateUpdateTask( "shard-failed", @@ -248,7 +246,7 @@ public class ShardStateAction extends AbstractComponent { } } - public static class ShardFailedClusterStateTaskExecutor implements ClusterStateTaskExecutor { + public static class ShardFailedClusterStateTaskExecutor implements ClusterStateTaskExecutor { private final AllocationService allocationService; private final RoutingService routingService; private final Logger logger; @@ -260,13 +258,13 @@ public class ShardStateAction extends AbstractComponent { } @Override - public ClusterTasksResult execute(ClusterState currentState, List tasks) throws Exception { - ClusterTasksResult.Builder batchResultBuilder = ClusterTasksResult.builder(); - List tasksToBeApplied = new ArrayList<>(); + public ClusterTasksResult execute(ClusterState currentState, List tasks) throws Exception { + ClusterTasksResult.Builder batchResultBuilder = ClusterTasksResult.builder(); + List tasksToBeApplied = new ArrayList<>(); List failedShardsToBeApplied = new ArrayList<>(); List staleShardsToBeApplied = new ArrayList<>(); - for (ShardEntry task : tasks) { + for (FailedShardEntry task : tasks) { IndexMetaData indexMetaData = currentState.metaData().index(task.shardId.getIndex()); if (indexMetaData == null) { // tasks that correspond to non-existent indices are marked as successful @@ -314,7 +312,7 @@ public class ShardStateAction extends AbstractComponent { // failing a shard also possibly marks it as stale (see IndexMetaDataUpdater) logger.debug("{} failing shard {} (shard failed task: [{}])", task.shardId, matched, task); tasksToBeApplied.add(task); - failedShardsToBeApplied.add(new FailedShard(matched, task.message, task.failure)); + failedShardsToBeApplied.add(new FailedShard(matched, task.message, task.failure, task.markAsStale)); } } } @@ -352,15 +350,82 @@ public class ShardStateAction extends AbstractComponent { } } + public static class FailedShardEntry extends TransportRequest { + final ShardId shardId; + final String allocationId; + final long primaryTerm; + final String message; + final Exception failure; + final boolean markAsStale; + + FailedShardEntry(StreamInput in) throws IOException { + super(in); + shardId = ShardId.readShardId(in); + allocationId = in.readString(); + primaryTerm = in.readVLong(); + message = in.readString(); + failure = in.readException(); + if 
(in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + markAsStale = in.readBoolean(); + } else { + markAsStale = true; + } + } + + public FailedShardEntry(ShardId shardId, String allocationId, long primaryTerm, String message, Exception failure, boolean markAsStale) { + this.shardId = shardId; + this.allocationId = allocationId; + this.primaryTerm = primaryTerm; + this.message = message; + this.failure = failure; + this.markAsStale = markAsStale; + } + + public ShardId getShardId() { + return shardId; + } + + public String getAllocationId() { + return allocationId; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + shardId.writeTo(out); + out.writeString(allocationId); + out.writeVLong(primaryTerm); + out.writeString(message); + out.writeException(failure); + if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + out.writeBoolean(markAsStale); + } + } + + @Override + public String toString() { + List components = new ArrayList<>(6); + components.add("shard id [" + shardId + "]"); + components.add("allocation id [" + allocationId + "]"); + components.add("primary term [" + primaryTerm + "]"); + components.add("message [" + message + "]"); + if (failure != null) { + components.add("failure [" + ExceptionsHelper.detailedMessage(failure) + "]"); + } + components.add("markAsStale [" + markAsStale + "]"); + return String.join(", ", components); + } + } + public void shardStarted(final ShardRouting shardRouting, final String message, Listener listener) { shardStarted(shardRouting, message, listener, clusterService.state()); } public void shardStarted(final ShardRouting shardRouting, final String message, Listener listener, ClusterState currentState) { - ShardEntry shardEntry = new ShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(), 0L, message, null); + StartedShardEntry shardEntry = new StartedShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(), message); sendShardAction(SHARD_STARTED_ACTION_NAME, currentState, shardEntry, listener); } - private static class ShardStartedTransportHandler implements TransportRequestHandler { + private static class ShardStartedTransportHandler implements TransportRequestHandler { private final ClusterService clusterService; private final ShardStartedClusterStateTaskExecutor shardStartedClusterStateTaskExecutor; private final Logger logger; @@ -372,7 +437,7 @@ public class ShardStateAction extends AbstractComponent { } @Override - public void messageReceived(ShardEntry request, TransportChannel channel) throws Exception { + public void messageReceived(StartedShardEntry request, TransportChannel channel) throws Exception { logger.debug("{} received shard started for [{}]", request.shardId, request); clusterService.submitStateUpdateTask( "shard-started " + request, @@ -384,7 +449,7 @@ public class ShardStateAction extends AbstractComponent { } } - public static class ShardStartedClusterStateTaskExecutor implements ClusterStateTaskExecutor, ClusterStateTaskListener { + public static class ShardStartedClusterStateTaskExecutor implements ClusterStateTaskExecutor, ClusterStateTaskListener { private final AllocationService allocationService; private final Logger logger; @@ -394,14 +459,12 @@ public class ShardStateAction extends AbstractComponent { } @Override - public ClusterTasksResult execute(ClusterState currentState, List tasks) throws Exception { - ClusterTasksResult.Builder builder = ClusterTasksResult.builder(); - List tasksToBeApplied = new ArrayList<>(); + public 
ClusterTasksResult execute(ClusterState currentState, List tasks) throws Exception { + ClusterTasksResult.Builder builder = ClusterTasksResult.builder(); + List tasksToBeApplied = new ArrayList<>(); List shardRoutingsToBeApplied = new ArrayList<>(tasks.size()); Set seenShardRoutings = new HashSet<>(); // to prevent duplicates - for (ShardEntry task : tasks) { - assert task.primaryTerm == 0L : "shard is only started by itself: " + task; - + for (StartedShardEntry task : tasks) { ShardRouting matched = currentState.getRoutingTable().getByAllocationId(task.shardId, task.allocationId); if (matched == null) { // tasks that correspond to non-existent shards are marked as successful. The reason is that we resend shard started @@ -451,40 +514,30 @@ public class ShardStateAction extends AbstractComponent { } } - public static class ShardEntry extends TransportRequest { - ShardId shardId; - String allocationId; - long primaryTerm; - String message; - Exception failure; + public static class StartedShardEntry extends TransportRequest { + final ShardId shardId; + final String allocationId; + final String message; - public ShardEntry() { - } - - public ShardEntry(ShardId shardId, String allocationId, long primaryTerm, String message, @Nullable Exception failure) { - this.shardId = shardId; - this.allocationId = allocationId; - this.primaryTerm = primaryTerm; - this.message = message; - this.failure = failure; - } - - public ShardId getShardId() { - return shardId; - } - - public String getAllocationId() { - return allocationId; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); + StartedShardEntry(StreamInput in) throws IOException { + super(in); shardId = ShardId.readShardId(in); allocationId = in.readString(); - primaryTerm = in.readVLong(); - message = in.readString(); - failure = in.readException(); + if (in.getVersion().before(Version.V_7_0_0_alpha1)) { + final long primaryTerm = in.readVLong(); + assert primaryTerm == 0L : "shard is only started by itself: primary term [" + primaryTerm + "]"; + } + this.message = in.readString(); + if (in.getVersion().before(Version.V_7_0_0_alpha1)) { + final Exception ex = in.readException(); + assert ex == null : "started shard must not have failure [" + ex + "]"; + } + } + + public StartedShardEntry(ShardId shardId, String allocationId, String message) { + this.shardId = shardId; + this.allocationId = allocationId; + this.message = message; } @Override @@ -492,22 +545,19 @@ public class ShardStateAction extends AbstractComponent { super.writeTo(out); shardId.writeTo(out); out.writeString(allocationId); - out.writeVLong(primaryTerm); + if (out.getVersion().before(Version.V_7_0_0_alpha1)) { + out.writeVLong(0L); + } out.writeString(message); - out.writeException(failure); + if (out.getVersion().before(Version.V_7_0_0_alpha1)) { + out.writeException(null); + } } @Override public String toString() { - List components = new ArrayList<>(4); - components.add("shard id [" + shardId + "]"); - components.add("allocation id [" + allocationId + "]"); - components.add("primary term [" + primaryTerm + "]"); - components.add("message [" + message + "]"); - if (failure != null) { - components.add("failure [" + ExceptionsHelper.detailedMessage(failure) + "]"); - } - return String.join(", ", components); + return String.format(Locale.ROOT, "StartedShardEntry{shardId [%s], allocationId [%s], message [%s]}", + shardId, allocationId, message); } } diff --git 
a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index 2326274c36c..6dc405d00a3 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -138,8 +138,8 @@ public class AllocationService extends AbstractComponent { } // Used for testing - public ClusterState applyFailedShard(ClusterState clusterState, ShardRouting failedShard) { - return applyFailedShards(clusterState, singletonList(new FailedShard(failedShard, null, null)), emptyList()); + public ClusterState applyFailedShard(ClusterState clusterState, ShardRouting failedShard, boolean markAsStale) { + return applyFailedShards(clusterState, singletonList(new FailedShard(failedShard, null, null, markAsStale)), emptyList()); } // Used for testing @@ -185,6 +185,9 @@ public class AllocationService extends AbstractComponent { UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, message, failedShardEntry.getFailure(), failedAllocations + 1, currentNanoTime, System.currentTimeMillis(), false, AllocationStatus.NO_ATTEMPT); + if (failedShardEntry.markAsStale()) { + allocation.removeAllocationId(failedShard); + } routingNodes.failShard(logger, failedShard, unassignedInfo, indexMetaData, allocation.changes()); } else { logger.trace("{} shard routing failed in an earlier iteration (routing: {})", shardToFail.shardId(), shardToFail); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/FailedShard.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/FailedShard.java index 9bf9fa86d18..f6d9a1bc119 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/FailedShard.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/FailedShard.java @@ -30,18 +30,20 @@ public class FailedShard { private final ShardRouting routingEntry; private final String message; private final Exception failure; + private final boolean markAsStale; - public FailedShard(ShardRouting routingEntry, String message, Exception failure) { + public FailedShard(ShardRouting routingEntry, String message, Exception failure, boolean markAsStale) { assert routingEntry.assignedToNode() : "only assigned shards can be failed " + routingEntry; this.routingEntry = routingEntry; this.message = message; this.failure = failure; + this.markAsStale = markAsStale; } @Override public String toString() { return "failed shard, shard " + routingEntry + ", message [" + message + "], failure [" + - ExceptionsHelper.detailedMessage(failure) + "]"; + ExceptionsHelper.detailedMessage(failure) + "], markAsStale [" + markAsStale + "]"; } /** @@ -66,4 +68,11 @@ public class FailedShard { public Exception getFailure() { return failure; } + + /** + * Whether or not to mark the shard as stale (eg. removing from in-sync set) when failing the shard. 
+ */ + public boolean markAsStale() { + return markAsStale; + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java index b24a961829d..c5cb6d2af5b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java @@ -72,19 +72,12 @@ public class IndexMetaDataUpdater extends RoutingChangesObserver.AbstractRouting @Override public void shardFailed(ShardRouting failedShard, UnassignedInfo unassignedInfo) { - if (failedShard.active() && unassignedInfo.getReason() != UnassignedInfo.Reason.NODE_LEFT) { - removeAllocationId(failedShard); - - if (failedShard.primary()) { - Updates updates = changes(failedShard.shardId()); - if (updates.firstFailedPrimary == null) { - // more than one primary can be failed (because of batching, primary can be failed, replica promoted and then failed...) - updates.firstFailedPrimary = failedShard; - } - } - } - if (failedShard.active() && failedShard.primary()) { + Updates updates = changes(failedShard.shardId()); + if (updates.firstFailedPrimary == null) { + // more than one primary can be failed (because of batching, primary can be failed, replica promoted and then failed...) + updates.firstFailedPrimary = failedShard; + } increasePrimaryTerm(failedShard.shardId()); } } @@ -286,8 +279,10 @@ public class IndexMetaDataUpdater extends RoutingChangesObserver.AbstractRouting /** * Remove allocation id of this shard from the set of in-sync shard copies */ - private void removeAllocationId(ShardRouting shardRouting) { - changes(shardRouting.shardId()).removedAllocationIds.add(shardRouting.allocationId().getId()); + void removeAllocationId(ShardRouting shardRouting) { + if (shardRouting.active()) { + changes(shardRouting.shardId()).removedAllocationIds.add(shardRouting.allocationId().getId()); + } } /** diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java index abc363931c1..e0be712a230 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java @@ -27,6 +27,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingChangesObserver; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.common.collect.ImmutableOpenMap; @@ -222,6 +223,13 @@ public class RoutingAllocation { return unmodifiableSet(new HashSet<>(ignore)); } + /** + * Remove the allocation id of the provided shard from the set of in-sync shard copies + */ + public void removeAllocationId(ShardRouting shardRouting) { + indexMetaDataUpdater.removeAllocationId(shardRouting); + } + /** * Returns observer to use for changes made to the routing nodes */ diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/CancelAllocationCommand.java 
b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/CancelAllocationCommand.java index eae4739c127..afeb0e0dab1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/CancelAllocationCommand.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/CancelAllocationCommand.java @@ -156,6 +156,8 @@ public class CancelAllocationCommand implements AllocationCommand { } routingNodes.failShard(Loggers.getLogger(CancelAllocationCommand.class), shardRouting, new UnassignedInfo(UnassignedInfo.Reason.REROUTE_CANCELLED, null), indexMetaData, allocation.changes()); + // TODO: We don't have to remove a cancelled shard from in-sync set once we have a strict resync implementation. + allocation.removeAllocationId(shardRouting); return new RerouteExplanation(this, allocation.decision(Decision.YES, "cancel_allocation_command", "shard " + shardId + " on node " + discoNode + " can be cancelled")); } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteTests.java index d3a0d12a853..6e34a751007 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteTests.java @@ -120,7 +120,7 @@ public class ClusterRerouteTests extends ESAllocationTestCase { assertEquals(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getNumFailedAllocations(), i); List failedShards = Collections.singletonList( new FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom" + i, - new UnsupportedOperationException())); + new UnsupportedOperationException(), randomBoolean())); newState = allocationService.applyFailedShards(clusterState, failedShards); assertThat(newState, not(equalTo(clusterState))); clusterState = newState; diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index 47ce090d895..805553b4a61 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -315,7 +315,7 @@ public class TransportWriteActionTests extends ESTestCase { // A write replication action proxy should fail the shard assertEquals(1, shardFailedRequests.length); CapturingTransport.CapturedRequest shardFailedRequest = shardFailedRequests[0]; - ShardStateAction.ShardEntry shardEntry = (ShardStateAction.ShardEntry) shardFailedRequest.request; + ShardStateAction.FailedShardEntry shardEntry = (ShardStateAction.FailedShardEntry) shardFailedRequest.request; // the shard the request was sent to and the shard to be failed should be the same assertEquals(shardEntry.getShardId(), replica.shardId()); assertEquals(shardEntry.getAllocationId(), replica.allocationId().getId()); diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java index 794f71786ca..89be94b4506 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java +++ 
b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java @@ -22,6 +22,7 @@ package org.elasticsearch.cluster.action.shard; import com.carrotsearch.hppc.cursors.ObjectCursor; import org.apache.lucene.index.CorruptIndexException; import org.elasticsearch.Version; +import org.elasticsearch.cluster.action.shard.ShardStateAction.FailedShardEntry; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateTaskExecutor; @@ -30,6 +31,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.GroupShardsIterator; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardIterator; @@ -42,20 +44,26 @@ import org.elasticsearch.cluster.routing.allocation.StaleShard; import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.junit.Before; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; +import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; +import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.Matchers.contains; public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCase { @@ -87,8 +95,8 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa } public void testEmptyTaskListProducesSameClusterState() throws Exception { - List tasks = Collections.emptyList(); - ClusterStateTaskExecutor.ClusterTasksResult result = + List tasks = Collections.emptyList(); + ClusterStateTaskExecutor.ClusterTasksResult result = executor.execute(clusterState, tasks); assertTasksSuccessful(tasks, result, clusterState, false); } @@ -96,35 +104,35 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa public void testDuplicateFailuresAreOkay() throws Exception { String reason = "test duplicate failures are okay"; ClusterState currentState = createClusterStateWithStartedShards(reason); - List tasks = createExistingShards(currentState, reason); - ClusterStateTaskExecutor.ClusterTasksResult result = executor.execute(currentState, tasks); + List tasks = createExistingShards(currentState, reason); + ClusterStateTaskExecutor.ClusterTasksResult result = executor.execute(currentState, tasks); assertTasksSuccessful(tasks, result, clusterState, true); } public void testNonExistentShardsAreMarkedAsSuccessful() throws Exception { String reason = "test non existent shards are marked as successful"; ClusterState currentState = createClusterStateWithStartedShards(reason); - List tasks = createNonExistentShards(currentState, reason); - ClusterStateTaskExecutor.ClusterTasksResult result = 
executor.execute(clusterState, tasks); + List tasks = createNonExistentShards(currentState, reason); + ClusterStateTaskExecutor.ClusterTasksResult result = executor.execute(clusterState, tasks); assertTasksSuccessful(tasks, result, clusterState, false); } public void testTriviallySuccessfulTasksBatchedWithFailingTasks() throws Exception { String reason = "test trivially successful tasks batched with failing tasks"; ClusterState currentState = createClusterStateWithStartedShards(reason); - List failingTasks = createExistingShards(currentState, reason); - List nonExistentTasks = createNonExistentShards(currentState, reason); + List failingTasks = createExistingShards(currentState, reason); + List nonExistentTasks = createNonExistentShards(currentState, reason); ShardStateAction.ShardFailedClusterStateTaskExecutor failingExecutor = new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocationService, null, logger) { @Override ClusterState applyFailedShards(ClusterState currentState, List failedShards, List staleShards) { throw new RuntimeException("simulated applyFailedShards failure"); } }; - List tasks = new ArrayList<>(); + List tasks = new ArrayList<>(); tasks.addAll(failingTasks); tasks.addAll(nonExistentTasks); - ClusterStateTaskExecutor.ClusterTasksResult result = failingExecutor.execute(currentState, tasks); - Map taskResultMap = + ClusterStateTaskExecutor.ClusterTasksResult result = failingExecutor.execute(currentState, tasks); + Map taskResultMap = failingTasks.stream().collect(Collectors.toMap(Function.identity(), task -> ClusterStateTaskExecutor.TaskResult.failure(new RuntimeException("simulated applyFailedShards failure")))); taskResultMap.putAll(nonExistentTasks.stream().collect(Collectors.toMap(Function.identity(), task -> ClusterStateTaskExecutor.TaskResult.success()))); assertTaskResults(taskResultMap, result, currentState, false); @@ -133,23 +141,47 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa public void testIllegalShardFailureRequests() throws Exception { String reason = "test illegal shard failure requests"; ClusterState currentState = createClusterStateWithStartedShards(reason); - List failingTasks = createExistingShards(currentState, reason); - List tasks = new ArrayList<>(); - for (ShardStateAction.ShardEntry failingTask : failingTasks) { + List failingTasks = createExistingShards(currentState, reason); + List tasks = new ArrayList<>(); + for (ShardStateAction.FailedShardEntry failingTask : failingTasks) { long primaryTerm = currentState.metaData().index(failingTask.shardId.getIndex()).primaryTerm(failingTask.shardId.id()); - tasks.add(new ShardStateAction.ShardEntry(failingTask.shardId, failingTask.allocationId, - randomIntBetween(1, (int) primaryTerm - 1), failingTask.message, failingTask.failure)); + tasks.add(new FailedShardEntry(failingTask.shardId, failingTask.allocationId, + randomIntBetween(1, (int) primaryTerm - 1), failingTask.message, failingTask.failure, randomBoolean())); } - Map taskResultMap = + Map taskResultMap = tasks.stream().collect(Collectors.toMap( Function.identity(), task -> ClusterStateTaskExecutor.TaskResult.failure(new ShardStateAction.NoLongerPrimaryShardException(task.shardId, "primary term [" + task.primaryTerm + "] did not match current primary term [" + currentState.metaData().index(task.shardId.getIndex()).primaryTerm(task.shardId.id()) + "]")))); - ClusterStateTaskExecutor.ClusterTasksResult result = executor.execute(currentState, tasks); + ClusterStateTaskExecutor.ClusterTasksResult result 
= executor.execute(currentState, tasks); assertTaskResults(taskResultMap, result, currentState, false); } + public void testMarkAsStaleWhenFailingShard() throws Exception { + final MockAllocationService allocation = createAllocationService(); + ClusterState clusterState = createClusterStateWithStartedShards("test markAsStale"); + clusterState = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)); + IndexShardRoutingTable shardRoutingTable = clusterState.routingTable().index(INDEX).shard(0); + long primaryTerm = clusterState.metaData().index(INDEX).primaryTerm(0); + final Set oldInSync = clusterState.metaData().index(INDEX).inSyncAllocationIds(0); + { + ShardStateAction.FailedShardEntry failShardOnly = new ShardStateAction.FailedShardEntry(shardRoutingTable.shardId(), + randomFrom(oldInSync), primaryTerm, "dummy", null, false); + ClusterState appliedState = executor.execute(clusterState, Arrays.asList(failShardOnly)).resultingState; + Set newInSync = appliedState.metaData().index(INDEX).inSyncAllocationIds(0); + assertThat(newInSync, equalTo(oldInSync)); + } + { + final String failedAllocationId = randomFrom(oldInSync); + ShardStateAction.FailedShardEntry failAndMarkAsStale = new ShardStateAction.FailedShardEntry(shardRoutingTable.shardId(), + failedAllocationId, primaryTerm, "dummy", null, true); + ClusterState appliedState = executor.execute(clusterState, Arrays.asList(failAndMarkAsStale)).resultingState; + Set newInSync = appliedState.metaData().index(INDEX).inSyncAllocationIds(0); + assertThat(Sets.difference(oldInSync, newInSync), contains(failedAllocationId)); + } + } + private ClusterState createClusterStateWithStartedShards(String reason) { int numberOfNodes = 1 + numberOfReplicas; DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(); @@ -163,7 +195,7 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa return allocationService.applyStartedShards(stateAfterReroute, routingNodes.shardsWithState(ShardRoutingState.INITIALIZING)); } - private List createExistingShards(ClusterState currentState, String reason) { + private List createExistingShards(ClusterState currentState, String reason) { List shards = new ArrayList<>(); GroupShardsIterator shardGroups = currentState.routingTable().allAssignedShardsGrouped(new String[] { INDEX }, true); for (ShardIterator shardIt : shardGroups) { @@ -181,7 +213,7 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa return toTasks(currentState, shardsToFail, indexUUID, reason); } - private List createNonExistentShards(ClusterState currentState, String reason) { + private List createNonExistentShards(ClusterState currentState, String reason) { // add shards from a non-existent index String nonExistentIndexUUID = "non-existent"; Index index = new Index("non-existent", nonExistentIndexUUID); @@ -195,14 +227,15 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa nonExistentShards.add(nonExistentShardRouting(index, nodeIds, false)); } - List existingShards = createExistingShards(currentState, reason); - List shardsWithMismatchedAllocationIds = new ArrayList<>(); - for (ShardStateAction.ShardEntry existingShard : existingShards) { - shardsWithMismatchedAllocationIds.add(new ShardStateAction.ShardEntry(existingShard.shardId, UUIDs.randomBase64UUID(), 0L, existingShard.message, existingShard.failure)); + List existingShards = createExistingShards(currentState, reason); + List shardsWithMismatchedAllocationIds = 
new ArrayList<>(); + for (ShardStateAction.FailedShardEntry existingShard : existingShards) { + shardsWithMismatchedAllocationIds.add(new ShardStateAction.FailedShardEntry(existingShard.shardId, UUIDs.randomBase64UUID(), 0L, existingShard.message, existingShard.failure, randomBoolean())); } - List tasks = new ArrayList<>(); - nonExistentShards.forEach(shard -> tasks.add(new ShardStateAction.ShardEntry(shard.shardId(), shard.allocationId().getId(), 0L, reason, new CorruptIndexException("simulated", nonExistentIndexUUID)))); + List tasks = new ArrayList<>(); + nonExistentShards.forEach(shard -> tasks.add(new ShardStateAction.FailedShardEntry(shard.shardId(), shard.allocationId().getId(), 0L, + reason, new CorruptIndexException("simulated", nonExistentIndexUUID), randomBoolean()))); tasks.addAll(shardsWithMismatchedAllocationIds); return tasks; } @@ -214,26 +247,26 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa } private static void assertTasksSuccessful( - List tasks, - ClusterStateTaskExecutor.ClusterTasksResult result, + List tasks, + ClusterStateTaskExecutor.ClusterTasksResult result, ClusterState clusterState, boolean clusterStateChanged ) { - Map taskResultMap = + Map taskResultMap = tasks.stream().collect(Collectors.toMap(Function.identity(), task -> ClusterStateTaskExecutor.TaskResult.success())); assertTaskResults(taskResultMap, result, clusterState, clusterStateChanged); } private static void assertTaskResults( - Map taskResultMap, - ClusterStateTaskExecutor.ClusterTasksResult result, + Map taskResultMap, + ClusterStateTaskExecutor.ClusterTasksResult result, ClusterState clusterState, boolean clusterStateChanged ) { // there should be as many task results as tasks assertEquals(taskResultMap.size(), result.executionResults.size()); - for (Map.Entry entry : taskResultMap.entrySet()) { + for (Map.Entry entry : taskResultMap.entrySet()) { // every task should have a corresponding task result assertTrue(result.executionResults.containsKey(entry.getKey())); @@ -242,7 +275,7 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa } List shards = clusterState.getRoutingTable().allShards(); - for (Map.Entry entry : taskResultMap.entrySet()) { + for (Map.Entry entry : taskResultMap.entrySet()) { if (entry.getValue().isSuccess()) { // the shard was successfully failed and so should not be in the routing table for (ShardRouting shard : shards) { @@ -267,15 +300,15 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa } } - private static List toTasks(ClusterState currentState, List shards, String indexUUID, String message) { + private static List toTasks(ClusterState currentState, List shards, String indexUUID, String message) { return shards .stream() - .map(shard -> new ShardStateAction.ShardEntry( + .map(shard -> new ShardStateAction.FailedShardEntry( shard.shardId(), shard.allocationId().getId(), randomBoolean() ? 
0L : currentState.metaData().getIndexSafe(shard.index()).primaryTerm(shard.id()), message, - new CorruptIndexException("simulated", indexUUID))) + new CorruptIndexException("simulated", indexUUID), randomBoolean())) .collect(Collectors.toList()); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java index b1ff626fa39..9c8564694d1 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java @@ -20,10 +20,13 @@ package org.elasticsearch.cluster.action.shard; import org.apache.lucene.index.CorruptIndexException; +import org.elasticsearch.Version; import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.NotMasterException; +import org.elasticsearch.cluster.action.shard.ShardStateAction.FailedShardEntry; +import org.elasticsearch.cluster.action.shard.ShardStateAction.StartedShardEntry; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.RoutingService; @@ -32,15 +35,22 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.Discovery; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.NodeDisconnectedException; import org.elasticsearch.transport.NodeNotConnectedException; import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; import org.junit.After; @@ -48,6 +58,8 @@ import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; +import java.io.IOException; +import java.util.UUID; import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -63,6 +75,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; public class ShardStateActionTests extends ESTestCase { private static ThreadPool THREAD_POOL; @@ -90,9 +103,9 @@ public class ShardStateActionTests extends ESTestCase { } @Override - protected void waitForNewMasterAndRetry(String actionName, ClusterStateObserver observer, ShardEntry shardEntry, Listener listener, Predicate changePredicate) { + protected void 
waitForNewMasterAndRetry(String actionName, ClusterStateObserver observer, TransportRequest request, Listener listener, Predicate changePredicate) { onBeforeWaitForNewMasterAndRetry.run(); - super.waitForNewMasterAndRetry(actionName, observer, shardEntry, listener, changePredicate); + super.waitForNewMasterAndRetry(actionName, observer, request, listener, changePredicate); onAfterWaitForNewMasterAndRetry.run(); } } @@ -160,8 +173,8 @@ public class ShardStateActionTests extends ESTestCase { CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); assertEquals(1, capturedRequests.length); // the request is a shard failed request - assertThat(capturedRequests[0].request, is(instanceOf(ShardStateAction.ShardEntry.class))); - ShardStateAction.ShardEntry shardEntry = (ShardStateAction.ShardEntry) capturedRequests[0].request; + assertThat(capturedRequests[0].request, is(instanceOf(ShardStateAction.FailedShardEntry.class))); + ShardStateAction.FailedShardEntry shardEntry = (ShardStateAction.FailedShardEntry) capturedRequests[0].request; // for the right shard assertEquals(shardEntry.shardId, shardRouting.shardId()); assertEquals(shardEntry.allocationId, shardRouting.allocationId().getId()); @@ -342,7 +355,7 @@ public class ShardStateActionTests extends ESTestCase { long primaryTerm = clusterService.state().metaData().index(index).primaryTerm(failedShard.id()); assertThat(primaryTerm, greaterThanOrEqualTo(1L)); - shardStateAction.remoteShardFailed(failedShard.shardId(), failedShard.allocationId().getId(), primaryTerm + 1, "test", + shardStateAction.remoteShardFailed(failedShard.shardId(), failedShard.allocationId().getId(), primaryTerm + 1, randomBoolean(), "test", getSimulatedFailure(), new ShardStateAction.Listener() { @Override public void onSuccess() { @@ -407,4 +420,36 @@ public class ShardStateActionTests extends ESTestCase { private Exception getSimulatedFailure() { return new CorruptIndexException("simulated", (String) null); } + + public void testShardEntryBWCSerialize() throws Exception { + final Version bwcVersion = randomValueOtherThanMany( + version -> version.onOrAfter(Version.V_7_0_0_alpha1), () -> VersionUtils.randomVersion(random())); + final ShardId shardId = new ShardId(randomRealisticUnicodeOfLengthBetween(10, 100), UUID.randomUUID().toString(), between(0, 1000)); + final String allocationId = randomRealisticUnicodeOfCodepointLengthBetween(10, 100); + final String reason = randomRealisticUnicodeOfCodepointLengthBetween(10, 100); + try (StreamInput in = serialize(new StartedShardEntry(shardId, allocationId, reason), bwcVersion).streamInput()) { + in.setVersion(bwcVersion); + final FailedShardEntry failedShardEntry = new FailedShardEntry(in); + assertThat(failedShardEntry.shardId, equalTo(shardId)); + assertThat(failedShardEntry.allocationId, equalTo(allocationId)); + assertThat(failedShardEntry.message, equalTo(reason)); + assertThat(failedShardEntry.failure, nullValue()); + assertThat(failedShardEntry.markAsStale, equalTo(true)); + } + try (StreamInput in = serialize(new FailedShardEntry(shardId, allocationId, 0L, reason, null, false), bwcVersion).streamInput()) { + in.setVersion(bwcVersion); + final StartedShardEntry startedShardEntry = new StartedShardEntry(in); + assertThat(startedShardEntry.shardId, equalTo(shardId)); + assertThat(startedShardEntry.allocationId, equalTo(allocationId)); + assertThat(startedShardEntry.message, equalTo(reason)); + } + } + + BytesReference serialize(Writeable writeable, Version version) throws IOException 
{ + try (BytesStreamOutput out = new BytesStreamOutput()) { + out.setVersion(version); + writeable.writeTo(out); + return out.bytes(); + } + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryTermsTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryTermsTests.java index 863a33b1327..65526896864 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryTermsTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryTermsTests.java @@ -144,7 +144,7 @@ public class PrimaryTermsTests extends ESAllocationTestCase { logger.info("failing primary shards {} for index [{}]", shardIdsToFail, index); List failedShards = new ArrayList<>(); for (int shard : shardIdsToFail) { - failedShards.add(new FailedShard(indexShardRoutingTable.shard(shard).primaryShard(), "test", null)); + failedShards.add(new FailedShard(indexShardRoutingTable.shard(shard).primaryShard(), "test", null, randomBoolean())); incrementPrimaryTerm(index, shard); // the primary failure should increment the primary term; } applyRerouteResult(allocationService.applyFailedShards(this.clusterState, failedShards,Collections.emptyList())); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java index 18835e40eb3..d8f7f6552f9 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java @@ -254,7 +254,7 @@ public class UnassignedInfoTests extends ESAllocationTestCase { assertThat(clusterState.getRoutingNodes().unassigned().size() > 0, equalTo(false)); // fail shard ShardRouting shardToFail = clusterState.getRoutingNodes().shardsWithState(STARTED).get(0); - clusterState = allocation.applyFailedShards(clusterState, Collections.singletonList(new FailedShard(shardToFail, "test fail", null))); + clusterState = allocation.applyFailedShards(clusterState, Collections.singletonList(new FailedShard(shardToFail, "test fail", null, randomBoolean()))); // verify the reason and details assertThat(clusterState.getRoutingNodes().unassigned().size() > 0, equalTo(true)); assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(1)); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedNodeRoutingTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedNodeRoutingTests.java index 4b941a6ce4a..8038d9b5e18 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedNodeRoutingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedNodeRoutingTests.java @@ -167,7 +167,7 @@ public class FailedNodeRoutingTests extends ESAllocationTestCase { List shardsToFail = new ArrayList<>(); List failedPrimaries = randomSubsetOf(primaries); failedPrimaries.stream().forEach(sr -> { - shardsToFail.add(new FailedShard(randomFrom(sr), "failed primary", new Exception())); + shardsToFail.add(new FailedShard(randomFrom(sr), "failed primary", new Exception(), randomBoolean())); }); logger.info("--> state before failing shards: {}", state); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java index 2eedeba63f3..1fa1ff3a154 100644 --- 
a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java @@ -117,7 +117,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase { assertThat(clusterState.getRoutingNodes().node("node3").iterator().next().state(), equalTo(INITIALIZING)); logger.info("--> fail primary shard recovering instance on node3 being initialized"); - clusterState = allocation.applyFailedShard(clusterState, clusterState.getRoutingNodes().node("node3").iterator().next()); + clusterState = allocation.applyFailedShard(clusterState, clusterState.getRoutingNodes().node("node3").iterator().next(), randomBoolean()); assertThat(clusterState.getRoutingNodes().node(origPrimaryNodeId).iterator().next().state(), equalTo(STARTED)); assertThat(clusterState.getRoutingNodes().node("node3").size(), equalTo(0)); @@ -132,7 +132,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase { assertThat(clusterState.getRoutingNodes().node("node3").iterator().next().state(), equalTo(INITIALIZING)); logger.info("--> fail primary shard recovering instance on node1 being relocated"); - clusterState = allocation.applyFailedShard(clusterState, clusterState.getRoutingNodes().node(origPrimaryNodeId).iterator().next()); + clusterState = allocation.applyFailedShard(clusterState, clusterState.getRoutingNodes().node(origPrimaryNodeId).iterator().next(), randomBoolean()); // check promotion of replica to primary assertThat(clusterState.getRoutingNodes().node(origReplicaNodeId).iterator().next().state(), equalTo(STARTED)); @@ -200,7 +200,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase { logger.info("fail the primary shard, will have no place to be rerouted to (single node), so stays unassigned"); ShardRouting shardToFail = clusterState.routingTable().index("test").shard(0).primaryShard(); - newState = strategy.applyFailedShard(clusterState, shardToFail); + newState = strategy.applyFailedShard(clusterState, shardToFail, randomBoolean()); assertThat(newState, not(equalTo(clusterState))); clusterState = newState; @@ -249,7 +249,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase { logger.info("fail the first shard, will have no place to be rerouted to (single node), so stays unassigned"); ShardRouting firstShard = clusterState.getRoutingNodes().node("node1").iterator().next(); - newState = strategy.applyFailedShard(clusterState, firstShard); + newState = strategy.applyFailedShard(clusterState, firstShard, randomBoolean()); assertThat(newState, not(equalTo(clusterState))); clusterState = newState; @@ -305,7 +305,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase { logger.info("failing shard on node [{}]", failedNode); ShardRouting shardToFail = routingNodes.node(failedNode).iterator().next(); if (shardRoutingsToFail.contains(shardToFail) == false) { - failedShards.add(new FailedShard(shardToFail, null, null)); + failedShards.add(new FailedShard(shardToFail, null, null, randomBoolean())); failedNodes.add(failedNode); shardRoutingsToFail.add(shardToFail); } @@ -364,7 +364,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase { logger.info("fail the first shard, will start INITIALIZING on the second node"); final ShardRouting firstShard = clusterState.getRoutingNodes().node(nodeHoldingPrimary).iterator().next(); - newState = strategy.applyFailedShard(clusterState, firstShard); + newState = 
         assertThat(newState, not(equalTo(clusterState)));
         clusterState = newState;
@@ -455,7 +455,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
         logger.info("Fail the shards on node 3");
         ShardRouting shardToFail = routingNodes.node("node3").iterator().next();
-        newState = strategy.applyFailedShard(clusterState, shardToFail);
+        newState = strategy.applyFailedShard(clusterState, shardToFail, randomBoolean());
         assertThat(newState, not(equalTo(clusterState)));
         clusterState = newState;
         routingNodes = clusterState.getRoutingNodes();
@@ -507,7 +507,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
         // fail the primary shard, check replicas get removed as well...
         ShardRouting primaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard();
-        ClusterState newState = allocation.applyFailedShard(clusterState, primaryShardToFail);
+        ClusterState newState = allocation.applyFailedShard(clusterState, primaryShardToFail, randomBoolean());
         assertThat(newState, not(equalTo(clusterState)));
         clusterState = newState;
         // the primary gets allocated on another node, replicas are initializing
@@ -550,7 +550,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
         // fail the primary shard, check one replica gets elected to primary, others become INITIALIZING (from it)
         ShardRouting primaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard();
-        ClusterState newState = allocation.applyFailedShard(clusterState, primaryShardToFail);
+        ClusterState newState = allocation.applyFailedShard(clusterState, primaryShardToFail, randomBoolean());
         assertThat(newState, not(equalTo(clusterState)));
         clusterState = newState;
         assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1));
@@ -620,7 +620,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
         // fail the primary shard again and make sure the correct replica is promoted
         ShardRouting primaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard();
-        ClusterState newState = allocation.applyFailedShard(clusterState, primaryShardToFail);
+        ClusterState newState = allocation.applyFailedShard(clusterState, primaryShardToFail, randomBoolean());
         assertThat(newState, not(equalTo(clusterState)));
         clusterState = newState;
         // the primary gets allocated on another node
@@ -649,7 +649,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
         // fail the primary shard again, and ensure the same thing happens
         ShardRouting secondPrimaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard();
-        newState = allocation.applyFailedShard(clusterState, secondPrimaryShardToFail);
+        newState = allocation.applyFailedShard(clusterState, secondPrimaryShardToFail, randomBoolean());
         assertThat(newState, not(equalTo(clusterState)));
         clusterState = newState;
         // the primary gets allocated on another node
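Note that nearly all of the call sites above pass randomBoolean() for the new argument: the routing-level outcomes asserted by these tests (promotion, reassignment, staying unassigned) must hold whether or not the failed copy is also marked stale. A usage sketch of the three-argument overload, assuming it sits inside an ESAllocationTestCase with an AllocationService fixture; the helper name failShard is invented for illustration:

    // Invented helper showing the overload exercised above.
    // markAsStale == true  -> fail the copy and drop its allocation id from the in-sync set
    // markAsStale == false -> fail the copy but keep its allocation id in the in-sync set
    private ClusterState failShard(AllocationService allocation, ClusterState state, ShardRouting shard) {
        boolean markAsStale = randomBoolean(); // routing assertions must hold either way
        return allocation.applyFailedShard(state, shard, markAsStale);
    }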
diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/InSyncAllocationIdTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/InSyncAllocationIdTests.java
index 616eff4381c..5f39336569f 100644
--- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/InSyncAllocationIdTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/InSyncAllocationIdTests.java
@@ -23,7 +23,7 @@ import org.elasticsearch.Version;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ESAllocationTestCase;
 import org.elasticsearch.cluster.action.shard.ShardStateAction;
-import org.elasticsearch.cluster.action.shard.ShardStateAction.ShardEntry;
+import org.elasticsearch.cluster.action.shard.ShardStateAction.FailedShardEntry;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -135,7 +135,7 @@ public class InSyncAllocationIdTests extends ESAllocationTestCase {
         logger.info("fail primary shard");
         ShardRouting startedPrimary = clusterState.getRoutingNodes().shardsWithState(STARTED).get(0);
-        clusterState = allocation.applyFailedShard(clusterState, startedPrimary);
+        clusterState = allocation.applyFailedShard(clusterState, startedPrimary, true);
         assertThat(clusterState.getRoutingTable().shardsWithState(STARTED).size(), equalTo(0));
         assertEquals(Collections.singleton(startedPrimary.allocationId().getId()),
@@ -167,7 +167,7 @@ public class InSyncAllocationIdTests extends ESAllocationTestCase {
             new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocation, null, logger);
         long primaryTerm = clusterState.metaData().index("test").primaryTerm(0);
         clusterState = failedClusterStateTaskExecutor.execute(clusterState, Arrays.asList(
-            new ShardEntry(shardRoutingTable.shardId(), replicaShard.allocationId().getId(), primaryTerm, "dummy", null))
+            new FailedShardEntry(shardRoutingTable.shardId(), replicaShard.allocationId().getId(), primaryTerm, "dummy", null, true))
             ).resultingState;
         assertThat(clusterState.metaData().index("test").inSyncAllocationIds(0).size(), equalTo(1));
@@ -189,11 +189,11 @@ public class InSyncAllocationIdTests extends ESAllocationTestCase {
         long primaryTerm = clusterState.metaData().index("test").primaryTerm(0);
-        List<ShardEntry> failureEntries = new ArrayList<>();
-        failureEntries.add(new ShardEntry(
-            shardRoutingTable.shardId(), primaryShard.allocationId().getId(), 0L, "dummy", null));
-        failureEntries.add(new ShardEntry(
-            shardRoutingTable.shardId(), replicaShard.allocationId().getId(), primaryTerm, "dummy", null));
+        List<FailedShardEntry> failureEntries = new ArrayList<>();
+        failureEntries.add(new FailedShardEntry(
+            shardRoutingTable.shardId(), primaryShard.allocationId().getId(), 0L, "dummy", null, true));
+        failureEntries.add(new FailedShardEntry(
+            shardRoutingTable.shardId(), replicaShard.allocationId().getId(), primaryTerm, "dummy", null, true));
         Collections.shuffle(failureEntries, random());
         logger.info("Failing {}", failureEntries);
@@ -333,8 +333,8 @@ public class InSyncAllocationIdTests extends ESAllocationTestCase {
         assertEquals(inSyncSet, clusterState.metaData().index("test").inSyncAllocationIds(0));
         logger.info("fail primary shard");
-        clusterState = failedClusterStateTaskExecutor.execute(clusterState, Collections.singletonList(new ShardEntry(
-            shardRoutingTable.shardId(), primaryShard.allocationId().getId(), 0L, "dummy", null))).resultingState;
+        clusterState = failedClusterStateTaskExecutor.execute(clusterState, Collections.singletonList(new FailedShardEntry(
+            shardRoutingTable.shardId(), primaryShard.allocationId().getId(), 0L, "dummy", null, true))).resultingState;
         assertThat(clusterState.routingTable().index("test").shard(0).assignedShards().size(), equalTo(0));
         // in-sync allocation ids should not be updated
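These InSyncAllocationIdTests hunks drive the master-side executor directly. A condensed sketch of that pattern, using the FailedShardEntry argument order visible above (shardId, allocationId, primaryTerm, message, failure, markAsStale) and assuming a test method that declares throws Exception:

    // Condensed from the hunks above: fail one replica via the executor, marking it stale.
    FailedShardEntry entry = new FailedShardEntry(
        shardRoutingTable.shardId(),              // which shard
        replicaShard.allocationId().getId(),      // which copy
        primaryTerm,                              // the tests pass 0L when no term is being asserted
        "dummy",                                  // human-readable reason
        null,                                     // no underlying exception for a synthetic failure
        true);                                    // also remove the allocation id from the in-sync set
    clusterState = failedClusterStateTaskExecutor
        .execute(clusterState, Collections.singletonList(entry))
        .resultingState;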
diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/MaxRetryAllocationDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/MaxRetryAllocationDeciderTests.java
index 797e1adc12c..994ee8f1438 100644
--- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/MaxRetryAllocationDeciderTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/MaxRetryAllocationDeciderTests.java
@@ -91,7 +91,7 @@ public class MaxRetryAllocationDeciderTests extends ESAllocationTestCase {
         for (int i = 0; i < retries-1; i++) {
             List<FailedShard> failedShards = Collections.singletonList(
                 new FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom" + i,
-                    new UnsupportedOperationException()));
+                    new UnsupportedOperationException(), randomBoolean()));
             ClusterState newState = strategy.applyFailedShards(clusterState, failedShards);
             assertThat(newState, not(equalTo(clusterState)));
             clusterState = newState;
@@ -104,7 +104,7 @@ public class MaxRetryAllocationDeciderTests extends ESAllocationTestCase {
         // now we go and check that we are actually stuck at unassigned on the next failure
         List<FailedShard> failedShards = Collections.singletonList(
             new FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom",
-                new UnsupportedOperationException()));
+                new UnsupportedOperationException(), randomBoolean()));
         ClusterState newState = strategy.applyFailedShards(clusterState, failedShards);
         assertThat(newState, not(equalTo(clusterState)));
         clusterState = newState;
@@ -130,7 +130,7 @@ public class MaxRetryAllocationDeciderTests extends ESAllocationTestCase {
         for (int i = 0; i < retries-1; i++) {
             failedShards = Collections.singletonList(
                 new FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom",
-                    new UnsupportedOperationException()));
+                    new UnsupportedOperationException(), randomBoolean()));
             newState = strategy.applyFailedShards(clusterState, failedShards);
             assertThat(newState, not(equalTo(clusterState)));
@@ -145,7 +145,7 @@ public class MaxRetryAllocationDeciderTests extends ESAllocationTestCase {
         // now we go and check that we are actually stuck at unassigned on the next failure
         failedShards = Collections.singletonList(
             new FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom",
-                new UnsupportedOperationException()));
+                new UnsupportedOperationException(), randomBoolean()));
         newState = strategy.applyFailedShards(clusterState, failedShards);
         assertThat(newState, not(equalTo(clusterState)));
         clusterState = newState;
@@ -164,7 +164,7 @@ public class MaxRetryAllocationDeciderTests extends ESAllocationTestCase {
         for (int i = 0; i < retries-1; i++) {
             List<FailedShard> failedShards = Collections.singletonList(
                 new FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom" + i,
-                    new UnsupportedOperationException()));
+                    new UnsupportedOperationException(), randomBoolean()));
             ClusterState newState = strategy.applyFailedShards(clusterState, failedShards);
             assertThat(newState, not(equalTo(clusterState)));
             clusterState = newState;
@@ -182,7 +182,7 @@ public class MaxRetryAllocationDeciderTests extends ESAllocationTestCase {
         {
             List<FailedShard> failedShards = Collections.singletonList(
                 new FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom",
-                    new UnsupportedOperationException()));
+                    new UnsupportedOperationException(), randomBoolean()));
             ClusterState newState = strategy.applyFailedShards(clusterState, failedShards);
             assertThat(newState, not(equalTo(clusterState)));
             clusterState = newState;
@@ -231,7 +231,7 @@ public class MaxRetryAllocationDeciderTests extends ESAllocationTestCase {
         // now fail again and see if it has a new counter
         List<FailedShard> failedShards = Collections.singletonList(
             new FailedShard(routingTable.index("idx").shard(0).shards().get(0), "ZOOOMG",
-                new UnsupportedOperationException()));
+                new UnsupportedOperationException(), randomBoolean()));
         newState = strategy.applyFailedShards(clusterState, failedShards);
         assertThat(newState, not(equalTo(clusterState)));
         clusterState = newState;
diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java
index 8bd4b39d076..0668ba41524 100644
--- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java
@@ -194,7 +194,7 @@ public class SingleShardNoReplicasRoutingTests extends ESAllocationTestCase {
         logger.info("Marking the shard as failed");
         RoutingNodes routingNodes = clusterState.getRoutingNodes();
-        newState = strategy.applyFailedShard(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING).get(0));
+        newState = strategy.applyFailedShard(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING).get(0), randomBoolean());
         assertThat(newState, not(equalTo(clusterState)));
         clusterState = newState;
diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDeciderTests.java
index 8381f2f960b..b1fa8346e2c 100644
--- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDeciderTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDeciderTests.java
@@ -66,7 +66,7 @@ public class FilterAllocationDeciderTests extends ESAllocationTestCase {
         // we can initially only allocate on node2
         assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), INITIALIZING);
         assertEquals(routingTable.index("idx").shard(0).shards().get(0).currentNodeId(), "node2");
-        routingTable = service.applyFailedShard(state, routingTable.index("idx").shard(0).shards().get(0)).routingTable();
+        routingTable = service.applyFailedShard(state, routingTable.index("idx").shard(0).shards().get(0), randomBoolean()).routingTable();
         state = ClusterState.builder(state).routingTable(routingTable).build();
         assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), UNASSIGNED);
         assertNull(routingTable.index("idx").shard(0).shards().get(0).currentNodeId());
@@ -114,7 +114,7 @@ public class FilterAllocationDeciderTests extends ESAllocationTestCase {
         state = service.deassociateDeadNodes(
             ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).remove("node1")).build(), true, "test");
-        state = service.applyFailedShard(state, routingTable.index("idx").shard(0).primaryShard());
+        state = service.applyFailedShard(state, routingTable.index("idx").shard(0).primaryShard(), randomBoolean());
         // now bring back node1 and see it's assigned
         state = service.reroute(
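The remaining two files switch from the single ShardEntry request type to the two classes this patch introduces. A sketch of the started-shard side as used in the ClusterStateChanges hunk that follows; note that, unlike FailedShardEntry, it carries no primary term or failure, only the shard id, the allocation id, and a message:

    // Started shards no longer share a request type with failed shards.
    StartedShardEntry started = new StartedShardEntry(
        startedShard.shardId(),
        startedShard.allocationId().getId(),
        "shard started");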
diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java
index dd10dd2747d..3f7f0583593 100644
--- a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java
+++ b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java
@@ -46,7 +46,8 @@ import org.elasticsearch.cluster.ClusterStateTaskExecutor.ClusterTasksResult;
 import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.EmptyClusterInfoService;
 import org.elasticsearch.cluster.action.shard.ShardStateAction;
-import org.elasticsearch.cluster.action.shard.ShardStateAction.ShardEntry;
+import org.elasticsearch.cluster.action.shard.ShardStateAction.StartedShardEntry;
+import org.elasticsearch.cluster.action.shard.ShardStateAction.FailedShardEntry;
 import org.elasticsearch.cluster.metadata.AliasValidator;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@@ -221,16 +222,16 @@ public class ClusterStateChanges extends AbstractComponent {
     }

     public ClusterState applyFailedShards(ClusterState clusterState, List<FailedShard> failedShards) {
-        List<ShardEntry> entries = failedShards.stream().map(failedShard ->
-            new ShardEntry(failedShard.getRoutingEntry().shardId(), failedShard.getRoutingEntry().allocationId().getId(),
-                0L, failedShard.getMessage(), failedShard.getFailure()))
+        List<FailedShardEntry> entries = failedShards.stream().map(failedShard ->
+            new FailedShardEntry(failedShard.getRoutingEntry().shardId(), failedShard.getRoutingEntry().allocationId().getId(),
+                0L, failedShard.getMessage(), failedShard.getFailure(), failedShard.markAsStale()))
             .collect(Collectors.toList());
         return runTasks(shardFailedClusterStateTaskExecutor, clusterState, entries);
     }

     public ClusterState applyStartedShards(ClusterState clusterState, List<ShardRouting> startedShards) {
-        List<ShardEntry> entries = startedShards.stream().map(startedShard ->
-            new ShardEntry(startedShard.shardId(), startedShard.allocationId().getId(), 0L, "shard started", null))
+        List<StartedShardEntry> entries = startedShards.stream().map(startedShard ->
+            new StartedShardEntry(startedShard.shardId(), startedShard.allocationId().getId(), "shard started"))
            .collect(Collectors.toList());
         return runTasks(shardStartedClusterStateTaskExecutor, clusterState, entries);
     }
diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java
index d76429c53f3..e4d73ce0f41 100644
--- a/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java
@@ -333,7 +333,7 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice
                 if (persistedShardRouting.initializing() && randomBoolean()) {
                     startedShards.add(persistedShardRouting);
                 } else if (rarely()) {
-                    failedShards.add(new FailedShard(persistedShardRouting, "fake shard failure", new Exception()));
+                    failedShards.add(new FailedShard(persistedShardRouting, "fake shard failure", new Exception(), randomBoolean()));
                 }
             }
         }
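Taken together, the test changes pin down the contract this patch adds: failing a shard copy and marking it stale are now independent decisions. A two-line contrast, with allocation, clusterState, and shard standing in for the test fixtures above:

    ClusterState failedAndStale  = allocation.applyFailedShard(clusterState, shard, true);  // previous behavior: fail and drop from the in-sync set
    ClusterState failedButInSync = allocation.applyFailedShard(clusterState, shard, false); // new option: fail but remain in the in-sync set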