diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index ec3dcd94d30..28b8f0826cd 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -387,14 +387,14 @@ public abstract class TransportWriteAction< logger.warn((org.apache.logging.log4j.util.Supplier) () -> new ParameterizedMessage("[{}] {}", replica.shardId(), message), exception); - shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, message, exception, + shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, true, message, exception, createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure)); } @Override public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess, Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { - shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, "mark copy as stale", null, + shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null, createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure)); } diff --git a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index 543118a172f..f6e0fa116d6 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -24,6 +24,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterStateTaskExecutor; import org.elasticsearch.cluster.ClusterState; @@ -88,21 +89,21 @@ public class ShardStateAction extends AbstractComponent { this.clusterService = clusterService; this.threadPool = threadPool; - transportService.registerRequestHandler(SHARD_STARTED_ACTION_NAME, ShardEntry::new, ThreadPool.Names.SAME, new ShardStartedTransportHandler(clusterService, new ShardStartedClusterStateTaskExecutor(allocationService, logger), logger)); - transportService.registerRequestHandler(SHARD_FAILED_ACTION_NAME, ShardEntry::new, ThreadPool.Names.SAME, new ShardFailedTransportHandler(clusterService, new ShardFailedClusterStateTaskExecutor(allocationService, routingService, logger), logger)); + transportService.registerRequestHandler(SHARD_STARTED_ACTION_NAME, ThreadPool.Names.SAME, StartedShardEntry::new, new ShardStartedTransportHandler(clusterService, new ShardStartedClusterStateTaskExecutor(allocationService, logger), logger)); + transportService.registerRequestHandler(SHARD_FAILED_ACTION_NAME, ThreadPool.Names.SAME, FailedShardEntry::new, new ShardFailedTransportHandler(clusterService, new ShardFailedClusterStateTaskExecutor(allocationService, routingService, logger), logger)); } - private void sendShardAction(final String actionName, final ClusterState currentState, final ShardEntry shardEntry, final Listener listener) { + private void sendShardAction(final String actionName, final ClusterState 
currentState, final TransportRequest request, final Listener listener) { ClusterStateObserver observer = new ClusterStateObserver(currentState, clusterService, null, logger, threadPool.getThreadContext()); DiscoveryNode masterNode = currentState.nodes().getMasterNode(); Predicate changePredicate = MasterNodeChangePredicate.build(currentState); if (masterNode == null) { - logger.warn("{} no master known for action [{}] for shard entry [{}]", shardEntry.shardId, actionName, shardEntry); - waitForNewMasterAndRetry(actionName, observer, shardEntry, listener, changePredicate); + logger.warn("no master known for action [{}] for shard entry [{}]", actionName, request); + waitForNewMasterAndRetry(actionName, observer, request, listener, changePredicate); } else { - logger.debug("{} sending [{}] to [{}] for shard entry [{}]", shardEntry.shardId, actionName, masterNode.getId(), shardEntry); + logger.debug("sending [{}] to [{}] for shard entry [{}]", actionName, masterNode.getId(), request); transportService.sendRequest(masterNode, - actionName, shardEntry, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { + actionName, request, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { @Override public void handleResponse(TransportResponse.Empty response) { listener.onSuccess(); @@ -111,9 +112,9 @@ public class ShardStateAction extends AbstractComponent { @Override public void handleException(TransportException exp) { if (isMasterChannelException(exp)) { - waitForNewMasterAndRetry(actionName, observer, shardEntry, listener, changePredicate); + waitForNewMasterAndRetry(actionName, observer, request, listener, changePredicate); } else { - logger.warn((Supplier) () -> new ParameterizedMessage("{} unexpected failure while sending request [{}] to [{}] for shard entry [{}]", shardEntry.shardId, actionName, masterNode, shardEntry), exp); + logger.warn(new ParameterizedMessage("unexpected failure while sending request [{}] to [{}] for shard entry [{}]", actionName, masterNode, request), exp); listener.onFailure(exp instanceof RemoteTransportException ? (Exception) (exp.getCause() instanceof Exception ? exp.getCause() : new ElasticsearchException(exp.getCause())) : exp); } } @@ -139,13 +140,15 @@ public class ShardStateAction extends AbstractComponent { * @param shardId shard id of the shard to fail * @param allocationId allocation id of the shard to fail * @param primaryTerm the primary term associated with the primary shard that is failing the shard. Must be strictly positive. + * @param markAsStale whether or not to mark a failing shard as stale (eg. removing from in-sync set) when failing the shard. 
* @param message the reason for the failure * @param failure the underlying cause of the failure * @param listener callback upon completion of the request */ - public void remoteShardFailed(final ShardId shardId, String allocationId, long primaryTerm, final String message, @Nullable final Exception failure, Listener listener) { + public void remoteShardFailed(final ShardId shardId, String allocationId, long primaryTerm, boolean markAsStale, final String message, @Nullable final Exception failure, Listener listener) { assert primaryTerm > 0L : "primary term should be strictly positive"; - shardFailed(shardId, allocationId, primaryTerm, message, failure, listener, clusterService.state()); + FailedShardEntry shardEntry = new FailedShardEntry(shardId, allocationId, primaryTerm, message, failure, markAsStale); + sendShardAction(SHARD_FAILED_ACTION_NAME, clusterService.state(), shardEntry, listener); } /** @@ -160,29 +163,24 @@ public class ShardStateAction extends AbstractComponent { */ public void localShardFailed(final ShardRouting shardRouting, final String message, @Nullable final Exception failure, Listener listener, final ClusterState currentState) { - shardFailed(shardRouting.shardId(), shardRouting.allocationId().getId(), 0L, message, failure, listener, currentState); - } - - private void shardFailed(final ShardId shardId, String allocationId, long primaryTerm, final String message, - @Nullable final Exception failure, Listener listener, ClusterState currentState) { - ShardEntry shardEntry = new ShardEntry(shardId, allocationId, primaryTerm, message, failure); + FailedShardEntry shardEntry = new FailedShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(), 0L, message, failure, true); sendShardAction(SHARD_FAILED_ACTION_NAME, currentState, shardEntry, listener); } // visible for testing - protected void waitForNewMasterAndRetry(String actionName, ClusterStateObserver observer, ShardEntry shardEntry, Listener listener, Predicate changePredicate) { + protected void waitForNewMasterAndRetry(String actionName, ClusterStateObserver observer, TransportRequest request, Listener listener, Predicate changePredicate) { observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override public void onNewClusterState(ClusterState state) { if (logger.isTraceEnabled()) { - logger.trace("new cluster state [{}] after waiting for master election to fail shard entry [{}]", state, shardEntry); + logger.trace("new cluster state [{}] after waiting for master election for shard entry [{}]", state, request); } - sendShardAction(actionName, state, shardEntry, listener); + sendShardAction(actionName, state, request, listener); } @Override public void onClusterServiceClose() { - logger.warn((Supplier) () -> new ParameterizedMessage("{} node closed while execution action [{}] for shard entry [{}]", shardEntry.shardId, actionName, shardEntry), shardEntry.failure); + logger.warn("node closed while execution action [{}] for shard entry [{}]", actionName, request); listener.onFailure(new NodeClosedException(clusterService.localNode())); } @@ -194,7 +192,7 @@ public class ShardStateAction extends AbstractComponent { }, changePredicate); } - private static class ShardFailedTransportHandler implements TransportRequestHandler { + private static class ShardFailedTransportHandler implements TransportRequestHandler { private final ClusterService clusterService; private final ShardFailedClusterStateTaskExecutor shardFailedClusterStateTaskExecutor; private final Logger logger; @@ -206,7 +204,7 @@ public 
class ShardStateAction extends AbstractComponent { } @Override - public void messageReceived(ShardEntry request, TransportChannel channel) throws Exception { + public void messageReceived(FailedShardEntry request, TransportChannel channel) throws Exception { logger.warn((Supplier) () -> new ParameterizedMessage("{} received shard failed for {}", request.shardId, request), request.failure); clusterService.submitStateUpdateTask( "shard-failed", @@ -248,7 +246,7 @@ public class ShardStateAction extends AbstractComponent { } } - public static class ShardFailedClusterStateTaskExecutor implements ClusterStateTaskExecutor { + public static class ShardFailedClusterStateTaskExecutor implements ClusterStateTaskExecutor { private final AllocationService allocationService; private final RoutingService routingService; private final Logger logger; @@ -260,13 +258,13 @@ public class ShardStateAction extends AbstractComponent { } @Override - public ClusterTasksResult execute(ClusterState currentState, List tasks) throws Exception { - ClusterTasksResult.Builder batchResultBuilder = ClusterTasksResult.builder(); - List tasksToBeApplied = new ArrayList<>(); + public ClusterTasksResult execute(ClusterState currentState, List tasks) throws Exception { + ClusterTasksResult.Builder batchResultBuilder = ClusterTasksResult.builder(); + List tasksToBeApplied = new ArrayList<>(); List failedShardsToBeApplied = new ArrayList<>(); List staleShardsToBeApplied = new ArrayList<>(); - for (ShardEntry task : tasks) { + for (FailedShardEntry task : tasks) { IndexMetaData indexMetaData = currentState.metaData().index(task.shardId.getIndex()); if (indexMetaData == null) { // tasks that correspond to non-existent indices are marked as successful @@ -314,7 +312,7 @@ public class ShardStateAction extends AbstractComponent { // failing a shard also possibly marks it as stale (see IndexMetaDataUpdater) logger.debug("{} failing shard {} (shard failed task: [{}])", task.shardId, matched, task); tasksToBeApplied.add(task); - failedShardsToBeApplied.add(new FailedShard(matched, task.message, task.failure)); + failedShardsToBeApplied.add(new FailedShard(matched, task.message, task.failure, task.markAsStale)); } } } @@ -352,15 +350,82 @@ public class ShardStateAction extends AbstractComponent { } } + public static class FailedShardEntry extends TransportRequest { + final ShardId shardId; + final String allocationId; + final long primaryTerm; + final String message; + final Exception failure; + final boolean markAsStale; + + FailedShardEntry(StreamInput in) throws IOException { + super(in); + shardId = ShardId.readShardId(in); + allocationId = in.readString(); + primaryTerm = in.readVLong(); + message = in.readString(); + failure = in.readException(); + if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + markAsStale = in.readBoolean(); + } else { + markAsStale = true; + } + } + + public FailedShardEntry(ShardId shardId, String allocationId, long primaryTerm, String message, Exception failure, boolean markAsStale) { + this.shardId = shardId; + this.allocationId = allocationId; + this.primaryTerm = primaryTerm; + this.message = message; + this.failure = failure; + this.markAsStale = markAsStale; + } + + public ShardId getShardId() { + return shardId; + } + + public String getAllocationId() { + return allocationId; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + shardId.writeTo(out); + out.writeString(allocationId); + out.writeVLong(primaryTerm); + out.writeString(message); + 
out.writeException(failure); + if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + out.writeBoolean(markAsStale); + } + } + + @Override + public String toString() { + List components = new ArrayList<>(6); + components.add("shard id [" + shardId + "]"); + components.add("allocation id [" + allocationId + "]"); + components.add("primary term [" + primaryTerm + "]"); + components.add("message [" + message + "]"); + if (failure != null) { + components.add("failure [" + ExceptionsHelper.detailedMessage(failure) + "]"); + } + components.add("markAsStale [" + markAsStale + "]"); + return String.join(", ", components); + } + } + public void shardStarted(final ShardRouting shardRouting, final String message, Listener listener) { shardStarted(shardRouting, message, listener, clusterService.state()); } public void shardStarted(final ShardRouting shardRouting, final String message, Listener listener, ClusterState currentState) { - ShardEntry shardEntry = new ShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(), 0L, message, null); + StartedShardEntry shardEntry = new StartedShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(), message); sendShardAction(SHARD_STARTED_ACTION_NAME, currentState, shardEntry, listener); } - private static class ShardStartedTransportHandler implements TransportRequestHandler { + private static class ShardStartedTransportHandler implements TransportRequestHandler { private final ClusterService clusterService; private final ShardStartedClusterStateTaskExecutor shardStartedClusterStateTaskExecutor; private final Logger logger; @@ -372,7 +437,7 @@ public class ShardStateAction extends AbstractComponent { } @Override - public void messageReceived(ShardEntry request, TransportChannel channel) throws Exception { + public void messageReceived(StartedShardEntry request, TransportChannel channel) throws Exception { logger.debug("{} received shard started for [{}]", request.shardId, request); clusterService.submitStateUpdateTask( "shard-started " + request, @@ -384,7 +449,7 @@ public class ShardStateAction extends AbstractComponent { } } - public static class ShardStartedClusterStateTaskExecutor implements ClusterStateTaskExecutor, ClusterStateTaskListener { + public static class ShardStartedClusterStateTaskExecutor implements ClusterStateTaskExecutor, ClusterStateTaskListener { private final AllocationService allocationService; private final Logger logger; @@ -394,14 +459,12 @@ public class ShardStateAction extends AbstractComponent { } @Override - public ClusterTasksResult execute(ClusterState currentState, List tasks) throws Exception { - ClusterTasksResult.Builder builder = ClusterTasksResult.builder(); - List tasksToBeApplied = new ArrayList<>(); + public ClusterTasksResult execute(ClusterState currentState, List tasks) throws Exception { + ClusterTasksResult.Builder builder = ClusterTasksResult.builder(); + List tasksToBeApplied = new ArrayList<>(); List shardRoutingsToBeApplied = new ArrayList<>(tasks.size()); Set seenShardRoutings = new HashSet<>(); // to prevent duplicates - for (ShardEntry task : tasks) { - assert task.primaryTerm == 0L : "shard is only started by itself: " + task; - + for (StartedShardEntry task : tasks) { ShardRouting matched = currentState.getRoutingTable().getByAllocationId(task.shardId, task.allocationId); if (matched == null) { // tasks that correspond to non-existent shards are marked as successful. 
The reason is that we resend shard started @@ -451,40 +514,30 @@ public class ShardStateAction extends AbstractComponent { } } - public static class ShardEntry extends TransportRequest { - ShardId shardId; - String allocationId; - long primaryTerm; - String message; - Exception failure; + public static class StartedShardEntry extends TransportRequest { + final ShardId shardId; + final String allocationId; + final String message; - public ShardEntry() { - } - - public ShardEntry(ShardId shardId, String allocationId, long primaryTerm, String message, @Nullable Exception failure) { - this.shardId = shardId; - this.allocationId = allocationId; - this.primaryTerm = primaryTerm; - this.message = message; - this.failure = failure; - } - - public ShardId getShardId() { - return shardId; - } - - public String getAllocationId() { - return allocationId; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); + StartedShardEntry(StreamInput in) throws IOException { + super(in); shardId = ShardId.readShardId(in); allocationId = in.readString(); - primaryTerm = in.readVLong(); - message = in.readString(); - failure = in.readException(); + if (in.getVersion().before(Version.V_7_0_0_alpha1)) { + final long primaryTerm = in.readVLong(); + assert primaryTerm == 0L : "shard is only started by itself: primary term [" + primaryTerm + "]"; + } + this.message = in.readString(); + if (in.getVersion().before(Version.V_7_0_0_alpha1)) { + final Exception ex = in.readException(); + assert ex == null : "started shard must not have failure [" + ex + "]"; + } + } + + public StartedShardEntry(ShardId shardId, String allocationId, String message) { + this.shardId = shardId; + this.allocationId = allocationId; + this.message = message; } @Override @@ -492,22 +545,19 @@ public class ShardStateAction extends AbstractComponent { super.writeTo(out); shardId.writeTo(out); out.writeString(allocationId); - out.writeVLong(primaryTerm); + if (out.getVersion().before(Version.V_7_0_0_alpha1)) { + out.writeVLong(0L); + } out.writeString(message); - out.writeException(failure); + if (out.getVersion().before(Version.V_7_0_0_alpha1)) { + out.writeException(null); + } } @Override public String toString() { - List components = new ArrayList<>(4); - components.add("shard id [" + shardId + "]"); - components.add("allocation id [" + allocationId + "]"); - components.add("primary term [" + primaryTerm + "]"); - components.add("message [" + message + "]"); - if (failure != null) { - components.add("failure [" + ExceptionsHelper.detailedMessage(failure) + "]"); - } - return String.join(", ", components); + return String.format(Locale.ROOT, "StartedShardEntry{shardId [%s], allocationId [%s], message [%s]}", + shardId, allocationId, message); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index 2326274c36c..6dc405d00a3 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -138,8 +138,8 @@ public class AllocationService extends AbstractComponent { } // Used for testing - public ClusterState applyFailedShard(ClusterState clusterState, ShardRouting failedShard) { - return applyFailedShards(clusterState, singletonList(new FailedShard(failedShard, null, null)), emptyList()); + public ClusterState 
applyFailedShard(ClusterState clusterState, ShardRouting failedShard, boolean markAsStale) { + return applyFailedShards(clusterState, singletonList(new FailedShard(failedShard, null, null, markAsStale)), emptyList()); } // Used for testing @@ -185,6 +185,9 @@ public class AllocationService extends AbstractComponent { UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, message, failedShardEntry.getFailure(), failedAllocations + 1, currentNanoTime, System.currentTimeMillis(), false, AllocationStatus.NO_ATTEMPT); + if (failedShardEntry.markAsStale()) { + allocation.removeAllocationId(failedShard); + } routingNodes.failShard(logger, failedShard, unassignedInfo, indexMetaData, allocation.changes()); } else { logger.trace("{} shard routing failed in an earlier iteration (routing: {})", shardToFail.shardId(), shardToFail); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/FailedShard.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/FailedShard.java index 9bf9fa86d18..f6d9a1bc119 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/FailedShard.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/FailedShard.java @@ -30,18 +30,20 @@ public class FailedShard { private final ShardRouting routingEntry; private final String message; private final Exception failure; + private final boolean markAsStale; - public FailedShard(ShardRouting routingEntry, String message, Exception failure) { + public FailedShard(ShardRouting routingEntry, String message, Exception failure, boolean markAsStale) { assert routingEntry.assignedToNode() : "only assigned shards can be failed " + routingEntry; this.routingEntry = routingEntry; this.message = message; this.failure = failure; + this.markAsStale = markAsStale; } @Override public String toString() { return "failed shard, shard " + routingEntry + ", message [" + message + "], failure [" + - ExceptionsHelper.detailedMessage(failure) + "]"; + ExceptionsHelper.detailedMessage(failure) + "], markAsStale [" + markAsStale + "]"; } /** @@ -66,4 +68,11 @@ public class FailedShard { public Exception getFailure() { return failure; } + + /** + * Whether or not to mark the shard as stale (eg. removing from in-sync set) when failing the shard. + */ + public boolean markAsStale() { + return markAsStale; + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java index b24a961829d..c5cb6d2af5b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java @@ -72,19 +72,12 @@ public class IndexMetaDataUpdater extends RoutingChangesObserver.AbstractRouting @Override public void shardFailed(ShardRouting failedShard, UnassignedInfo unassignedInfo) { - if (failedShard.active() && unassignedInfo.getReason() != UnassignedInfo.Reason.NODE_LEFT) { - removeAllocationId(failedShard); - - if (failedShard.primary()) { - Updates updates = changes(failedShard.shardId()); - if (updates.firstFailedPrimary == null) { - // more than one primary can be failed (because of batching, primary can be failed, replica promoted and then failed...) 
- updates.firstFailedPrimary = failedShard; - } - } - } - if (failedShard.active() && failedShard.primary()) { + Updates updates = changes(failedShard.shardId()); + if (updates.firstFailedPrimary == null) { + // more than one primary can be failed (because of batching, primary can be failed, replica promoted and then failed...) + updates.firstFailedPrimary = failedShard; + } increasePrimaryTerm(failedShard.shardId()); } } @@ -286,8 +279,10 @@ public class IndexMetaDataUpdater extends RoutingChangesObserver.AbstractRouting /** * Remove allocation id of this shard from the set of in-sync shard copies */ - private void removeAllocationId(ShardRouting shardRouting) { - changes(shardRouting.shardId()).removedAllocationIds.add(shardRouting.allocationId().getId()); + void removeAllocationId(ShardRouting shardRouting) { + if (shardRouting.active()) { + changes(shardRouting.shardId()).removedAllocationIds.add(shardRouting.allocationId().getId()); + } } /** diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java index abc363931c1..e0be712a230 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java @@ -27,6 +27,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingChangesObserver; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.common.collect.ImmutableOpenMap; @@ -222,6 +223,13 @@ public class RoutingAllocation { return unmodifiableSet(new HashSet<>(ignore)); } + /** + * Remove the allocation id of the provided shard from the set of in-sync shard copies + */ + public void removeAllocationId(ShardRouting shardRouting) { + indexMetaDataUpdater.removeAllocationId(shardRouting); + } + /** * Returns observer to use for changes made to the routing nodes */ diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/CancelAllocationCommand.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/CancelAllocationCommand.java index eae4739c127..afeb0e0dab1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/CancelAllocationCommand.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/CancelAllocationCommand.java @@ -156,6 +156,8 @@ public class CancelAllocationCommand implements AllocationCommand { } routingNodes.failShard(Loggers.getLogger(CancelAllocationCommand.class), shardRouting, new UnassignedInfo(UnassignedInfo.Reason.REROUTE_CANCELLED, null), indexMetaData, allocation.changes()); + // TODO: We don't have to remove a cancelled shard from in-sync set once we have a strict resync implementation. 
+ allocation.removeAllocationId(shardRouting); return new RerouteExplanation(this, allocation.decision(Decision.YES, "cancel_allocation_command", "shard " + shardId + " on node " + discoNode + " can be cancelled")); } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteTests.java index d3a0d12a853..6e34a751007 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteTests.java @@ -120,7 +120,7 @@ public class ClusterRerouteTests extends ESAllocationTestCase { assertEquals(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getNumFailedAllocations(), i); List failedShards = Collections.singletonList( new FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom" + i, - new UnsupportedOperationException())); + new UnsupportedOperationException(), randomBoolean())); newState = allocationService.applyFailedShards(clusterState, failedShards); assertThat(newState, not(equalTo(clusterState))); clusterState = newState; diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index 47ce090d895..805553b4a61 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -315,7 +315,7 @@ public class TransportWriteActionTests extends ESTestCase { // A write replication action proxy should fail the shard assertEquals(1, shardFailedRequests.length); CapturingTransport.CapturedRequest shardFailedRequest = shardFailedRequests[0]; - ShardStateAction.ShardEntry shardEntry = (ShardStateAction.ShardEntry) shardFailedRequest.request; + ShardStateAction.FailedShardEntry shardEntry = (ShardStateAction.FailedShardEntry) shardFailedRequest.request; // the shard the request was sent to and the shard to be failed should be the same assertEquals(shardEntry.getShardId(), replica.shardId()); assertEquals(shardEntry.getAllocationId(), replica.allocationId().getId()); diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java index 794f71786ca..89be94b4506 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java @@ -22,6 +22,7 @@ package org.elasticsearch.cluster.action.shard; import com.carrotsearch.hppc.cursors.ObjectCursor; import org.apache.lucene.index.CorruptIndexException; import org.elasticsearch.Version; +import org.elasticsearch.cluster.action.shard.ShardStateAction.FailedShardEntry; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateTaskExecutor; @@ -30,6 +31,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.GroupShardsIterator; +import 
org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardIterator; @@ -42,20 +44,26 @@ import org.elasticsearch.cluster.routing.allocation.StaleShard; import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.junit.Before; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; +import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; +import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.Matchers.contains; public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCase { @@ -87,8 +95,8 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa } public void testEmptyTaskListProducesSameClusterState() throws Exception { - List tasks = Collections.emptyList(); - ClusterStateTaskExecutor.ClusterTasksResult result = + List tasks = Collections.emptyList(); + ClusterStateTaskExecutor.ClusterTasksResult result = executor.execute(clusterState, tasks); assertTasksSuccessful(tasks, result, clusterState, false); } @@ -96,35 +104,35 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa public void testDuplicateFailuresAreOkay() throws Exception { String reason = "test duplicate failures are okay"; ClusterState currentState = createClusterStateWithStartedShards(reason); - List tasks = createExistingShards(currentState, reason); - ClusterStateTaskExecutor.ClusterTasksResult result = executor.execute(currentState, tasks); + List tasks = createExistingShards(currentState, reason); + ClusterStateTaskExecutor.ClusterTasksResult result = executor.execute(currentState, tasks); assertTasksSuccessful(tasks, result, clusterState, true); } public void testNonExistentShardsAreMarkedAsSuccessful() throws Exception { String reason = "test non existent shards are marked as successful"; ClusterState currentState = createClusterStateWithStartedShards(reason); - List tasks = createNonExistentShards(currentState, reason); - ClusterStateTaskExecutor.ClusterTasksResult result = executor.execute(clusterState, tasks); + List tasks = createNonExistentShards(currentState, reason); + ClusterStateTaskExecutor.ClusterTasksResult result = executor.execute(clusterState, tasks); assertTasksSuccessful(tasks, result, clusterState, false); } public void testTriviallySuccessfulTasksBatchedWithFailingTasks() throws Exception { String reason = "test trivially successful tasks batched with failing tasks"; ClusterState currentState = createClusterStateWithStartedShards(reason); - List failingTasks = createExistingShards(currentState, reason); - List nonExistentTasks = createNonExistentShards(currentState, reason); + List failingTasks = createExistingShards(currentState, reason); + List nonExistentTasks = createNonExistentShards(currentState, reason); 
ShardStateAction.ShardFailedClusterStateTaskExecutor failingExecutor = new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocationService, null, logger) { @Override ClusterState applyFailedShards(ClusterState currentState, List failedShards, List staleShards) { throw new RuntimeException("simulated applyFailedShards failure"); } }; - List tasks = new ArrayList<>(); + List tasks = new ArrayList<>(); tasks.addAll(failingTasks); tasks.addAll(nonExistentTasks); - ClusterStateTaskExecutor.ClusterTasksResult result = failingExecutor.execute(currentState, tasks); - Map taskResultMap = + ClusterStateTaskExecutor.ClusterTasksResult result = failingExecutor.execute(currentState, tasks); + Map taskResultMap = failingTasks.stream().collect(Collectors.toMap(Function.identity(), task -> ClusterStateTaskExecutor.TaskResult.failure(new RuntimeException("simulated applyFailedShards failure")))); taskResultMap.putAll(nonExistentTasks.stream().collect(Collectors.toMap(Function.identity(), task -> ClusterStateTaskExecutor.TaskResult.success()))); assertTaskResults(taskResultMap, result, currentState, false); @@ -133,23 +141,47 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa public void testIllegalShardFailureRequests() throws Exception { String reason = "test illegal shard failure requests"; ClusterState currentState = createClusterStateWithStartedShards(reason); - List failingTasks = createExistingShards(currentState, reason); - List tasks = new ArrayList<>(); - for (ShardStateAction.ShardEntry failingTask : failingTasks) { + List failingTasks = createExistingShards(currentState, reason); + List tasks = new ArrayList<>(); + for (ShardStateAction.FailedShardEntry failingTask : failingTasks) { long primaryTerm = currentState.metaData().index(failingTask.shardId.getIndex()).primaryTerm(failingTask.shardId.id()); - tasks.add(new ShardStateAction.ShardEntry(failingTask.shardId, failingTask.allocationId, - randomIntBetween(1, (int) primaryTerm - 1), failingTask.message, failingTask.failure)); + tasks.add(new FailedShardEntry(failingTask.shardId, failingTask.allocationId, + randomIntBetween(1, (int) primaryTerm - 1), failingTask.message, failingTask.failure, randomBoolean())); } - Map taskResultMap = + Map taskResultMap = tasks.stream().collect(Collectors.toMap( Function.identity(), task -> ClusterStateTaskExecutor.TaskResult.failure(new ShardStateAction.NoLongerPrimaryShardException(task.shardId, "primary term [" + task.primaryTerm + "] did not match current primary term [" + currentState.metaData().index(task.shardId.getIndex()).primaryTerm(task.shardId.id()) + "]")))); - ClusterStateTaskExecutor.ClusterTasksResult result = executor.execute(currentState, tasks); + ClusterStateTaskExecutor.ClusterTasksResult result = executor.execute(currentState, tasks); assertTaskResults(taskResultMap, result, currentState, false); } + public void testMarkAsStaleWhenFailingShard() throws Exception { + final MockAllocationService allocation = createAllocationService(); + ClusterState clusterState = createClusterStateWithStartedShards("test markAsStale"); + clusterState = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)); + IndexShardRoutingTable shardRoutingTable = clusterState.routingTable().index(INDEX).shard(0); + long primaryTerm = clusterState.metaData().index(INDEX).primaryTerm(0); + final Set oldInSync = clusterState.metaData().index(INDEX).inSyncAllocationIds(0); + { + ShardStateAction.FailedShardEntry failShardOnly = new 
ShardStateAction.FailedShardEntry(shardRoutingTable.shardId(), + randomFrom(oldInSync), primaryTerm, "dummy", null, false); + ClusterState appliedState = executor.execute(clusterState, Arrays.asList(failShardOnly)).resultingState; + Set newInSync = appliedState.metaData().index(INDEX).inSyncAllocationIds(0); + assertThat(newInSync, equalTo(oldInSync)); + } + { + final String failedAllocationId = randomFrom(oldInSync); + ShardStateAction.FailedShardEntry failAndMarkAsStale = new ShardStateAction.FailedShardEntry(shardRoutingTable.shardId(), + failedAllocationId, primaryTerm, "dummy", null, true); + ClusterState appliedState = executor.execute(clusterState, Arrays.asList(failAndMarkAsStale)).resultingState; + Set newInSync = appliedState.metaData().index(INDEX).inSyncAllocationIds(0); + assertThat(Sets.difference(oldInSync, newInSync), contains(failedAllocationId)); + } + } + private ClusterState createClusterStateWithStartedShards(String reason) { int numberOfNodes = 1 + numberOfReplicas; DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(); @@ -163,7 +195,7 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa return allocationService.applyStartedShards(stateAfterReroute, routingNodes.shardsWithState(ShardRoutingState.INITIALIZING)); } - private List createExistingShards(ClusterState currentState, String reason) { + private List createExistingShards(ClusterState currentState, String reason) { List shards = new ArrayList<>(); GroupShardsIterator shardGroups = currentState.routingTable().allAssignedShardsGrouped(new String[] { INDEX }, true); for (ShardIterator shardIt : shardGroups) { @@ -181,7 +213,7 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa return toTasks(currentState, shardsToFail, indexUUID, reason); } - private List createNonExistentShards(ClusterState currentState, String reason) { + private List createNonExistentShards(ClusterState currentState, String reason) { // add shards from a non-existent index String nonExistentIndexUUID = "non-existent"; Index index = new Index("non-existent", nonExistentIndexUUID); @@ -195,14 +227,15 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa nonExistentShards.add(nonExistentShardRouting(index, nodeIds, false)); } - List existingShards = createExistingShards(currentState, reason); - List shardsWithMismatchedAllocationIds = new ArrayList<>(); - for (ShardStateAction.ShardEntry existingShard : existingShards) { - shardsWithMismatchedAllocationIds.add(new ShardStateAction.ShardEntry(existingShard.shardId, UUIDs.randomBase64UUID(), 0L, existingShard.message, existingShard.failure)); + List existingShards = createExistingShards(currentState, reason); + List shardsWithMismatchedAllocationIds = new ArrayList<>(); + for (ShardStateAction.FailedShardEntry existingShard : existingShards) { + shardsWithMismatchedAllocationIds.add(new ShardStateAction.FailedShardEntry(existingShard.shardId, UUIDs.randomBase64UUID(), 0L, existingShard.message, existingShard.failure, randomBoolean())); } - List tasks = new ArrayList<>(); - nonExistentShards.forEach(shard -> tasks.add(new ShardStateAction.ShardEntry(shard.shardId(), shard.allocationId().getId(), 0L, reason, new CorruptIndexException("simulated", nonExistentIndexUUID)))); + List tasks = new ArrayList<>(); + nonExistentShards.forEach(shard -> tasks.add(new ShardStateAction.FailedShardEntry(shard.shardId(), shard.allocationId().getId(), 0L, + reason, new CorruptIndexException("simulated", nonExistentIndexUUID), 
randomBoolean()))); tasks.addAll(shardsWithMismatchedAllocationIds); return tasks; } @@ -214,26 +247,26 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa } private static void assertTasksSuccessful( - List tasks, - ClusterStateTaskExecutor.ClusterTasksResult result, + List tasks, + ClusterStateTaskExecutor.ClusterTasksResult result, ClusterState clusterState, boolean clusterStateChanged ) { - Map taskResultMap = + Map taskResultMap = tasks.stream().collect(Collectors.toMap(Function.identity(), task -> ClusterStateTaskExecutor.TaskResult.success())); assertTaskResults(taskResultMap, result, clusterState, clusterStateChanged); } private static void assertTaskResults( - Map taskResultMap, - ClusterStateTaskExecutor.ClusterTasksResult result, + Map taskResultMap, + ClusterStateTaskExecutor.ClusterTasksResult result, ClusterState clusterState, boolean clusterStateChanged ) { // there should be as many task results as tasks assertEquals(taskResultMap.size(), result.executionResults.size()); - for (Map.Entry entry : taskResultMap.entrySet()) { + for (Map.Entry entry : taskResultMap.entrySet()) { // every task should have a corresponding task result assertTrue(result.executionResults.containsKey(entry.getKey())); @@ -242,7 +275,7 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa } List shards = clusterState.getRoutingTable().allShards(); - for (Map.Entry entry : taskResultMap.entrySet()) { + for (Map.Entry entry : taskResultMap.entrySet()) { if (entry.getValue().isSuccess()) { // the shard was successfully failed and so should not be in the routing table for (ShardRouting shard : shards) { @@ -267,15 +300,15 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa } } - private static List toTasks(ClusterState currentState, List shards, String indexUUID, String message) { + private static List toTasks(ClusterState currentState, List shards, String indexUUID, String message) { return shards .stream() - .map(shard -> new ShardStateAction.ShardEntry( + .map(shard -> new ShardStateAction.FailedShardEntry( shard.shardId(), shard.allocationId().getId(), randomBoolean() ? 
0L : currentState.metaData().getIndexSafe(shard.index()).primaryTerm(shard.id()), message, - new CorruptIndexException("simulated", indexUUID))) + new CorruptIndexException("simulated", indexUUID), randomBoolean())) .collect(Collectors.toList()); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java index b1ff626fa39..9c8564694d1 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java @@ -20,10 +20,13 @@ package org.elasticsearch.cluster.action.shard; import org.apache.lucene.index.CorruptIndexException; +import org.elasticsearch.Version; import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.NotMasterException; +import org.elasticsearch.cluster.action.shard.ShardStateAction.FailedShardEntry; +import org.elasticsearch.cluster.action.shard.ShardStateAction.StartedShardEntry; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.RoutingService; @@ -32,15 +35,22 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.Discovery; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.NodeDisconnectedException; import org.elasticsearch.transport.NodeNotConnectedException; import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; import org.junit.After; @@ -48,6 +58,8 @@ import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; +import java.io.IOException; +import java.util.UUID; import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -63,6 +75,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; public class ShardStateActionTests extends ESTestCase { private static ThreadPool THREAD_POOL; @@ -90,9 +103,9 @@ public class ShardStateActionTests extends ESTestCase { } @Override - protected void waitForNewMasterAndRetry(String actionName, ClusterStateObserver observer, ShardEntry shardEntry, Listener listener, Predicate changePredicate) { + protected void 
waitForNewMasterAndRetry(String actionName, ClusterStateObserver observer, TransportRequest request, Listener listener, Predicate changePredicate) { onBeforeWaitForNewMasterAndRetry.run(); - super.waitForNewMasterAndRetry(actionName, observer, shardEntry, listener, changePredicate); + super.waitForNewMasterAndRetry(actionName, observer, request, listener, changePredicate); onAfterWaitForNewMasterAndRetry.run(); } } @@ -160,8 +173,8 @@ public class ShardStateActionTests extends ESTestCase { CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); assertEquals(1, capturedRequests.length); // the request is a shard failed request - assertThat(capturedRequests[0].request, is(instanceOf(ShardStateAction.ShardEntry.class))); - ShardStateAction.ShardEntry shardEntry = (ShardStateAction.ShardEntry) capturedRequests[0].request; + assertThat(capturedRequests[0].request, is(instanceOf(ShardStateAction.FailedShardEntry.class))); + ShardStateAction.FailedShardEntry shardEntry = (ShardStateAction.FailedShardEntry) capturedRequests[0].request; // for the right shard assertEquals(shardEntry.shardId, shardRouting.shardId()); assertEquals(shardEntry.allocationId, shardRouting.allocationId().getId()); @@ -342,7 +355,7 @@ public class ShardStateActionTests extends ESTestCase { long primaryTerm = clusterService.state().metaData().index(index).primaryTerm(failedShard.id()); assertThat(primaryTerm, greaterThanOrEqualTo(1L)); - shardStateAction.remoteShardFailed(failedShard.shardId(), failedShard.allocationId().getId(), primaryTerm + 1, "test", + shardStateAction.remoteShardFailed(failedShard.shardId(), failedShard.allocationId().getId(), primaryTerm + 1, randomBoolean(), "test", getSimulatedFailure(), new ShardStateAction.Listener() { @Override public void onSuccess() { @@ -407,4 +420,36 @@ public class ShardStateActionTests extends ESTestCase { private Exception getSimulatedFailure() { return new CorruptIndexException("simulated", (String) null); } + + public void testShardEntryBWCSerialize() throws Exception { + final Version bwcVersion = randomValueOtherThanMany( + version -> version.onOrAfter(Version.V_7_0_0_alpha1), () -> VersionUtils.randomVersion(random())); + final ShardId shardId = new ShardId(randomRealisticUnicodeOfLengthBetween(10, 100), UUID.randomUUID().toString(), between(0, 1000)); + final String allocationId = randomRealisticUnicodeOfCodepointLengthBetween(10, 100); + final String reason = randomRealisticUnicodeOfCodepointLengthBetween(10, 100); + try (StreamInput in = serialize(new StartedShardEntry(shardId, allocationId, reason), bwcVersion).streamInput()) { + in.setVersion(bwcVersion); + final FailedShardEntry failedShardEntry = new FailedShardEntry(in); + assertThat(failedShardEntry.shardId, equalTo(shardId)); + assertThat(failedShardEntry.allocationId, equalTo(allocationId)); + assertThat(failedShardEntry.message, equalTo(reason)); + assertThat(failedShardEntry.failure, nullValue()); + assertThat(failedShardEntry.markAsStale, equalTo(true)); + } + try (StreamInput in = serialize(new FailedShardEntry(shardId, allocationId, 0L, reason, null, false), bwcVersion).streamInput()) { + in.setVersion(bwcVersion); + final StartedShardEntry startedShardEntry = new StartedShardEntry(in); + assertThat(startedShardEntry.shardId, equalTo(shardId)); + assertThat(startedShardEntry.allocationId, equalTo(allocationId)); + assertThat(startedShardEntry.message, equalTo(reason)); + } + } + + BytesReference serialize(Writeable writeable, Version version) throws IOException 
{ + try (BytesStreamOutput out = new BytesStreamOutput()) { + out.setVersion(version); + writeable.writeTo(out); + return out.bytes(); + } + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryTermsTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryTermsTests.java index 863a33b1327..65526896864 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryTermsTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryTermsTests.java @@ -144,7 +144,7 @@ public class PrimaryTermsTests extends ESAllocationTestCase { logger.info("failing primary shards {} for index [{}]", shardIdsToFail, index); List failedShards = new ArrayList<>(); for (int shard : shardIdsToFail) { - failedShards.add(new FailedShard(indexShardRoutingTable.shard(shard).primaryShard(), "test", null)); + failedShards.add(new FailedShard(indexShardRoutingTable.shard(shard).primaryShard(), "test", null, randomBoolean())); incrementPrimaryTerm(index, shard); // the primary failure should increment the primary term; } applyRerouteResult(allocationService.applyFailedShards(this.clusterState, failedShards,Collections.emptyList())); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java index 18835e40eb3..d8f7f6552f9 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java @@ -254,7 +254,7 @@ public class UnassignedInfoTests extends ESAllocationTestCase { assertThat(clusterState.getRoutingNodes().unassigned().size() > 0, equalTo(false)); // fail shard ShardRouting shardToFail = clusterState.getRoutingNodes().shardsWithState(STARTED).get(0); - clusterState = allocation.applyFailedShards(clusterState, Collections.singletonList(new FailedShard(shardToFail, "test fail", null))); + clusterState = allocation.applyFailedShards(clusterState, Collections.singletonList(new FailedShard(shardToFail, "test fail", null, randomBoolean()))); // verify the reason and details assertThat(clusterState.getRoutingNodes().unassigned().size() > 0, equalTo(true)); assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(1)); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedNodeRoutingTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedNodeRoutingTests.java index 4b941a6ce4a..8038d9b5e18 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedNodeRoutingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedNodeRoutingTests.java @@ -167,7 +167,7 @@ public class FailedNodeRoutingTests extends ESAllocationTestCase { List shardsToFail = new ArrayList<>(); List failedPrimaries = randomSubsetOf(primaries); failedPrimaries.stream().forEach(sr -> { - shardsToFail.add(new FailedShard(randomFrom(sr), "failed primary", new Exception())); + shardsToFail.add(new FailedShard(randomFrom(sr), "failed primary", new Exception(), randomBoolean())); }); logger.info("--> state before failing shards: {}", state); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java index 2eedeba63f3..1fa1ff3a154 100644 --- 
a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java @@ -117,7 +117,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase { assertThat(clusterState.getRoutingNodes().node("node3").iterator().next().state(), equalTo(INITIALIZING)); logger.info("--> fail primary shard recovering instance on node3 being initialized"); - clusterState = allocation.applyFailedShard(clusterState, clusterState.getRoutingNodes().node("node3").iterator().next()); + clusterState = allocation.applyFailedShard(clusterState, clusterState.getRoutingNodes().node("node3").iterator().next(), randomBoolean()); assertThat(clusterState.getRoutingNodes().node(origPrimaryNodeId).iterator().next().state(), equalTo(STARTED)); assertThat(clusterState.getRoutingNodes().node("node3").size(), equalTo(0)); @@ -132,7 +132,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase { assertThat(clusterState.getRoutingNodes().node("node3").iterator().next().state(), equalTo(INITIALIZING)); logger.info("--> fail primary shard recovering instance on node1 being relocated"); - clusterState = allocation.applyFailedShard(clusterState, clusterState.getRoutingNodes().node(origPrimaryNodeId).iterator().next()); + clusterState = allocation.applyFailedShard(clusterState, clusterState.getRoutingNodes().node(origPrimaryNodeId).iterator().next(), randomBoolean()); // check promotion of replica to primary assertThat(clusterState.getRoutingNodes().node(origReplicaNodeId).iterator().next().state(), equalTo(STARTED)); @@ -200,7 +200,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase { logger.info("fail the primary shard, will have no place to be rerouted to (single node), so stays unassigned"); ShardRouting shardToFail = clusterState.routingTable().index("test").shard(0).primaryShard(); - newState = strategy.applyFailedShard(clusterState, shardToFail); + newState = strategy.applyFailedShard(clusterState, shardToFail, randomBoolean()); assertThat(newState, not(equalTo(clusterState))); clusterState = newState; @@ -249,7 +249,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase { logger.info("fail the first shard, will have no place to be rerouted to (single node), so stays unassigned"); ShardRouting firstShard = clusterState.getRoutingNodes().node("node1").iterator().next(); - newState = strategy.applyFailedShard(clusterState, firstShard); + newState = strategy.applyFailedShard(clusterState, firstShard, randomBoolean()); assertThat(newState, not(equalTo(clusterState))); clusterState = newState; @@ -305,7 +305,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase { logger.info("failing shard on node [{}]", failedNode); ShardRouting shardToFail = routingNodes.node(failedNode).iterator().next(); if (shardRoutingsToFail.contains(shardToFail) == false) { - failedShards.add(new FailedShard(shardToFail, null, null)); + failedShards.add(new FailedShard(shardToFail, null, null, randomBoolean())); failedNodes.add(failedNode); shardRoutingsToFail.add(shardToFail); } @@ -364,7 +364,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase { logger.info("fail the first shard, will start INITIALIZING on the second node"); final ShardRouting firstShard = clusterState.getRoutingNodes().node(nodeHoldingPrimary).iterator().next(); - newState = strategy.applyFailedShard(clusterState, firstShard); + newState = 
diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java
index 2eedeba63f3..1fa1ff3a154 100644
--- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java
@@ -117,7 +117,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
         assertThat(clusterState.getRoutingNodes().node("node3").iterator().next().state(), equalTo(INITIALIZING));
 
         logger.info("--> fail primary shard recovering instance on node3 being initialized");
-        clusterState = allocation.applyFailedShard(clusterState, clusterState.getRoutingNodes().node("node3").iterator().next());
+        clusterState = allocation.applyFailedShard(clusterState, clusterState.getRoutingNodes().node("node3").iterator().next(), randomBoolean());
 
         assertThat(clusterState.getRoutingNodes().node(origPrimaryNodeId).iterator().next().state(), equalTo(STARTED));
         assertThat(clusterState.getRoutingNodes().node("node3").size(), equalTo(0));
@@ -132,7 +132,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
         assertThat(clusterState.getRoutingNodes().node("node3").iterator().next().state(), equalTo(INITIALIZING));
 
         logger.info("--> fail primary shard recovering instance on node1 being relocated");
-        clusterState = allocation.applyFailedShard(clusterState, clusterState.getRoutingNodes().node(origPrimaryNodeId).iterator().next());
+        clusterState = allocation.applyFailedShard(clusterState, clusterState.getRoutingNodes().node(origPrimaryNodeId).iterator().next(), randomBoolean());
 
         // check promotion of replica to primary
         assertThat(clusterState.getRoutingNodes().node(origReplicaNodeId).iterator().next().state(), equalTo(STARTED));
@@ -200,7 +200,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
 
         logger.info("fail the primary shard, will have no place to be rerouted to (single node), so stays unassigned");
         ShardRouting shardToFail = clusterState.routingTable().index("test").shard(0).primaryShard();
-        newState = strategy.applyFailedShard(clusterState, shardToFail);
+        newState = strategy.applyFailedShard(clusterState, shardToFail, randomBoolean());
         assertThat(newState, not(equalTo(clusterState)));
         clusterState = newState;
 
@@ -249,7 +249,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
 
         logger.info("fail the first shard, will have no place to be rerouted to (single node), so stays unassigned");
         ShardRouting firstShard = clusterState.getRoutingNodes().node("node1").iterator().next();
-        newState = strategy.applyFailedShard(clusterState, firstShard);
+        newState = strategy.applyFailedShard(clusterState, firstShard, randomBoolean());
         assertThat(newState, not(equalTo(clusterState)));
         clusterState = newState;
 
@@ -305,7 +305,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
                 logger.info("failing shard on node [{}]", failedNode);
                 ShardRouting shardToFail = routingNodes.node(failedNode).iterator().next();
                 if (shardRoutingsToFail.contains(shardToFail) == false) {
-                    failedShards.add(new FailedShard(shardToFail, null, null));
+                    failedShards.add(new FailedShard(shardToFail, null, null, randomBoolean()));
                     failedNodes.add(failedNode);
                     shardRoutingsToFail.add(shardToFail);
                 }
@@ -364,7 +364,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
 
         logger.info("fail the first shard, will start INITIALIZING on the second node");
         final ShardRouting firstShard = clusterState.getRoutingNodes().node(nodeHoldingPrimary).iterator().next();
-        newState = strategy.applyFailedShard(clusterState, firstShard);
+        newState = strategy.applyFailedShard(clusterState, firstShard, randomBoolean());
         assertThat(newState, not(equalTo(clusterState)));
         clusterState = newState;
 
@@ -455,7 +455,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
 
         logger.info("Fail the shards on node 3");
         ShardRouting shardToFail = routingNodes.node("node3").iterator().next();
-        newState = strategy.applyFailedShard(clusterState, shardToFail);
+        newState = strategy.applyFailedShard(clusterState, shardToFail, randomBoolean());
         assertThat(newState, not(equalTo(clusterState)));
         clusterState = newState;
         routingNodes = clusterState.getRoutingNodes();
@@ -507,7 +507,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
 
         // fail the primary shard, check replicas get removed as well...
         ShardRouting primaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard();
-        ClusterState newState = allocation.applyFailedShard(clusterState, primaryShardToFail);
+        ClusterState newState = allocation.applyFailedShard(clusterState, primaryShardToFail, randomBoolean());
         assertThat(newState, not(equalTo(clusterState)));
         clusterState = newState;
         // the primary gets allocated on another node, replicas are initializing
@@ -550,7 +550,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
 
         // fail the primary shard, check one replica gets elected to primary, others become INITIALIZING (from it)
         ShardRouting primaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard();
-        ClusterState newState = allocation.applyFailedShard(clusterState, primaryShardToFail);
+        ClusterState newState = allocation.applyFailedShard(clusterState, primaryShardToFail, randomBoolean());
         assertThat(newState, not(equalTo(clusterState)));
         clusterState = newState;
         assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1));
@@ -620,7 +620,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
 
         // fail the primary shard again and make sure the correct replica is promoted
         ShardRouting primaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard();
-        ClusterState newState = allocation.applyFailedShard(clusterState, primaryShardToFail);
+        ClusterState newState = allocation.applyFailedShard(clusterState, primaryShardToFail, randomBoolean());
         assertThat(newState, not(equalTo(clusterState)));
         clusterState = newState;
         // the primary gets allocated on another node
@@ -649,7 +649,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
 
         // fail the primary shard again, and ensure the same thing happens
         ShardRouting secondPrimaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard();
-        newState = allocation.applyFailedShard(clusterState, secondPrimaryShardToFail);
+        newState = allocation.applyFailedShard(clusterState, secondPrimaryShardToFail, randomBoolean());
         assertThat(newState, not(equalTo(clusterState)));
         clusterState = newState;
         // the primary gets allocated on another node
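Every `applyFailedShard` call in FailedShardsRoutingTests now passes the flag explicitly. The single-shard overload exercised here presumably just wraps the list-based variant, along these lines (a sketch, not the verbatim AllocationService code; the message string is made up):

```java
// Assumed shape of the single-shard test convenience overload: wrap the shard
// in a one-element FailedShard list carrying the caller's markAsStale choice.
public ClusterState applyFailedShard(ClusterState clusterState, ShardRouting failedShard, boolean markAsStale) {
    return applyFailedShards(clusterState,
        Collections.singletonList(new FailedShard(failedShard, "test failure", null, markAsStale)),
        Collections.emptyList());
}
```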
diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/InSyncAllocationIdTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/InSyncAllocationIdTests.java
index 616eff4381c..5f39336569f 100644
--- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/InSyncAllocationIdTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/InSyncAllocationIdTests.java
@@ -23,7 +23,7 @@ import org.elasticsearch.Version;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ESAllocationTestCase;
 import org.elasticsearch.cluster.action.shard.ShardStateAction;
-import org.elasticsearch.cluster.action.shard.ShardStateAction.ShardEntry;
+import org.elasticsearch.cluster.action.shard.ShardStateAction.FailedShardEntry;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -135,7 +135,7 @@ public class InSyncAllocationIdTests extends ESAllocationTestCase {
 
         logger.info("fail primary shard");
         ShardRouting startedPrimary = clusterState.getRoutingNodes().shardsWithState(STARTED).get(0);
-        clusterState = allocation.applyFailedShard(clusterState, startedPrimary);
+        clusterState = allocation.applyFailedShard(clusterState, startedPrimary, true);
 
         assertThat(clusterState.getRoutingTable().shardsWithState(STARTED).size(), equalTo(0));
         assertEquals(Collections.singleton(startedPrimary.allocationId().getId()),
@@ -167,7 +167,7 @@ public class InSyncAllocationIdTests extends ESAllocationTestCase {
             new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocation, null, logger);
         long primaryTerm = clusterState.metaData().index("test").primaryTerm(0);
         clusterState = failedClusterStateTaskExecutor.execute(clusterState, Arrays.asList(
-            new ShardEntry(shardRoutingTable.shardId(), replicaShard.allocationId().getId(), primaryTerm, "dummy", null))
+            new FailedShardEntry(shardRoutingTable.shardId(), replicaShard.allocationId().getId(), primaryTerm, "dummy", null, true))
             ).resultingState;
 
         assertThat(clusterState.metaData().index("test").inSyncAllocationIds(0).size(), equalTo(1));
@@ -189,11 +189,11 @@ public class InSyncAllocationIdTests extends ESAllocationTestCase {
 
         long primaryTerm = clusterState.metaData().index("test").primaryTerm(0);
 
-        List<ShardEntry> failureEntries = new ArrayList<>();
-        failureEntries.add(new ShardEntry(
-            shardRoutingTable.shardId(), primaryShard.allocationId().getId(), 0L, "dummy", null));
-        failureEntries.add(new ShardEntry(
-            shardRoutingTable.shardId(), replicaShard.allocationId().getId(), primaryTerm, "dummy", null));
+        List<FailedShardEntry> failureEntries = new ArrayList<>();
+        failureEntries.add(new FailedShardEntry(
+            shardRoutingTable.shardId(), primaryShard.allocationId().getId(), 0L, "dummy", null, true));
+        failureEntries.add(new FailedShardEntry(
+            shardRoutingTable.shardId(), replicaShard.allocationId().getId(), primaryTerm, "dummy", null, true));
         Collections.shuffle(failureEntries, random());
         logger.info("Failing {}", failureEntries);
 
@@ -333,8 +333,8 @@ public class InSyncAllocationIdTests extends ESAllocationTestCase {
         assertEquals(inSyncSet, clusterState.metaData().index("test").inSyncAllocationIds(0));
 
         logger.info("fail primary shard");
-        clusterState = failedClusterStateTaskExecutor.execute(clusterState, Collections.singletonList(new ShardEntry(
-            shardRoutingTable.shardId(), primaryShard.allocationId().getId(), 0L, "dummy", null))).resultingState;
+        clusterState = failedClusterStateTaskExecutor.execute(clusterState, Collections.singletonList(new FailedShardEntry(
+            shardRoutingTable.shardId(), primaryShard.allocationId().getId(), 0L, "dummy", null, true))).resultingState;
 
         assertThat(clusterState.routingTable().index("test").shard(0).assignedShards().size(), equalTo(0));
         // in-sync allocation ids should not be updated
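InSyncAllocationIdTests now constructs `FailedShardEntry` directly, passing `markAsStale` as `true` wherever the old unconditional removal from the in-sync set is expected. From the constructor arguments used throughout this patch, the two classes that replace the old catch-all `ShardEntry` plausibly split like this; field names are assumptions:

```java
// Sketch of the split request types, inferred from call sites in this patch:
// the failed variant keeps primaryTerm and failure and gains markAsStale,
// while the started variant needs only shard id, allocation id, and a message.
public static class FailedShardEntry extends TransportRequest {
    final ShardId shardId;
    final String allocationId;
    final long primaryTerm;
    final String message;
    @Nullable
    final Exception failure;
    final boolean markAsStale;

    FailedShardEntry(ShardId shardId, String allocationId, long primaryTerm,
                     String message, @Nullable Exception failure, boolean markAsStale) {
        this.shardId = shardId;
        this.allocationId = allocationId;
        this.primaryTerm = primaryTerm;
        this.message = message;
        this.failure = failure;
        this.markAsStale = markAsStale;
    }
}

public static class StartedShardEntry extends TransportRequest {
    final ShardId shardId;
    final String allocationId;
    final String message;

    StartedShardEntry(ShardId shardId, String allocationId, String message) {
        this.shardId = shardId;
        this.allocationId = allocationId;
        this.message = message;
    }
}
```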
diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/MaxRetryAllocationDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/MaxRetryAllocationDeciderTests.java
index 797e1adc12c..994ee8f1438 100644
--- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/MaxRetryAllocationDeciderTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/MaxRetryAllocationDeciderTests.java
@@ -91,7 +91,7 @@ public class MaxRetryAllocationDeciderTests extends ESAllocationTestCase {
         for (int i = 0; i < retries-1; i++) {
             List<FailedShard> failedShards = Collections.singletonList(
                 new FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom" + i,
-                    new UnsupportedOperationException()));
+                    new UnsupportedOperationException(), randomBoolean()));
             ClusterState newState = strategy.applyFailedShards(clusterState, failedShards);
             assertThat(newState, not(equalTo(clusterState)));
             clusterState = newState;
@@ -104,7 +104,7 @@ public class MaxRetryAllocationDeciderTests extends ESAllocationTestCase {
         // now we go and check that we are actually stick to unassigned on the next failure
         List<FailedShard> failedShards = Collections.singletonList(
             new FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom",
-                new UnsupportedOperationException()));
+                new UnsupportedOperationException(), randomBoolean()));
         ClusterState newState = strategy.applyFailedShards(clusterState, failedShards);
         assertThat(newState, not(equalTo(clusterState)));
         clusterState = newState;
@@ -130,7 +130,7 @@ public class MaxRetryAllocationDeciderTests extends ESAllocationTestCase {
         for (int i = 0; i < retries-1; i++) {
             failedShards = Collections.singletonList(
                 new FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom",
-                    new UnsupportedOperationException()));
+                    new UnsupportedOperationException(), randomBoolean()));
             newState = strategy.applyFailedShards(clusterState, failedShards);
 
             assertThat(newState, not(equalTo(clusterState)));
@@ -145,7 +145,7 @@ public class MaxRetryAllocationDeciderTests extends ESAllocationTestCase {
         // now we go and check that we are actually stick to unassigned on the next failure
         failedShards = Collections.singletonList(
             new FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom",
-                new UnsupportedOperationException()));
+                new UnsupportedOperationException(), randomBoolean()));
         newState = strategy.applyFailedShards(clusterState, failedShards);
         assertThat(newState, not(equalTo(clusterState)));
         clusterState = newState;
@@ -164,7 +164,7 @@ public class MaxRetryAllocationDeciderTests extends ESAllocationTestCase {
         for (int i = 0; i < retries-1; i++) {
             List<FailedShard> failedShards = Collections.singletonList(
                 new FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom" + i,
-                    new UnsupportedOperationException()));
+                    new UnsupportedOperationException(), randomBoolean()));
             ClusterState newState = strategy.applyFailedShards(clusterState, failedShards);
             assertThat(newState, not(equalTo(clusterState)));
             clusterState = newState;
@@ -182,7 +182,7 @@ public class MaxRetryAllocationDeciderTests extends ESAllocationTestCase {
         {
             List<FailedShard> failedShards = Collections.singletonList(
                 new FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom",
-                    new UnsupportedOperationException()));
+                    new UnsupportedOperationException(), randomBoolean()));
             ClusterState newState = strategy.applyFailedShards(clusterState, failedShards);
             assertThat(newState, not(equalTo(clusterState)));
             clusterState = newState;
@@ -231,7 +231,7 @@ public class MaxRetryAllocationDeciderTests extends ESAllocationTestCase {
         // now fail again and see if it has a new counter
         List<FailedShard> failedShards = Collections.singletonList(
             new FailedShard(routingTable.index("idx").shard(0).shards().get(0), "ZOOOMG",
-                new UnsupportedOperationException()));
+                new UnsupportedOperationException(), randomBoolean()));
         newState = strategy.applyFailedShards(clusterState, failedShards);
         assertThat(newState, not(equalTo(clusterState)));
         clusterState = newState;
diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java
index 8bd4b39d076..0668ba41524 100644
--- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java
@@ -194,7 +194,7 @@ public class SingleShardNoReplicasRoutingTests extends ESAllocationTestCase {
 
         logger.info("Marking the shard as failed");
         RoutingNodes routingNodes = clusterState.getRoutingNodes();
-        newState = strategy.applyFailedShard(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING).get(0));
+        newState = strategy.applyFailedShard(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING).get(0), randomBoolean());
         assertThat(newState, not(equalTo(clusterState)));
         clusterState = newState;
 
diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDeciderTests.java
index 8381f2f960b..b1fa8346e2c 100644
--- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDeciderTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDeciderTests.java
@@ -66,7 +66,7 @@ public class FilterAllocationDeciderTests extends ESAllocationTestCase {
         // we can initally only allocate on node2
         assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), INITIALIZING);
         assertEquals(routingTable.index("idx").shard(0).shards().get(0).currentNodeId(), "node2");
-        routingTable = service.applyFailedShard(state, routingTable.index("idx").shard(0).shards().get(0)).routingTable();
+        routingTable = service.applyFailedShard(state, routingTable.index("idx").shard(0).shards().get(0), randomBoolean()).routingTable();
         state = ClusterState.builder(state).routingTable(routingTable).build();
         assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), UNASSIGNED);
         assertNull(routingTable.index("idx").shard(0).shards().get(0).currentNodeId());
@@ -114,7 +114,7 @@ public class FilterAllocationDeciderTests extends ESAllocationTestCase {
 
         state = service.deassociateDeadNodes(
             ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).remove("node1")).build(), true, "test");
-        state = service.applyFailedShard(state, routingTable.index("idx").shard(0).primaryShard());
+        state = service.applyFailedShard(state, routingTable.index("idx").shard(0).primaryShard(), randomBoolean());
 
         // now bring back node1 and see it's assigned
         state = service.reroute(
diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java
index dd10dd2747d..3f7f0583593 100644
--- a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java
+++ b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java
@@ -46,7 +46,8 @@ import org.elasticsearch.cluster.ClusterStateTaskExecutor.ClusterTasksResult;
 import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.EmptyClusterInfoService;
 import org.elasticsearch.cluster.action.shard.ShardStateAction;
-import org.elasticsearch.cluster.action.shard.ShardStateAction.ShardEntry;
+import org.elasticsearch.cluster.action.shard.ShardStateAction.StartedShardEntry;
+import org.elasticsearch.cluster.action.shard.ShardStateAction.FailedShardEntry;
 import org.elasticsearch.cluster.metadata.AliasValidator;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@@ -221,16 +222,16 @@ public class ClusterStateChanges extends AbstractComponent {
     }
 
     public ClusterState applyFailedShards(ClusterState clusterState, List<FailedShard> failedShards) {
-        List<ShardEntry> entries = failedShards.stream().map(failedShard ->
-            new ShardEntry(failedShard.getRoutingEntry().shardId(), failedShard.getRoutingEntry().allocationId().getId(),
-                0L, failedShard.getMessage(), failedShard.getFailure()))
+        List<FailedShardEntry> entries = failedShards.stream().map(failedShard ->
+            new FailedShardEntry(failedShard.getRoutingEntry().shardId(), failedShard.getRoutingEntry().allocationId().getId(),
+                0L, failedShard.getMessage(), failedShard.getFailure(), failedShard.markAsStale()))
             .collect(Collectors.toList());
         return runTasks(shardFailedClusterStateTaskExecutor, clusterState, entries);
     }
 
     public ClusterState applyStartedShards(ClusterState clusterState, List<ShardRouting> startedShards) {
-        List<ShardEntry> entries = startedShards.stream().map(startedShard ->
-            new ShardEntry(startedShard.shardId(), startedShard.allocationId().getId(), 0L, "shard started", null))
+        List<StartedShardEntry> entries = startedShards.stream().map(startedShard ->
+            new StartedShardEntry(startedShard.shardId(), startedShard.allocationId().getId(), "shard started"))
             .collect(Collectors.toList());
         return runTasks(shardStartedClusterStateTaskExecutor, clusterState, entries);
     }
diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java
index d76429c53f3..e4d73ce0f41 100644
--- a/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java
@@ -333,7 +333,7 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndicesClusterStateServiceTestCase {
                 if (persistedShardRouting.initializing() && randomBoolean()) {
                     startedShards.add(persistedShardRouting);
                 } else if (rarely()) {
-                    failedShards.add(new FailedShard(persistedShardRouting, "fake shard failure", new Exception()));
+                    failedShards.add(new FailedShard(persistedShardRouting, "fake shard failure", new Exception(), randomBoolean()));
                 }
             }
         }
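Taken together, a hedged end-to-end example of the updated test plumbing, assuming a `ClusterStateChanges cluster` instance like the one used by the random-updates test above (variable names illustrative):

```java
// Fail a started shard and randomly decide whether the copy is also marked
// stale; ClusterStateChanges translates this into a FailedShardEntry for the
// shard-failed cluster state task executor, as shown in the hunk above.
ShardRouting shardToFail = clusterState.getRoutingNodes().shardsWithState(STARTED).get(0);
boolean markAsStale = randomBoolean();
ClusterState newState = cluster.applyFailedShards(clusterState,
    Collections.singletonList(new FailedShard(shardToFail, "test fail", new Exception(), markAsStale)));
```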