Allows failing shards without marking as stale (#28054)

Currently when failing a shard we also mark it as stale (eg. remove its
allocationId from from the InSync set). However in some cases, we need 
to be able to fail shards but keep them InSync set. This commit adds
such capacity. This is a preparatory change to make the primary-replica
resync less lenient.

Relates #24841
This commit is contained in:
Nhat Nguyen 2018-02-03 09:41:53 -05:00 committed by GitHub
parent 13083e27da
commit 965efa51cc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 331 additions and 185 deletions

View File

@ -387,14 +387,14 @@ public abstract class TransportWriteAction<
logger.warn((org.apache.logging.log4j.util.Supplier<?>)
() -> new ParameterizedMessage("[{}] {}", replica.shardId(), message), exception);
shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, message, exception,
shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, true, message, exception,
createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
}
@Override
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, "mark copy as stale", null,
shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null,
createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
}

View File

@ -24,6 +24,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterState;
@ -88,21 +89,21 @@ public class ShardStateAction extends AbstractComponent {
this.clusterService = clusterService;
this.threadPool = threadPool;
transportService.registerRequestHandler(SHARD_STARTED_ACTION_NAME, ShardEntry::new, ThreadPool.Names.SAME, new ShardStartedTransportHandler(clusterService, new ShardStartedClusterStateTaskExecutor(allocationService, logger), logger));
transportService.registerRequestHandler(SHARD_FAILED_ACTION_NAME, ShardEntry::new, ThreadPool.Names.SAME, new ShardFailedTransportHandler(clusterService, new ShardFailedClusterStateTaskExecutor(allocationService, routingService, logger), logger));
transportService.registerRequestHandler(SHARD_STARTED_ACTION_NAME, ThreadPool.Names.SAME, StartedShardEntry::new, new ShardStartedTransportHandler(clusterService, new ShardStartedClusterStateTaskExecutor(allocationService, logger), logger));
transportService.registerRequestHandler(SHARD_FAILED_ACTION_NAME, ThreadPool.Names.SAME, FailedShardEntry::new, new ShardFailedTransportHandler(clusterService, new ShardFailedClusterStateTaskExecutor(allocationService, routingService, logger), logger));
}
private void sendShardAction(final String actionName, final ClusterState currentState, final ShardEntry shardEntry, final Listener listener) {
private void sendShardAction(final String actionName, final ClusterState currentState, final TransportRequest request, final Listener listener) {
ClusterStateObserver observer = new ClusterStateObserver(currentState, clusterService, null, logger, threadPool.getThreadContext());
DiscoveryNode masterNode = currentState.nodes().getMasterNode();
Predicate<ClusterState> changePredicate = MasterNodeChangePredicate.build(currentState);
if (masterNode == null) {
logger.warn("{} no master known for action [{}] for shard entry [{}]", shardEntry.shardId, actionName, shardEntry);
waitForNewMasterAndRetry(actionName, observer, shardEntry, listener, changePredicate);
logger.warn("no master known for action [{}] for shard entry [{}]", actionName, request);
waitForNewMasterAndRetry(actionName, observer, request, listener, changePredicate);
} else {
logger.debug("{} sending [{}] to [{}] for shard entry [{}]", shardEntry.shardId, actionName, masterNode.getId(), shardEntry);
logger.debug("sending [{}] to [{}] for shard entry [{}]", actionName, masterNode.getId(), request);
transportService.sendRequest(masterNode,
actionName, shardEntry, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
actionName, request, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleResponse(TransportResponse.Empty response) {
listener.onSuccess();
@ -111,9 +112,9 @@ public class ShardStateAction extends AbstractComponent {
@Override
public void handleException(TransportException exp) {
if (isMasterChannelException(exp)) {
waitForNewMasterAndRetry(actionName, observer, shardEntry, listener, changePredicate);
waitForNewMasterAndRetry(actionName, observer, request, listener, changePredicate);
} else {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("{} unexpected failure while sending request [{}] to [{}] for shard entry [{}]", shardEntry.shardId, actionName, masterNode, shardEntry), exp);
logger.warn(new ParameterizedMessage("unexpected failure while sending request [{}] to [{}] for shard entry [{}]", actionName, masterNode, request), exp);
listener.onFailure(exp instanceof RemoteTransportException ? (Exception) (exp.getCause() instanceof Exception ? exp.getCause() : new ElasticsearchException(exp.getCause())) : exp);
}
}
@ -139,13 +140,15 @@ public class ShardStateAction extends AbstractComponent {
* @param shardId shard id of the shard to fail
* @param allocationId allocation id of the shard to fail
* @param primaryTerm the primary term associated with the primary shard that is failing the shard. Must be strictly positive.
* @param markAsStale whether or not to mark a failing shard as stale (eg. removing from in-sync set) when failing the shard.
* @param message the reason for the failure
* @param failure the underlying cause of the failure
* @param listener callback upon completion of the request
*/
public void remoteShardFailed(final ShardId shardId, String allocationId, long primaryTerm, final String message, @Nullable final Exception failure, Listener listener) {
public void remoteShardFailed(final ShardId shardId, String allocationId, long primaryTerm, boolean markAsStale, final String message, @Nullable final Exception failure, Listener listener) {
assert primaryTerm > 0L : "primary term should be strictly positive";
shardFailed(shardId, allocationId, primaryTerm, message, failure, listener, clusterService.state());
FailedShardEntry shardEntry = new FailedShardEntry(shardId, allocationId, primaryTerm, message, failure, markAsStale);
sendShardAction(SHARD_FAILED_ACTION_NAME, clusterService.state(), shardEntry, listener);
}
/**
@ -160,29 +163,24 @@ public class ShardStateAction extends AbstractComponent {
*/
public void localShardFailed(final ShardRouting shardRouting, final String message, @Nullable final Exception failure, Listener listener,
final ClusterState currentState) {
shardFailed(shardRouting.shardId(), shardRouting.allocationId().getId(), 0L, message, failure, listener, currentState);
}
private void shardFailed(final ShardId shardId, String allocationId, long primaryTerm, final String message,
@Nullable final Exception failure, Listener listener, ClusterState currentState) {
ShardEntry shardEntry = new ShardEntry(shardId, allocationId, primaryTerm, message, failure);
FailedShardEntry shardEntry = new FailedShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(), 0L, message, failure, true);
sendShardAction(SHARD_FAILED_ACTION_NAME, currentState, shardEntry, listener);
}
// visible for testing
protected void waitForNewMasterAndRetry(String actionName, ClusterStateObserver observer, ShardEntry shardEntry, Listener listener, Predicate<ClusterState> changePredicate) {
protected void waitForNewMasterAndRetry(String actionName, ClusterStateObserver observer, TransportRequest request, Listener listener, Predicate<ClusterState> changePredicate) {
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
if (logger.isTraceEnabled()) {
logger.trace("new cluster state [{}] after waiting for master election to fail shard entry [{}]", state, shardEntry);
logger.trace("new cluster state [{}] after waiting for master election for shard entry [{}]", state, request);
}
sendShardAction(actionName, state, shardEntry, listener);
sendShardAction(actionName, state, request, listener);
}
@Override
public void onClusterServiceClose() {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("{} node closed while execution action [{}] for shard entry [{}]", shardEntry.shardId, actionName, shardEntry), shardEntry.failure);
logger.warn("node closed while execution action [{}] for shard entry [{}]", actionName, request);
listener.onFailure(new NodeClosedException(clusterService.localNode()));
}
@ -194,7 +192,7 @@ public class ShardStateAction extends AbstractComponent {
}, changePredicate);
}
private static class ShardFailedTransportHandler implements TransportRequestHandler<ShardEntry> {
private static class ShardFailedTransportHandler implements TransportRequestHandler<FailedShardEntry> {
private final ClusterService clusterService;
private final ShardFailedClusterStateTaskExecutor shardFailedClusterStateTaskExecutor;
private final Logger logger;
@ -206,7 +204,7 @@ public class ShardStateAction extends AbstractComponent {
}
@Override
public void messageReceived(ShardEntry request, TransportChannel channel) throws Exception {
public void messageReceived(FailedShardEntry request, TransportChannel channel) throws Exception {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("{} received shard failed for {}", request.shardId, request), request.failure);
clusterService.submitStateUpdateTask(
"shard-failed",
@ -248,7 +246,7 @@ public class ShardStateAction extends AbstractComponent {
}
}
public static class ShardFailedClusterStateTaskExecutor implements ClusterStateTaskExecutor<ShardEntry> {
public static class ShardFailedClusterStateTaskExecutor implements ClusterStateTaskExecutor<FailedShardEntry> {
private final AllocationService allocationService;
private final RoutingService routingService;
private final Logger logger;
@ -260,13 +258,13 @@ public class ShardStateAction extends AbstractComponent {
}
@Override
public ClusterTasksResult<ShardEntry> execute(ClusterState currentState, List<ShardEntry> tasks) throws Exception {
ClusterTasksResult.Builder<ShardEntry> batchResultBuilder = ClusterTasksResult.builder();
List<ShardEntry> tasksToBeApplied = new ArrayList<>();
public ClusterTasksResult<FailedShardEntry> execute(ClusterState currentState, List<FailedShardEntry> tasks) throws Exception {
ClusterTasksResult.Builder<FailedShardEntry> batchResultBuilder = ClusterTasksResult.builder();
List<FailedShardEntry> tasksToBeApplied = new ArrayList<>();
List<FailedShard> failedShardsToBeApplied = new ArrayList<>();
List<StaleShard> staleShardsToBeApplied = new ArrayList<>();
for (ShardEntry task : tasks) {
for (FailedShardEntry task : tasks) {
IndexMetaData indexMetaData = currentState.metaData().index(task.shardId.getIndex());
if (indexMetaData == null) {
// tasks that correspond to non-existent indices are marked as successful
@ -314,7 +312,7 @@ public class ShardStateAction extends AbstractComponent {
// failing a shard also possibly marks it as stale (see IndexMetaDataUpdater)
logger.debug("{} failing shard {} (shard failed task: [{}])", task.shardId, matched, task);
tasksToBeApplied.add(task);
failedShardsToBeApplied.add(new FailedShard(matched, task.message, task.failure));
failedShardsToBeApplied.add(new FailedShard(matched, task.message, task.failure, task.markAsStale));
}
}
}
@ -352,15 +350,82 @@ public class ShardStateAction extends AbstractComponent {
}
}
public static class FailedShardEntry extends TransportRequest {
final ShardId shardId;
final String allocationId;
final long primaryTerm;
final String message;
final Exception failure;
final boolean markAsStale;
FailedShardEntry(StreamInput in) throws IOException {
super(in);
shardId = ShardId.readShardId(in);
allocationId = in.readString();
primaryTerm = in.readVLong();
message = in.readString();
failure = in.readException();
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
markAsStale = in.readBoolean();
} else {
markAsStale = true;
}
}
public FailedShardEntry(ShardId shardId, String allocationId, long primaryTerm, String message, Exception failure, boolean markAsStale) {
this.shardId = shardId;
this.allocationId = allocationId;
this.primaryTerm = primaryTerm;
this.message = message;
this.failure = failure;
this.markAsStale = markAsStale;
}
public ShardId getShardId() {
return shardId;
}
public String getAllocationId() {
return allocationId;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
shardId.writeTo(out);
out.writeString(allocationId);
out.writeVLong(primaryTerm);
out.writeString(message);
out.writeException(failure);
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeBoolean(markAsStale);
}
}
@Override
public String toString() {
List<String> components = new ArrayList<>(6);
components.add("shard id [" + shardId + "]");
components.add("allocation id [" + allocationId + "]");
components.add("primary term [" + primaryTerm + "]");
components.add("message [" + message + "]");
if (failure != null) {
components.add("failure [" + ExceptionsHelper.detailedMessage(failure) + "]");
}
components.add("markAsStale [" + markAsStale + "]");
return String.join(", ", components);
}
}
public void shardStarted(final ShardRouting shardRouting, final String message, Listener listener) {
shardStarted(shardRouting, message, listener, clusterService.state());
}
public void shardStarted(final ShardRouting shardRouting, final String message, Listener listener, ClusterState currentState) {
ShardEntry shardEntry = new ShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(), 0L, message, null);
StartedShardEntry shardEntry = new StartedShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(), message);
sendShardAction(SHARD_STARTED_ACTION_NAME, currentState, shardEntry, listener);
}
private static class ShardStartedTransportHandler implements TransportRequestHandler<ShardEntry> {
private static class ShardStartedTransportHandler implements TransportRequestHandler<StartedShardEntry> {
private final ClusterService clusterService;
private final ShardStartedClusterStateTaskExecutor shardStartedClusterStateTaskExecutor;
private final Logger logger;
@ -372,7 +437,7 @@ public class ShardStateAction extends AbstractComponent {
}
@Override
public void messageReceived(ShardEntry request, TransportChannel channel) throws Exception {
public void messageReceived(StartedShardEntry request, TransportChannel channel) throws Exception {
logger.debug("{} received shard started for [{}]", request.shardId, request);
clusterService.submitStateUpdateTask(
"shard-started " + request,
@ -384,7 +449,7 @@ public class ShardStateAction extends AbstractComponent {
}
}
public static class ShardStartedClusterStateTaskExecutor implements ClusterStateTaskExecutor<ShardEntry>, ClusterStateTaskListener {
public static class ShardStartedClusterStateTaskExecutor implements ClusterStateTaskExecutor<StartedShardEntry>, ClusterStateTaskListener {
private final AllocationService allocationService;
private final Logger logger;
@ -394,14 +459,12 @@ public class ShardStateAction extends AbstractComponent {
}
@Override
public ClusterTasksResult<ShardEntry> execute(ClusterState currentState, List<ShardEntry> tasks) throws Exception {
ClusterTasksResult.Builder<ShardEntry> builder = ClusterTasksResult.builder();
List<ShardEntry> tasksToBeApplied = new ArrayList<>();
public ClusterTasksResult<StartedShardEntry> execute(ClusterState currentState, List<StartedShardEntry> tasks) throws Exception {
ClusterTasksResult.Builder<StartedShardEntry> builder = ClusterTasksResult.builder();
List<StartedShardEntry> tasksToBeApplied = new ArrayList<>();
List<ShardRouting> shardRoutingsToBeApplied = new ArrayList<>(tasks.size());
Set<ShardRouting> seenShardRoutings = new HashSet<>(); // to prevent duplicates
for (ShardEntry task : tasks) {
assert task.primaryTerm == 0L : "shard is only started by itself: " + task;
for (StartedShardEntry task : tasks) {
ShardRouting matched = currentState.getRoutingTable().getByAllocationId(task.shardId, task.allocationId);
if (matched == null) {
// tasks that correspond to non-existent shards are marked as successful. The reason is that we resend shard started
@ -451,40 +514,30 @@ public class ShardStateAction extends AbstractComponent {
}
}
public static class ShardEntry extends TransportRequest {
ShardId shardId;
String allocationId;
long primaryTerm;
String message;
Exception failure;
public static class StartedShardEntry extends TransportRequest {
final ShardId shardId;
final String allocationId;
final String message;
public ShardEntry() {
}
public ShardEntry(ShardId shardId, String allocationId, long primaryTerm, String message, @Nullable Exception failure) {
this.shardId = shardId;
this.allocationId = allocationId;
this.primaryTerm = primaryTerm;
this.message = message;
this.failure = failure;
}
public ShardId getShardId() {
return shardId;
}
public String getAllocationId() {
return allocationId;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
StartedShardEntry(StreamInput in) throws IOException {
super(in);
shardId = ShardId.readShardId(in);
allocationId = in.readString();
primaryTerm = in.readVLong();
message = in.readString();
failure = in.readException();
if (in.getVersion().before(Version.V_7_0_0_alpha1)) {
final long primaryTerm = in.readVLong();
assert primaryTerm == 0L : "shard is only started by itself: primary term [" + primaryTerm + "]";
}
this.message = in.readString();
if (in.getVersion().before(Version.V_7_0_0_alpha1)) {
final Exception ex = in.readException();
assert ex == null : "started shard must not have failure [" + ex + "]";
}
}
public StartedShardEntry(ShardId shardId, String allocationId, String message) {
this.shardId = shardId;
this.allocationId = allocationId;
this.message = message;
}
@Override
@ -492,22 +545,19 @@ public class ShardStateAction extends AbstractComponent {
super.writeTo(out);
shardId.writeTo(out);
out.writeString(allocationId);
out.writeVLong(primaryTerm);
if (out.getVersion().before(Version.V_7_0_0_alpha1)) {
out.writeVLong(0L);
}
out.writeString(message);
out.writeException(failure);
if (out.getVersion().before(Version.V_7_0_0_alpha1)) {
out.writeException(null);
}
}
@Override
public String toString() {
List<String> components = new ArrayList<>(4);
components.add("shard id [" + shardId + "]");
components.add("allocation id [" + allocationId + "]");
components.add("primary term [" + primaryTerm + "]");
components.add("message [" + message + "]");
if (failure != null) {
components.add("failure [" + ExceptionsHelper.detailedMessage(failure) + "]");
}
return String.join(", ", components);
return String.format(Locale.ROOT, "StartedShardEntry{shardId [%s], allocationId [%s], message [%s]}",
shardId, allocationId, message);
}
}

View File

@ -138,8 +138,8 @@ public class AllocationService extends AbstractComponent {
}
// Used for testing
public ClusterState applyFailedShard(ClusterState clusterState, ShardRouting failedShard) {
return applyFailedShards(clusterState, singletonList(new FailedShard(failedShard, null, null)), emptyList());
public ClusterState applyFailedShard(ClusterState clusterState, ShardRouting failedShard, boolean markAsStale) {
return applyFailedShards(clusterState, singletonList(new FailedShard(failedShard, null, null, markAsStale)), emptyList());
}
// Used for testing
@ -185,6 +185,9 @@ public class AllocationService extends AbstractComponent {
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, message,
failedShardEntry.getFailure(), failedAllocations + 1, currentNanoTime, System.currentTimeMillis(), false,
AllocationStatus.NO_ATTEMPT);
if (failedShardEntry.markAsStale()) {
allocation.removeAllocationId(failedShard);
}
routingNodes.failShard(logger, failedShard, unassignedInfo, indexMetaData, allocation.changes());
} else {
logger.trace("{} shard routing failed in an earlier iteration (routing: {})", shardToFail.shardId(), shardToFail);

View File

@ -30,18 +30,20 @@ public class FailedShard {
private final ShardRouting routingEntry;
private final String message;
private final Exception failure;
private final boolean markAsStale;
public FailedShard(ShardRouting routingEntry, String message, Exception failure) {
public FailedShard(ShardRouting routingEntry, String message, Exception failure, boolean markAsStale) {
assert routingEntry.assignedToNode() : "only assigned shards can be failed " + routingEntry;
this.routingEntry = routingEntry;
this.message = message;
this.failure = failure;
this.markAsStale = markAsStale;
}
@Override
public String toString() {
return "failed shard, shard " + routingEntry + ", message [" + message + "], failure [" +
ExceptionsHelper.detailedMessage(failure) + "]";
ExceptionsHelper.detailedMessage(failure) + "], markAsStale [" + markAsStale + "]";
}
/**
@ -66,4 +68,11 @@ public class FailedShard {
public Exception getFailure() {
return failure;
}
/**
* Whether or not to mark the shard as stale (eg. removing from in-sync set) when failing the shard.
*/
public boolean markAsStale() {
return markAsStale;
}
}

View File

@ -72,19 +72,12 @@ public class IndexMetaDataUpdater extends RoutingChangesObserver.AbstractRouting
@Override
public void shardFailed(ShardRouting failedShard, UnassignedInfo unassignedInfo) {
if (failedShard.active() && unassignedInfo.getReason() != UnassignedInfo.Reason.NODE_LEFT) {
removeAllocationId(failedShard);
if (failedShard.primary()) {
Updates updates = changes(failedShard.shardId());
if (updates.firstFailedPrimary == null) {
// more than one primary can be failed (because of batching, primary can be failed, replica promoted and then failed...)
updates.firstFailedPrimary = failedShard;
}
}
}
if (failedShard.active() && failedShard.primary()) {
Updates updates = changes(failedShard.shardId());
if (updates.firstFailedPrimary == null) {
// more than one primary can be failed (because of batching, primary can be failed, replica promoted and then failed...)
updates.firstFailedPrimary = failedShard;
}
increasePrimaryTerm(failedShard.shardId());
}
}
@ -286,8 +279,10 @@ public class IndexMetaDataUpdater extends RoutingChangesObserver.AbstractRouting
/**
* Remove allocation id of this shard from the set of in-sync shard copies
*/
private void removeAllocationId(ShardRouting shardRouting) {
changes(shardRouting.shardId()).removedAllocationIds.add(shardRouting.allocationId().getId());
void removeAllocationId(ShardRouting shardRouting) {
if (shardRouting.active()) {
changes(shardRouting.shardId()).removedAllocationIds.add(shardRouting.allocationId().getId());
}
}
/**

View File

@ -27,6 +27,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingChangesObserver;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.collect.ImmutableOpenMap;
@ -222,6 +223,13 @@ public class RoutingAllocation {
return unmodifiableSet(new HashSet<>(ignore));
}
/**
* Remove the allocation id of the provided shard from the set of in-sync shard copies
*/
public void removeAllocationId(ShardRouting shardRouting) {
indexMetaDataUpdater.removeAllocationId(shardRouting);
}
/**
* Returns observer to use for changes made to the routing nodes
*/

View File

@ -156,6 +156,8 @@ public class CancelAllocationCommand implements AllocationCommand {
}
routingNodes.failShard(Loggers.getLogger(CancelAllocationCommand.class), shardRouting,
new UnassignedInfo(UnassignedInfo.Reason.REROUTE_CANCELLED, null), indexMetaData, allocation.changes());
// TODO: We don't have to remove a cancelled shard from in-sync set once we have a strict resync implementation.
allocation.removeAllocationId(shardRouting);
return new RerouteExplanation(this, allocation.decision(Decision.YES, "cancel_allocation_command",
"shard " + shardId + " on node " + discoNode + " can be cancelled"));
}

View File

@ -120,7 +120,7 @@ public class ClusterRerouteTests extends ESAllocationTestCase {
assertEquals(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getNumFailedAllocations(), i);
List<FailedShard> failedShards = Collections.singletonList(
new FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom" + i,
new UnsupportedOperationException()));
new UnsupportedOperationException(), randomBoolean()));
newState = allocationService.applyFailedShards(clusterState, failedShards);
assertThat(newState, not(equalTo(clusterState)));
clusterState = newState;

View File

@ -315,7 +315,7 @@ public class TransportWriteActionTests extends ESTestCase {
// A write replication action proxy should fail the shard
assertEquals(1, shardFailedRequests.length);
CapturingTransport.CapturedRequest shardFailedRequest = shardFailedRequests[0];
ShardStateAction.ShardEntry shardEntry = (ShardStateAction.ShardEntry) shardFailedRequest.request;
ShardStateAction.FailedShardEntry shardEntry = (ShardStateAction.FailedShardEntry) shardFailedRequest.request;
// the shard the request was sent to and the shard to be failed should be the same
assertEquals(shardEntry.getShardId(), replica.shardId());
assertEquals(shardEntry.getAllocationId(), replica.allocationId().getId());

View File

@ -22,6 +22,7 @@ package org.elasticsearch.cluster.action.shard;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.action.shard.ShardStateAction.FailedShardEntry;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
@ -30,6 +31,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardIterator;
@ -42,20 +44,26 @@ import org.elasticsearch.cluster.routing.allocation.StaleShard;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.junit.Before;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.Matchers.contains;
public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCase {
@ -87,8 +95,8 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa
}
public void testEmptyTaskListProducesSameClusterState() throws Exception {
List<ShardStateAction.ShardEntry> tasks = Collections.emptyList();
ClusterStateTaskExecutor.ClusterTasksResult<ShardStateAction.ShardEntry> result =
List<ShardStateAction.FailedShardEntry> tasks = Collections.emptyList();
ClusterStateTaskExecutor.ClusterTasksResult<ShardStateAction.FailedShardEntry> result =
executor.execute(clusterState, tasks);
assertTasksSuccessful(tasks, result, clusterState, false);
}
@ -96,35 +104,35 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa
public void testDuplicateFailuresAreOkay() throws Exception {
String reason = "test duplicate failures are okay";
ClusterState currentState = createClusterStateWithStartedShards(reason);
List<ShardStateAction.ShardEntry> tasks = createExistingShards(currentState, reason);
ClusterStateTaskExecutor.ClusterTasksResult<ShardStateAction.ShardEntry> result = executor.execute(currentState, tasks);
List<FailedShardEntry> tasks = createExistingShards(currentState, reason);
ClusterStateTaskExecutor.ClusterTasksResult<FailedShardEntry> result = executor.execute(currentState, tasks);
assertTasksSuccessful(tasks, result, clusterState, true);
}
public void testNonExistentShardsAreMarkedAsSuccessful() throws Exception {
String reason = "test non existent shards are marked as successful";
ClusterState currentState = createClusterStateWithStartedShards(reason);
List<ShardStateAction.ShardEntry> tasks = createNonExistentShards(currentState, reason);
ClusterStateTaskExecutor.ClusterTasksResult<ShardStateAction.ShardEntry> result = executor.execute(clusterState, tasks);
List<FailedShardEntry> tasks = createNonExistentShards(currentState, reason);
ClusterStateTaskExecutor.ClusterTasksResult<FailedShardEntry> result = executor.execute(clusterState, tasks);
assertTasksSuccessful(tasks, result, clusterState, false);
}
public void testTriviallySuccessfulTasksBatchedWithFailingTasks() throws Exception {
String reason = "test trivially successful tasks batched with failing tasks";
ClusterState currentState = createClusterStateWithStartedShards(reason);
List<ShardStateAction.ShardEntry> failingTasks = createExistingShards(currentState, reason);
List<ShardStateAction.ShardEntry> nonExistentTasks = createNonExistentShards(currentState, reason);
List<FailedShardEntry> failingTasks = createExistingShards(currentState, reason);
List<FailedShardEntry> nonExistentTasks = createNonExistentShards(currentState, reason);
ShardStateAction.ShardFailedClusterStateTaskExecutor failingExecutor = new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocationService, null, logger) {
@Override
ClusterState applyFailedShards(ClusterState currentState, List<FailedShard> failedShards, List<StaleShard> staleShards) {
throw new RuntimeException("simulated applyFailedShards failure");
}
};
List<ShardStateAction.ShardEntry> tasks = new ArrayList<>();
List<FailedShardEntry> tasks = new ArrayList<>();
tasks.addAll(failingTasks);
tasks.addAll(nonExistentTasks);
ClusterStateTaskExecutor.ClusterTasksResult<ShardStateAction.ShardEntry> result = failingExecutor.execute(currentState, tasks);
Map<ShardStateAction.ShardEntry, ClusterStateTaskExecutor.TaskResult> taskResultMap =
ClusterStateTaskExecutor.ClusterTasksResult<FailedShardEntry> result = failingExecutor.execute(currentState, tasks);
Map<FailedShardEntry, ClusterStateTaskExecutor.TaskResult> taskResultMap =
failingTasks.stream().collect(Collectors.toMap(Function.identity(), task -> ClusterStateTaskExecutor.TaskResult.failure(new RuntimeException("simulated applyFailedShards failure"))));
taskResultMap.putAll(nonExistentTasks.stream().collect(Collectors.toMap(Function.identity(), task -> ClusterStateTaskExecutor.TaskResult.success())));
assertTaskResults(taskResultMap, result, currentState, false);
@ -133,23 +141,47 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa
public void testIllegalShardFailureRequests() throws Exception {
String reason = "test illegal shard failure requests";
ClusterState currentState = createClusterStateWithStartedShards(reason);
List<ShardStateAction.ShardEntry> failingTasks = createExistingShards(currentState, reason);
List<ShardStateAction.ShardEntry> tasks = new ArrayList<>();
for (ShardStateAction.ShardEntry failingTask : failingTasks) {
List<ShardStateAction.FailedShardEntry> failingTasks = createExistingShards(currentState, reason);
List<ShardStateAction.FailedShardEntry> tasks = new ArrayList<>();
for (ShardStateAction.FailedShardEntry failingTask : failingTasks) {
long primaryTerm = currentState.metaData().index(failingTask.shardId.getIndex()).primaryTerm(failingTask.shardId.id());
tasks.add(new ShardStateAction.ShardEntry(failingTask.shardId, failingTask.allocationId,
randomIntBetween(1, (int) primaryTerm - 1), failingTask.message, failingTask.failure));
tasks.add(new FailedShardEntry(failingTask.shardId, failingTask.allocationId,
randomIntBetween(1, (int) primaryTerm - 1), failingTask.message, failingTask.failure, randomBoolean()));
}
Map<ShardStateAction.ShardEntry, ClusterStateTaskExecutor.TaskResult> taskResultMap =
Map<FailedShardEntry, ClusterStateTaskExecutor.TaskResult> taskResultMap =
tasks.stream().collect(Collectors.toMap(
Function.identity(),
task -> ClusterStateTaskExecutor.TaskResult.failure(new ShardStateAction.NoLongerPrimaryShardException(task.shardId,
"primary term [" + task.primaryTerm + "] did not match current primary term [" +
currentState.metaData().index(task.shardId.getIndex()).primaryTerm(task.shardId.id()) + "]"))));
ClusterStateTaskExecutor.ClusterTasksResult<ShardStateAction.ShardEntry> result = executor.execute(currentState, tasks);
ClusterStateTaskExecutor.ClusterTasksResult<FailedShardEntry> result = executor.execute(currentState, tasks);
assertTaskResults(taskResultMap, result, currentState, false);
}
public void testMarkAsStaleWhenFailingShard() throws Exception {
final MockAllocationService allocation = createAllocationService();
ClusterState clusterState = createClusterStateWithStartedShards("test markAsStale");
clusterState = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
IndexShardRoutingTable shardRoutingTable = clusterState.routingTable().index(INDEX).shard(0);
long primaryTerm = clusterState.metaData().index(INDEX).primaryTerm(0);
final Set<String> oldInSync = clusterState.metaData().index(INDEX).inSyncAllocationIds(0);
{
ShardStateAction.FailedShardEntry failShardOnly = new ShardStateAction.FailedShardEntry(shardRoutingTable.shardId(),
randomFrom(oldInSync), primaryTerm, "dummy", null, false);
ClusterState appliedState = executor.execute(clusterState, Arrays.asList(failShardOnly)).resultingState;
Set<String> newInSync = appliedState.metaData().index(INDEX).inSyncAllocationIds(0);
assertThat(newInSync, equalTo(oldInSync));
}
{
final String failedAllocationId = randomFrom(oldInSync);
ShardStateAction.FailedShardEntry failAndMarkAsStale = new ShardStateAction.FailedShardEntry(shardRoutingTable.shardId(),
failedAllocationId, primaryTerm, "dummy", null, true);
ClusterState appliedState = executor.execute(clusterState, Arrays.asList(failAndMarkAsStale)).resultingState;
Set<String> newInSync = appliedState.metaData().index(INDEX).inSyncAllocationIds(0);
assertThat(Sets.difference(oldInSync, newInSync), contains(failedAllocationId));
}
}
private ClusterState createClusterStateWithStartedShards(String reason) {
int numberOfNodes = 1 + numberOfReplicas;
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder();
@ -163,7 +195,7 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa
return allocationService.applyStartedShards(stateAfterReroute, routingNodes.shardsWithState(ShardRoutingState.INITIALIZING));
}
private List<ShardStateAction.ShardEntry> createExistingShards(ClusterState currentState, String reason) {
private List<ShardStateAction.FailedShardEntry> createExistingShards(ClusterState currentState, String reason) {
List<ShardRouting> shards = new ArrayList<>();
GroupShardsIterator<ShardIterator> shardGroups = currentState.routingTable().allAssignedShardsGrouped(new String[] { INDEX }, true);
for (ShardIterator shardIt : shardGroups) {
@ -181,7 +213,7 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa
return toTasks(currentState, shardsToFail, indexUUID, reason);
}
private List<ShardStateAction.ShardEntry> createNonExistentShards(ClusterState currentState, String reason) {
private List<ShardStateAction.FailedShardEntry> createNonExistentShards(ClusterState currentState, String reason) {
// add shards from a non-existent index
String nonExistentIndexUUID = "non-existent";
Index index = new Index("non-existent", nonExistentIndexUUID);
@ -195,14 +227,15 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa
nonExistentShards.add(nonExistentShardRouting(index, nodeIds, false));
}
List<ShardStateAction.ShardEntry> existingShards = createExistingShards(currentState, reason);
List<ShardStateAction.ShardEntry> shardsWithMismatchedAllocationIds = new ArrayList<>();
for (ShardStateAction.ShardEntry existingShard : existingShards) {
shardsWithMismatchedAllocationIds.add(new ShardStateAction.ShardEntry(existingShard.shardId, UUIDs.randomBase64UUID(), 0L, existingShard.message, existingShard.failure));
List<ShardStateAction.FailedShardEntry> existingShards = createExistingShards(currentState, reason);
List<ShardStateAction.FailedShardEntry> shardsWithMismatchedAllocationIds = new ArrayList<>();
for (ShardStateAction.FailedShardEntry existingShard : existingShards) {
shardsWithMismatchedAllocationIds.add(new ShardStateAction.FailedShardEntry(existingShard.shardId, UUIDs.randomBase64UUID(), 0L, existingShard.message, existingShard.failure, randomBoolean()));
}
List<ShardStateAction.ShardEntry> tasks = new ArrayList<>();
nonExistentShards.forEach(shard -> tasks.add(new ShardStateAction.ShardEntry(shard.shardId(), shard.allocationId().getId(), 0L, reason, new CorruptIndexException("simulated", nonExistentIndexUUID))));
List<ShardStateAction.FailedShardEntry> tasks = new ArrayList<>();
nonExistentShards.forEach(shard -> tasks.add(new ShardStateAction.FailedShardEntry(shard.shardId(), shard.allocationId().getId(), 0L,
reason, new CorruptIndexException("simulated", nonExistentIndexUUID), randomBoolean())));
tasks.addAll(shardsWithMismatchedAllocationIds);
return tasks;
}
@ -214,26 +247,26 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa
}
private static void assertTasksSuccessful(
List<ShardStateAction.ShardEntry> tasks,
ClusterStateTaskExecutor.ClusterTasksResult<ShardStateAction.ShardEntry> result,
List<ShardStateAction.FailedShardEntry> tasks,
ClusterStateTaskExecutor.ClusterTasksResult<ShardStateAction.FailedShardEntry> result,
ClusterState clusterState,
boolean clusterStateChanged
) {
Map<ShardStateAction.ShardEntry, ClusterStateTaskExecutor.TaskResult> taskResultMap =
Map<ShardStateAction.FailedShardEntry, ClusterStateTaskExecutor.TaskResult> taskResultMap =
tasks.stream().collect(Collectors.toMap(Function.identity(), task -> ClusterStateTaskExecutor.TaskResult.success()));
assertTaskResults(taskResultMap, result, clusterState, clusterStateChanged);
}
private static void assertTaskResults(
Map<ShardStateAction.ShardEntry, ClusterStateTaskExecutor.TaskResult> taskResultMap,
ClusterStateTaskExecutor.ClusterTasksResult<ShardStateAction.ShardEntry> result,
Map<ShardStateAction.FailedShardEntry, ClusterStateTaskExecutor.TaskResult> taskResultMap,
ClusterStateTaskExecutor.ClusterTasksResult<ShardStateAction.FailedShardEntry> result,
ClusterState clusterState,
boolean clusterStateChanged
) {
// there should be as many task results as tasks
assertEquals(taskResultMap.size(), result.executionResults.size());
for (Map.Entry<ShardStateAction.ShardEntry, ClusterStateTaskExecutor.TaskResult> entry : taskResultMap.entrySet()) {
for (Map.Entry<ShardStateAction.FailedShardEntry, ClusterStateTaskExecutor.TaskResult> entry : taskResultMap.entrySet()) {
// every task should have a corresponding task result
assertTrue(result.executionResults.containsKey(entry.getKey()));
@ -242,7 +275,7 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa
}
List<ShardRouting> shards = clusterState.getRoutingTable().allShards();
for (Map.Entry<ShardStateAction.ShardEntry, ClusterStateTaskExecutor.TaskResult> entry : taskResultMap.entrySet()) {
for (Map.Entry<ShardStateAction.FailedShardEntry, ClusterStateTaskExecutor.TaskResult> entry : taskResultMap.entrySet()) {
if (entry.getValue().isSuccess()) {
// the shard was successfully failed and so should not be in the routing table
for (ShardRouting shard : shards) {
@ -267,15 +300,15 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa
}
}
private static List<ShardStateAction.ShardEntry> toTasks(ClusterState currentState, List<ShardRouting> shards, String indexUUID, String message) {
private static List<ShardStateAction.FailedShardEntry> toTasks(ClusterState currentState, List<ShardRouting> shards, String indexUUID, String message) {
return shards
.stream()
.map(shard -> new ShardStateAction.ShardEntry(
.map(shard -> new ShardStateAction.FailedShardEntry(
shard.shardId(),
shard.allocationId().getId(),
randomBoolean() ? 0L : currentState.metaData().getIndexSafe(shard.index()).primaryTerm(shard.id()),
message,
new CorruptIndexException("simulated", indexUUID)))
new CorruptIndexException("simulated", indexUUID), randomBoolean()))
.collect(Collectors.toList());
}
}

View File

@ -20,10 +20,13 @@
package org.elasticsearch.cluster.action.shard;
import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.action.shard.ShardStateAction.FailedShardEntry;
import org.elasticsearch.cluster.action.shard.ShardStateAction.StartedShardEntry;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingService;
@ -32,15 +35,22 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.NodeDisconnectedException;
import org.elasticsearch.transport.NodeNotConnectedException;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import org.junit.After;
@ -48,6 +58,8 @@ import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import java.io.IOException;
import java.util.UUID;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -63,6 +75,7 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
public class ShardStateActionTests extends ESTestCase {
private static ThreadPool THREAD_POOL;
@ -90,9 +103,9 @@ public class ShardStateActionTests extends ESTestCase {
}
@Override
protected void waitForNewMasterAndRetry(String actionName, ClusterStateObserver observer, ShardEntry shardEntry, Listener listener, Predicate<ClusterState> changePredicate) {
protected void waitForNewMasterAndRetry(String actionName, ClusterStateObserver observer, TransportRequest request, Listener listener, Predicate<ClusterState> changePredicate) {
onBeforeWaitForNewMasterAndRetry.run();
super.waitForNewMasterAndRetry(actionName, observer, shardEntry, listener, changePredicate);
super.waitForNewMasterAndRetry(actionName, observer, request, listener, changePredicate);
onAfterWaitForNewMasterAndRetry.run();
}
}
@ -160,8 +173,8 @@ public class ShardStateActionTests extends ESTestCase {
CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear();
assertEquals(1, capturedRequests.length);
// the request is a shard failed request
assertThat(capturedRequests[0].request, is(instanceOf(ShardStateAction.ShardEntry.class)));
ShardStateAction.ShardEntry shardEntry = (ShardStateAction.ShardEntry) capturedRequests[0].request;
assertThat(capturedRequests[0].request, is(instanceOf(ShardStateAction.FailedShardEntry.class)));
ShardStateAction.FailedShardEntry shardEntry = (ShardStateAction.FailedShardEntry) capturedRequests[0].request;
// for the right shard
assertEquals(shardEntry.shardId, shardRouting.shardId());
assertEquals(shardEntry.allocationId, shardRouting.allocationId().getId());
@ -342,7 +355,7 @@ public class ShardStateActionTests extends ESTestCase {
long primaryTerm = clusterService.state().metaData().index(index).primaryTerm(failedShard.id());
assertThat(primaryTerm, greaterThanOrEqualTo(1L));
shardStateAction.remoteShardFailed(failedShard.shardId(), failedShard.allocationId().getId(), primaryTerm + 1, "test",
shardStateAction.remoteShardFailed(failedShard.shardId(), failedShard.allocationId().getId(), primaryTerm + 1, randomBoolean(), "test",
getSimulatedFailure(), new ShardStateAction.Listener() {
@Override
public void onSuccess() {
@ -407,4 +420,36 @@ public class ShardStateActionTests extends ESTestCase {
private Exception getSimulatedFailure() {
return new CorruptIndexException("simulated", (String) null);
}
public void testShardEntryBWCSerialize() throws Exception {
final Version bwcVersion = randomValueOtherThanMany(
version -> version.onOrAfter(Version.V_7_0_0_alpha1), () -> VersionUtils.randomVersion(random()));
final ShardId shardId = new ShardId(randomRealisticUnicodeOfLengthBetween(10, 100), UUID.randomUUID().toString(), between(0, 1000));
final String allocationId = randomRealisticUnicodeOfCodepointLengthBetween(10, 100);
final String reason = randomRealisticUnicodeOfCodepointLengthBetween(10, 100);
try (StreamInput in = serialize(new StartedShardEntry(shardId, allocationId, reason), bwcVersion).streamInput()) {
in.setVersion(bwcVersion);
final FailedShardEntry failedShardEntry = new FailedShardEntry(in);
assertThat(failedShardEntry.shardId, equalTo(shardId));
assertThat(failedShardEntry.allocationId, equalTo(allocationId));
assertThat(failedShardEntry.message, equalTo(reason));
assertThat(failedShardEntry.failure, nullValue());
assertThat(failedShardEntry.markAsStale, equalTo(true));
}
try (StreamInput in = serialize(new FailedShardEntry(shardId, allocationId, 0L, reason, null, false), bwcVersion).streamInput()) {
in.setVersion(bwcVersion);
final StartedShardEntry startedShardEntry = new StartedShardEntry(in);
assertThat(startedShardEntry.shardId, equalTo(shardId));
assertThat(startedShardEntry.allocationId, equalTo(allocationId));
assertThat(startedShardEntry.message, equalTo(reason));
}
}
BytesReference serialize(Writeable writeable, Version version) throws IOException {
try (BytesStreamOutput out = new BytesStreamOutput()) {
out.setVersion(version);
writeable.writeTo(out);
return out.bytes();
}
}
}

View File

@ -144,7 +144,7 @@ public class PrimaryTermsTests extends ESAllocationTestCase {
logger.info("failing primary shards {} for index [{}]", shardIdsToFail, index);
List<FailedShard> failedShards = new ArrayList<>();
for (int shard : shardIdsToFail) {
failedShards.add(new FailedShard(indexShardRoutingTable.shard(shard).primaryShard(), "test", null));
failedShards.add(new FailedShard(indexShardRoutingTable.shard(shard).primaryShard(), "test", null, randomBoolean()));
incrementPrimaryTerm(index, shard); // the primary failure should increment the primary term;
}
applyRerouteResult(allocationService.applyFailedShards(this.clusterState, failedShards,Collections.emptyList()));

View File

@ -254,7 +254,7 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
assertThat(clusterState.getRoutingNodes().unassigned().size() > 0, equalTo(false));
// fail shard
ShardRouting shardToFail = clusterState.getRoutingNodes().shardsWithState(STARTED).get(0);
clusterState = allocation.applyFailedShards(clusterState, Collections.singletonList(new FailedShard(shardToFail, "test fail", null)));
clusterState = allocation.applyFailedShards(clusterState, Collections.singletonList(new FailedShard(shardToFail, "test fail", null, randomBoolean())));
// verify the reason and details
assertThat(clusterState.getRoutingNodes().unassigned().size() > 0, equalTo(true));
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(1));

View File

@ -167,7 +167,7 @@ public class FailedNodeRoutingTests extends ESAllocationTestCase {
List<FailedShard> shardsToFail = new ArrayList<>();
List<ShardRouting> failedPrimaries = randomSubsetOf(primaries);
failedPrimaries.stream().forEach(sr -> {
shardsToFail.add(new FailedShard(randomFrom(sr), "failed primary", new Exception()));
shardsToFail.add(new FailedShard(randomFrom(sr), "failed primary", new Exception(), randomBoolean()));
});
logger.info("--> state before failing shards: {}", state);

View File

@ -117,7 +117,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
assertThat(clusterState.getRoutingNodes().node("node3").iterator().next().state(), equalTo(INITIALIZING));
logger.info("--> fail primary shard recovering instance on node3 being initialized");
clusterState = allocation.applyFailedShard(clusterState, clusterState.getRoutingNodes().node("node3").iterator().next());
clusterState = allocation.applyFailedShard(clusterState, clusterState.getRoutingNodes().node("node3").iterator().next(), randomBoolean());
assertThat(clusterState.getRoutingNodes().node(origPrimaryNodeId).iterator().next().state(), equalTo(STARTED));
assertThat(clusterState.getRoutingNodes().node("node3").size(), equalTo(0));
@ -132,7 +132,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
assertThat(clusterState.getRoutingNodes().node("node3").iterator().next().state(), equalTo(INITIALIZING));
logger.info("--> fail primary shard recovering instance on node1 being relocated");
clusterState = allocation.applyFailedShard(clusterState, clusterState.getRoutingNodes().node(origPrimaryNodeId).iterator().next());
clusterState = allocation.applyFailedShard(clusterState, clusterState.getRoutingNodes().node(origPrimaryNodeId).iterator().next(), randomBoolean());
// check promotion of replica to primary
assertThat(clusterState.getRoutingNodes().node(origReplicaNodeId).iterator().next().state(), equalTo(STARTED));
@ -200,7 +200,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
logger.info("fail the primary shard, will have no place to be rerouted to (single node), so stays unassigned");
ShardRouting shardToFail = clusterState.routingTable().index("test").shard(0).primaryShard();
newState = strategy.applyFailedShard(clusterState, shardToFail);
newState = strategy.applyFailedShard(clusterState, shardToFail, randomBoolean());
assertThat(newState, not(equalTo(clusterState)));
clusterState = newState;
@ -249,7 +249,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
logger.info("fail the first shard, will have no place to be rerouted to (single node), so stays unassigned");
ShardRouting firstShard = clusterState.getRoutingNodes().node("node1").iterator().next();
newState = strategy.applyFailedShard(clusterState, firstShard);
newState = strategy.applyFailedShard(clusterState, firstShard, randomBoolean());
assertThat(newState, not(equalTo(clusterState)));
clusterState = newState;
@ -305,7 +305,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
logger.info("failing shard on node [{}]", failedNode);
ShardRouting shardToFail = routingNodes.node(failedNode).iterator().next();
if (shardRoutingsToFail.contains(shardToFail) == false) {
failedShards.add(new FailedShard(shardToFail, null, null));
failedShards.add(new FailedShard(shardToFail, null, null, randomBoolean()));
failedNodes.add(failedNode);
shardRoutingsToFail.add(shardToFail);
}
@ -364,7 +364,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
logger.info("fail the first shard, will start INITIALIZING on the second node");
final ShardRouting firstShard = clusterState.getRoutingNodes().node(nodeHoldingPrimary).iterator().next();
newState = strategy.applyFailedShard(clusterState, firstShard);
newState = strategy.applyFailedShard(clusterState, firstShard, randomBoolean());
assertThat(newState, not(equalTo(clusterState)));
clusterState = newState;
@ -455,7 +455,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
logger.info("Fail the shards on node 3");
ShardRouting shardToFail = routingNodes.node("node3").iterator().next();
newState = strategy.applyFailedShard(clusterState, shardToFail);
newState = strategy.applyFailedShard(clusterState, shardToFail, randomBoolean());
assertThat(newState, not(equalTo(clusterState)));
clusterState = newState;
routingNodes = clusterState.getRoutingNodes();
@ -507,7 +507,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
// fail the primary shard, check replicas get removed as well...
ShardRouting primaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard();
ClusterState newState = allocation.applyFailedShard(clusterState, primaryShardToFail);
ClusterState newState = allocation.applyFailedShard(clusterState, primaryShardToFail, randomBoolean());
assertThat(newState, not(equalTo(clusterState)));
clusterState = newState;
// the primary gets allocated on another node, replicas are initializing
@ -550,7 +550,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
// fail the primary shard, check one replica gets elected to primary, others become INITIALIZING (from it)
ShardRouting primaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard();
ClusterState newState = allocation.applyFailedShard(clusterState, primaryShardToFail);
ClusterState newState = allocation.applyFailedShard(clusterState, primaryShardToFail, randomBoolean());
assertThat(newState, not(equalTo(clusterState)));
clusterState = newState;
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1));
@ -620,7 +620,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
// fail the primary shard again and make sure the correct replica is promoted
ShardRouting primaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard();
ClusterState newState = allocation.applyFailedShard(clusterState, primaryShardToFail);
ClusterState newState = allocation.applyFailedShard(clusterState, primaryShardToFail, randomBoolean());
assertThat(newState, not(equalTo(clusterState)));
clusterState = newState;
// the primary gets allocated on another node
@ -649,7 +649,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
// fail the primary shard again, and ensure the same thing happens
ShardRouting secondPrimaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard();
newState = allocation.applyFailedShard(clusterState, secondPrimaryShardToFail);
newState = allocation.applyFailedShard(clusterState, secondPrimaryShardToFail, randomBoolean());
assertThat(newState, not(equalTo(clusterState)));
clusterState = newState;
// the primary gets allocated on another node

View File

@ -23,7 +23,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction.ShardEntry;
import org.elasticsearch.cluster.action.shard.ShardStateAction.FailedShardEntry;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
@ -135,7 +135,7 @@ public class InSyncAllocationIdTests extends ESAllocationTestCase {
logger.info("fail primary shard");
ShardRouting startedPrimary = clusterState.getRoutingNodes().shardsWithState(STARTED).get(0);
clusterState = allocation.applyFailedShard(clusterState, startedPrimary);
clusterState = allocation.applyFailedShard(clusterState, startedPrimary, true);
assertThat(clusterState.getRoutingTable().shardsWithState(STARTED).size(), equalTo(0));
assertEquals(Collections.singleton(startedPrimary.allocationId().getId()),
@ -167,7 +167,7 @@ public class InSyncAllocationIdTests extends ESAllocationTestCase {
new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocation, null, logger);
long primaryTerm = clusterState.metaData().index("test").primaryTerm(0);
clusterState = failedClusterStateTaskExecutor.execute(clusterState, Arrays.asList(
new ShardEntry(shardRoutingTable.shardId(), replicaShard.allocationId().getId(), primaryTerm, "dummy", null))
new FailedShardEntry(shardRoutingTable.shardId(), replicaShard.allocationId().getId(), primaryTerm, "dummy", null, true))
).resultingState;
assertThat(clusterState.metaData().index("test").inSyncAllocationIds(0).size(), equalTo(1));
@ -189,11 +189,11 @@ public class InSyncAllocationIdTests extends ESAllocationTestCase {
long primaryTerm = clusterState.metaData().index("test").primaryTerm(0);
List<ShardEntry> failureEntries = new ArrayList<>();
failureEntries.add(new ShardEntry(
shardRoutingTable.shardId(), primaryShard.allocationId().getId(), 0L, "dummy", null));
failureEntries.add(new ShardEntry(
shardRoutingTable.shardId(), replicaShard.allocationId().getId(), primaryTerm, "dummy", null));
List<FailedShardEntry> failureEntries = new ArrayList<>();
failureEntries.add(new FailedShardEntry(
shardRoutingTable.shardId(), primaryShard.allocationId().getId(), 0L, "dummy", null, true));
failureEntries.add(new FailedShardEntry(
shardRoutingTable.shardId(), replicaShard.allocationId().getId(), primaryTerm, "dummy", null, true));
Collections.shuffle(failureEntries, random());
logger.info("Failing {}", failureEntries);
@ -333,8 +333,8 @@ public class InSyncAllocationIdTests extends ESAllocationTestCase {
assertEquals(inSyncSet, clusterState.metaData().index("test").inSyncAllocationIds(0));
logger.info("fail primary shard");
clusterState = failedClusterStateTaskExecutor.execute(clusterState, Collections.singletonList(new ShardEntry(
shardRoutingTable.shardId(), primaryShard.allocationId().getId(), 0L, "dummy", null))).resultingState;
clusterState = failedClusterStateTaskExecutor.execute(clusterState, Collections.singletonList(new FailedShardEntry(
shardRoutingTable.shardId(), primaryShard.allocationId().getId(), 0L, "dummy", null, true))).resultingState;
assertThat(clusterState.routingTable().index("test").shard(0).assignedShards().size(), equalTo(0));
// in-sync allocation ids should not be updated

View File

@ -91,7 +91,7 @@ public class MaxRetryAllocationDeciderTests extends ESAllocationTestCase {
for (int i = 0; i < retries-1; i++) {
List<FailedShard> failedShards = Collections.singletonList(
new FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom" + i,
new UnsupportedOperationException()));
new UnsupportedOperationException(), randomBoolean()));
ClusterState newState = strategy.applyFailedShards(clusterState, failedShards);
assertThat(newState, not(equalTo(clusterState)));
clusterState = newState;
@ -104,7 +104,7 @@ public class MaxRetryAllocationDeciderTests extends ESAllocationTestCase {
// now we go and check that we are actually stick to unassigned on the next failure
List<FailedShard> failedShards = Collections.singletonList(
new FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom",
new UnsupportedOperationException()));
new UnsupportedOperationException(), randomBoolean()));
ClusterState newState = strategy.applyFailedShards(clusterState, failedShards);
assertThat(newState, not(equalTo(clusterState)));
clusterState = newState;
@ -130,7 +130,7 @@ public class MaxRetryAllocationDeciderTests extends ESAllocationTestCase {
for (int i = 0; i < retries-1; i++) {
failedShards = Collections.singletonList(
new FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom",
new UnsupportedOperationException()));
new UnsupportedOperationException(), randomBoolean()));
newState = strategy.applyFailedShards(clusterState, failedShards);
assertThat(newState, not(equalTo(clusterState)));
@ -145,7 +145,7 @@ public class MaxRetryAllocationDeciderTests extends ESAllocationTestCase {
// now we go and check that we are actually stick to unassigned on the next failure
failedShards = Collections.singletonList(
new FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom",
new UnsupportedOperationException()));
new UnsupportedOperationException(), randomBoolean()));
newState = strategy.applyFailedShards(clusterState, failedShards);
assertThat(newState, not(equalTo(clusterState)));
clusterState = newState;
@ -164,7 +164,7 @@ public class MaxRetryAllocationDeciderTests extends ESAllocationTestCase {
for (int i = 0; i < retries-1; i++) {
List<FailedShard> failedShards = Collections.singletonList(
new FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom" + i,
new UnsupportedOperationException()));
new UnsupportedOperationException(), randomBoolean()));
ClusterState newState = strategy.applyFailedShards(clusterState, failedShards);
assertThat(newState, not(equalTo(clusterState)));
clusterState = newState;
@ -182,7 +182,7 @@ public class MaxRetryAllocationDeciderTests extends ESAllocationTestCase {
{
List<FailedShard> failedShards = Collections.singletonList(
new FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom",
new UnsupportedOperationException()));
new UnsupportedOperationException(), randomBoolean()));
ClusterState newState = strategy.applyFailedShards(clusterState, failedShards);
assertThat(newState, not(equalTo(clusterState)));
clusterState = newState;
@ -231,7 +231,7 @@ public class MaxRetryAllocationDeciderTests extends ESAllocationTestCase {
// now fail again and see if it has a new counter
List<FailedShard> failedShards = Collections.singletonList(
new FailedShard(routingTable.index("idx").shard(0).shards().get(0), "ZOOOMG",
new UnsupportedOperationException()));
new UnsupportedOperationException(), randomBoolean()));
newState = strategy.applyFailedShards(clusterState, failedShards);
assertThat(newState, not(equalTo(clusterState)));
clusterState = newState;

View File

@ -194,7 +194,7 @@ public class SingleShardNoReplicasRoutingTests extends ESAllocationTestCase {
logger.info("Marking the shard as failed");
RoutingNodes routingNodes = clusterState.getRoutingNodes();
newState = strategy.applyFailedShard(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING).get(0));
newState = strategy.applyFailedShard(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING).get(0), randomBoolean());
assertThat(newState, not(equalTo(clusterState)));
clusterState = newState;

View File

@ -66,7 +66,7 @@ public class FilterAllocationDeciderTests extends ESAllocationTestCase {
// we can initally only allocate on node2
assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), INITIALIZING);
assertEquals(routingTable.index("idx").shard(0).shards().get(0).currentNodeId(), "node2");
routingTable = service.applyFailedShard(state, routingTable.index("idx").shard(0).shards().get(0)).routingTable();
routingTable = service.applyFailedShard(state, routingTable.index("idx").shard(0).shards().get(0), randomBoolean()).routingTable();
state = ClusterState.builder(state).routingTable(routingTable).build();
assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), UNASSIGNED);
assertNull(routingTable.index("idx").shard(0).shards().get(0).currentNodeId());
@ -114,7 +114,7 @@ public class FilterAllocationDeciderTests extends ESAllocationTestCase {
state = service.deassociateDeadNodes(
ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).remove("node1")).build(),
true, "test");
state = service.applyFailedShard(state, routingTable.index("idx").shard(0).primaryShard());
state = service.applyFailedShard(state, routingTable.index("idx").shard(0).primaryShard(), randomBoolean());
// now bring back node1 and see it's assigned
state = service.reroute(

View File

@ -46,7 +46,8 @@ import org.elasticsearch.cluster.ClusterStateTaskExecutor.ClusterTasksResult;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.EmptyClusterInfoService;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction.ShardEntry;
import org.elasticsearch.cluster.action.shard.ShardStateAction.StartedShardEntry;
import org.elasticsearch.cluster.action.shard.ShardStateAction.FailedShardEntry;
import org.elasticsearch.cluster.metadata.AliasValidator;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@ -221,16 +222,16 @@ public class ClusterStateChanges extends AbstractComponent {
}
public ClusterState applyFailedShards(ClusterState clusterState, List<FailedShard> failedShards) {
List<ShardEntry> entries = failedShards.stream().map(failedShard ->
new ShardEntry(failedShard.getRoutingEntry().shardId(), failedShard.getRoutingEntry().allocationId().getId(),
0L, failedShard.getMessage(), failedShard.getFailure()))
List<FailedShardEntry> entries = failedShards.stream().map(failedShard ->
new FailedShardEntry(failedShard.getRoutingEntry().shardId(), failedShard.getRoutingEntry().allocationId().getId(),
0L, failedShard.getMessage(), failedShard.getFailure(), failedShard.markAsStale()))
.collect(Collectors.toList());
return runTasks(shardFailedClusterStateTaskExecutor, clusterState, entries);
}
public ClusterState applyStartedShards(ClusterState clusterState, List<ShardRouting> startedShards) {
List<ShardEntry> entries = startedShards.stream().map(startedShard ->
new ShardEntry(startedShard.shardId(), startedShard.allocationId().getId(), 0L, "shard started", null))
List<StartedShardEntry> entries = startedShards.stream().map(startedShard ->
new StartedShardEntry(startedShard.shardId(), startedShard.allocationId().getId(), "shard started"))
.collect(Collectors.toList());
return runTasks(shardStartedClusterStateTaskExecutor, clusterState, entries);
}

View File

@ -333,7 +333,7 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice
if (persistedShardRouting.initializing() && randomBoolean()) {
startedShards.add(persistedShardRouting);
} else if (rarely()) {
failedShards.add(new FailedShard(persistedShardRouting, "fake shard failure", new Exception()));
failedShards.add(new FailedShard(persistedShardRouting, "fake shard failure", new Exception(), randomBoolean()));
}
}
}