Illegal shard failure requests

Today, shard failure requests are blindly handled on the master without
any validation that the request is legal. A legal request is a shard
failure request in which the shard requesting the failure is either the
local allocation (the failed shard itself) or the primary allocation
for that shard. The lack of validation stems from shard failure
requests being classified into only two sets: requests that correspond
to shards that exist, and requests that correspond to shards that do
not exist. Requests for shards that do not exist are immediately marked
as successful (there is nothing to do), and requests for shards that do
exist are sent to the allocation service, which handles the failure.
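
In other words, legality reduces to a predicate over allocations. A
minimal sketch of that rule, assuming the ShardRouting#isSameAllocation
method that the validation in this change relies on (the helper name,
parameters, and null handling here are illustrative, not code from this
change):

private static boolean isLegalFailureRequest(ShardRouting shardToFail, ShardRouting sourceShard, @Nullable ShardRouting currentPrimary) {
    // legal if the source is the failed shard's own (local) allocation...
    if (shardToFail.isSameAllocation(sourceShard)) {
        return true;
    }
    // ...or the current primary allocation for that shard
    return currentPrimary != null && currentPrimary.isSameAllocation(sourceShard);
}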

This pull request adds a third classification that separates out
illegal shard failure requests and enables the master to validate
them. The master communicates the illegality of a shard failure
request via a new exception: NoLongerPrimaryShardException. Shard
failure listeners can use this exception to discover that they sent a
shard failure request they were not allowed to send (e.g., because
they are no longer the primary allocation for the shard).
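
For example, a listener might react along these lines (an illustrative
sketch, not code from this change; shardToFail, primaryRouting, message,
and failure are assumed to be in scope, and ExceptionsHelper.unwrapCause
is the existing helper in the codebase):

shardStateAction.shardFailed(shardToFail, primaryRouting, message, failure, new ShardStateAction.Listener() {
    @Override
    public void onFailure(Throwable t) {
        if (ExceptionsHelper.unwrapCause(t) instanceof ShardStateAction.NoLongerPrimaryShardException) {
            // the master rejected the request because this shard is no
            // longer the primary; step down instead of retrying
        }
    }
});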

Closes #16275
Jason Tedor 2016-01-27 08:51:18 -05:00
parent 3906edae80
commit a3a49a12ef
11 changed files with 302 additions and 107 deletions

ElasticsearchException.java

@@ -19,6 +19,7 @@
package org.elasticsearch;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -613,7 +614,8 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
RETRY_ON_REPLICA_EXCEPTION(org.elasticsearch.action.support.replication.TransportReplicationAction.RetryOnReplicaException.class, org.elasticsearch.action.support.replication.TransportReplicationAction.RetryOnReplicaException::new, 136),
TYPE_MISSING_EXCEPTION(org.elasticsearch.indices.TypeMissingException.class, org.elasticsearch.indices.TypeMissingException::new, 137),
FAILED_TO_COMMIT_CLUSTER_STATE_EXCEPTION(org.elasticsearch.discovery.Discovery.FailedToCommitClusterStateException.class, org.elasticsearch.discovery.Discovery.FailedToCommitClusterStateException::new, 140),
QUERY_SHARD_EXCEPTION(org.elasticsearch.index.query.QueryShardException.class, org.elasticsearch.index.query.QueryShardException::new, 141);
QUERY_SHARD_EXCEPTION(org.elasticsearch.index.query.QueryShardException.class, org.elasticsearch.index.query.QueryShardException::new, 141),
NO_LONGER_PRIMARY_SHARD_EXCEPTION(ShardStateAction.NoLongerPrimaryShardException.class, ShardStateAction.NoLongerPrimaryShardException::new, 142);
final Class<? extends ElasticsearchException> exceptionClass;
final FunctionThatThrowsIOException<StreamInput, ? extends ElasticsearchException> constructor;

TransportReplicationAction.java

@@ -778,16 +778,15 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
private final List<ShardRouting> shards;
private final DiscoveryNodes nodes;
private final boolean executeOnReplica;
private final String indexUUID;
private final AtomicBoolean finished = new AtomicBoolean();
private final AtomicInteger success = new AtomicInteger(1); // We already wrote into the primary shard
private final ConcurrentMap<String, Throwable> shardReplicaFailures = ConcurrentCollections.newConcurrentMap();
private final AtomicInteger pending;
private final int totalShards;
private final Releasable indexShardReference;
private final IndexShardReference indexShardReference;
public ReplicationPhase(ReplicaRequest replicaRequest, Response finalResponse, ShardId shardId,
TransportChannel channel, Releasable indexShardReference) {
TransportChannel channel, IndexShardReference indexShardReference) {
this.replicaRequest = replicaRequest;
this.channel = channel;
this.finalResponse = finalResponse;
@@ -804,7 +803,6 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
final IndexMetaData indexMetaData = state.getMetaData().index(shardId.getIndex());
this.shards = (shardRoutingTable != null) ? shardRoutingTable.shards() : Collections.emptyList();
this.executeOnReplica = (indexMetaData == null) || shouldExecuteReplication(indexMetaData.getSettings());
this.indexUUID = (indexMetaData != null) ? indexMetaData.getIndexUUID() : null;
this.nodes = state.getNodes();
if (shards.isEmpty()) {
@@ -940,22 +938,22 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
String message = String.format(Locale.ROOT, "failed to perform %s on replica on node %s", transportReplicaAction, node);
logger.warn("[{}] {}", exp, shardId, message);
shardStateAction.shardFailed(
shard,
indexUUID,
message,
exp,
new ShardStateAction.Listener() {
@Override
public void onSuccess() {
onReplicaFailure(nodeId, exp);
}
@Override
public void onFailure(Throwable t) {
// TODO: handle catastrophic non-channel failures
onReplicaFailure(nodeId, exp);
}
shard,
indexShardReference.routingEntry(),
message,
exp,
new ShardStateAction.Listener() {
@Override
public void onSuccess() {
onReplicaFailure(nodeId, exp);
}
@Override
public void onFailure(Throwable t) {
// TODO: handle catastrophic non-channel failures
onReplicaFailure(nodeId, exp);
}
}
);
}
}

ClusterStateTaskExecutor.java

@@ -123,6 +123,11 @@ public interface ClusterStateTaskExecutor<T> {
return this == SUCCESS;
}
public Throwable getFailure() {
assert !isSuccess();
return failure;
}
/**
* Handle the execution result with the provided consumers
* @param onSuccess handler to invoke on success

ShardStateAction.java

@@ -19,6 +19,7 @@
package org.elasticsearch.cluster.action.shard;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
@@ -28,8 +29,9 @@ import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.MasterNodeChangePredicate;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.ShardRouting;
@@ -46,6 +48,7 @@ import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
@@ -60,6 +63,7 @@ import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -125,17 +129,22 @@ public class ShardStateAction extends AbstractComponent {
return ExceptionsHelper.unwrap(exp, MASTER_CHANNEL_EXCEPTIONS) != null;
}
public void shardFailed(final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) {
/**
* Send a shard failed request to the master node to update the
* cluster state.
*
* @param shardRouting the shard to fail
* @param sourceShardRouting the source shard requesting the failure (must be the shard itself, or the primary shard)
* @param message the reason for the failure
* @param failure the underlying cause of the failure
* @param listener callback upon completion of the request
*/
public void shardFailed(final ShardRouting shardRouting, ShardRouting sourceShardRouting, final String message, @Nullable final Throwable failure, Listener listener) {
ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext());
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, message, failure);
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, sourceShardRouting, message, failure);
sendShardAction(SHARD_FAILED_ACTION_NAME, observer, shardRoutingEntry, listener);
}
public void resendShardFailed(final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) {
logger.trace("{} re-sending failed shard [{}], index UUID [{}], reason [{}]", shardRouting.shardId(), failure, shardRouting, indexUUID, message);
shardFailed(shardRouting, indexUUID, message, failure, listener);
}
// visible for testing
protected void waitForNewMasterAndRetry(String actionName, ClusterStateObserver observer, ShardRoutingEntry shardRoutingEntry, Listener listener) {
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@@ -231,15 +240,15 @@
// partition tasks into those that correspond to shards
// that exist versus do not exist
Map<Boolean, List<ShardRoutingEntry>> partition =
tasks.stream().collect(Collectors.partitioningBy(task -> shardExists(currentState, task)));
Map<ValidationResult, List<ShardRoutingEntry>> partition =
tasks.stream().collect(Collectors.groupingBy(task -> validateTask(currentState, task)));
// tasks that correspond to non-existent shards are marked
// as successful
batchResultBuilder.successes(partition.get(false));
batchResultBuilder.successes(partition.getOrDefault(ValidationResult.SHARD_MISSING, Collections.emptyList()));
ClusterState maybeUpdatedState = currentState;
List<ShardRoutingEntry> tasksToFail = partition.get(true);
List<ShardRoutingEntry> tasksToFail = partition.getOrDefault(ValidationResult.VALID, Collections.emptyList());
try {
List<FailedRerouteAllocation.FailedShard> failedShards =
tasksToFail
@@ -257,6 +266,15 @@
batchResultBuilder.failures(tasksToFail, t);
}
partition
.getOrDefault(ValidationResult.SOURCE_INVALID, Collections.emptyList())
.forEach(task -> batchResultBuilder.failure(
task,
new NoLongerPrimaryShardException(
task.getShardRouting().shardId(),
"source shard [" + task.sourceShardRouting + "] is neither the local allocation nor the primary allocation")
));
return batchResultBuilder.build(maybeUpdatedState);
}
@@ -265,17 +283,36 @@
return allocationService.applyFailedShards(currentState, failedShards);
}
private boolean shardExists(ClusterState currentState, ShardRoutingEntry task) {
private enum ValidationResult {
VALID,
SOURCE_INVALID,
SHARD_MISSING
}
private ValidationResult validateTask(ClusterState currentState, ShardRoutingEntry task) {
// non-local requests
if (!task.shardRouting.isSameAllocation(task.sourceShardRouting)) {
IndexShardRoutingTable indexShard = currentState.getRoutingTable().shardRoutingTableOrNull(task.shardRouting.shardId());
if (indexShard == null) {
return ValidationResult.SOURCE_INVALID;
}
ShardRouting primaryShard = indexShard.primaryShard();
if (primaryShard == null || !primaryShard.isSameAllocation(task.sourceShardRouting)) {
return ValidationResult.SOURCE_INVALID;
}
}
RoutingNodes.RoutingNodeIterator routingNodeIterator =
currentState.getRoutingNodes().routingNodeIter(task.getShardRouting().currentNodeId());
if (routingNodeIterator != null) {
for (ShardRouting maybe : routingNodeIterator) {
if (task.getShardRouting().isSameAllocation(maybe)) {
return true;
return ValidationResult.VALID;
}
}
}
return false;
return ValidationResult.SHARD_MISSING;
}
@Override
@@ -291,9 +328,9 @@
}
}
public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String message, Listener listener) {
public void shardStarted(final ShardRouting shardRouting, final String message, Listener listener) {
ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext());
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, message, null);
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, shardRouting, message, null);
sendShardAction(SHARD_STARTED_ACTION_NAME, observer, shardRoutingEntry, listener);
}
@@ -360,16 +397,16 @@
public static class ShardRoutingEntry extends TransportRequest {
ShardRouting shardRouting;
String indexUUID = IndexMetaData.INDEX_UUID_NA_VALUE;
ShardRouting sourceShardRouting;
String message;
Throwable failure;
public ShardRoutingEntry() {
}
ShardRoutingEntry(ShardRouting shardRouting, String indexUUID, String message, @Nullable Throwable failure) {
ShardRoutingEntry(ShardRouting shardRouting, ShardRouting sourceShardRouting, String message, @Nullable Throwable failure) {
this.shardRouting = shardRouting;
this.indexUUID = indexUUID;
this.sourceShardRouting = sourceShardRouting;
this.message = message;
this.failure = failure;
}
@@ -382,7 +419,7 @@
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardRouting = readShardRoutingEntry(in);
indexUUID = in.readString();
sourceShardRouting = readShardRoutingEntry(in);
message = in.readString();
failure = in.readThrowable();
}
@@ -391,18 +428,25 @@
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
shardRouting.writeTo(out);
out.writeString(indexUUID);
sourceShardRouting.writeTo(out);
out.writeString(message);
out.writeThrowable(failure);
}
@Override
public String toString() {
return "" + shardRouting + ", indexUUID [" + indexUUID + "], message [" + message + "], failure [" + ExceptionsHelper.detailedMessage(failure) + "]";
return String.format(
Locale.ROOT,
"failed shard [%s], source shard [%s], message [%s], failure [%s]",
shardRouting,
sourceShardRouting,
message,
ExceptionsHelper.detailedMessage(failure));
}
}
public interface Listener {
default void onSuccess() {
}
@@ -423,6 +467,20 @@
*/
default void onFailure(final Throwable t) {
}
}
public static class NoLongerPrimaryShardException extends ElasticsearchException {
public NoLongerPrimaryShardException(ShardId shardId, String msg) {
super(msg);
setShard(shardId);
}
public NoLongerPrimaryShardException(StreamInput in) throws IOException {
super(in);
}
}
}

RoutingTable.java

@@ -43,6 +43,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;
/**
@@ -137,6 +138,13 @@ public class RoutingTable implements Iterable<IndexRoutingTable>, Diffable<Routi
return shard;
}
public IndexShardRoutingTable shardRoutingTableOrNull(ShardId shardId) {
return Optional
.ofNullable(index(shardId.getIndexName()))
.flatMap(irt -> Optional.ofNullable(irt.shard(shardId.getId())))
.orElse(null);
}
public RoutingTable validateRaiseException(MetaData metaData) throws RoutingValidationException {
RoutingTableValidation validation = validate(metaData);
if (!validation.valid()) {

IndicesClusterStateService.java

@@ -306,7 +306,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
try {
indicesService.createIndex(nodeServicesProvider, indexMetaData, buildInIndexListener);
} catch (Throwable e) {
sendFailShard(shard, indexMetaData.getIndexUUID(), "failed to create index", e);
sendFailShard(shard, "failed to create index", e);
}
}
}
@@ -371,7 +371,7 @@
// so this failure typically means wrong node level configuration or something similar
for (IndexShard indexShard : indexService) {
ShardRouting shardRouting = indexShard.routingEntry();
failAndRemoveShard(shardRouting, indexService.indexUUID(), indexService, true, "failed to update mappings", t);
failAndRemoveShard(shardRouting, indexService, true, "failed to update mappings", t);
}
}
}
@@ -434,12 +434,13 @@
if (!indexService.hasShard(shardId) && shardRouting.started()) {
if (failedShards.containsKey(shardRouting.shardId())) {
if (nodes.masterNode() != null) {
shardStateAction.resendShardFailed(shardRouting, indexMetaData.getIndexUUID(),
"master " + nodes.masterNode() + " marked shard as started, but shard has previous failed. resending shard failure.", null, SHARD_STATE_ACTION_LISTENER);
String message = "master " + nodes.masterNode() + " marked shard as started, but shard has previous failed. resending shard failure";
logger.trace("[{}] re-sending failed shard [{}], reason [{}]", shardRouting.shardId(), shardRouting, message);
shardStateAction.shardFailed(shardRouting, shardRouting, message, null, SHARD_STATE_ACTION_LISTENER);
}
} else {
// the master thinks we are started, but we don't have this shard at all, mark it as failed
sendFailShard(shardRouting, indexMetaData.getIndexUUID(), "master [" + nodes.masterNode() + "] marked shard as started, but shard has not been created, mark shard as failed", null);
sendFailShard(shardRouting, "master [" + nodes.masterNode() + "] marked shard as started, but shard has not been created, mark shard as failed", null);
}
continue;
}
@@ -474,7 +475,7 @@
try {
indexShard.updateRoutingEntry(shardRouting, event.state().blocks().disableStatePersistence() == false);
} catch (Throwable e) {
failAndRemoveShard(shardRouting, indexService.indexUUID(), indexService, true, "failed updating shard routing entry", e);
failAndRemoveShard(shardRouting, indexService, true, "failed updating shard routing entry", e);
}
}
}
@@ -533,7 +534,7 @@
indexShard.shardId(), indexShard.state(), nodes.masterNode());
}
if (nodes.masterNode() != null) {
shardStateAction.shardStarted(shardRouting, indexMetaData.getIndexUUID(),
shardStateAction.shardStarted(shardRouting,
"master " + nodes.masterNode() + " marked shard as initializing, but shard state is [" + indexShard.state() + "], mark shard as started",
SHARD_STATE_ACTION_LISTENER);
}
@@ -560,8 +561,9 @@
if (!indexService.hasShard(shardId)) {
if (failedShards.containsKey(shardRouting.shardId())) {
if (nodes.masterNode() != null) {
shardStateAction.resendShardFailed(shardRouting, indexMetaData.getIndexUUID(),
"master " + nodes.masterNode() + " marked shard as initializing, but shard is marked as failed, resend shard failure", null, SHARD_STATE_ACTION_LISTENER);
String message = "master " + nodes.masterNode() + " marked shard as initializing, but shard is marked as failed, resend shard failure";
logger.trace("[{}] re-sending failed shard [{}], reason [{}]", shardRouting.shardId(), shardRouting, message);
shardStateAction.shardFailed(shardRouting, shardRouting, message, null, SHARD_STATE_ACTION_LISTENER);
}
return;
}
@@ -574,7 +576,7 @@
} catch (IndexShardAlreadyExistsException e) {
// ignore this, the method call can happen several times
} catch (Throwable e) {
failAndRemoveShard(shardRouting, indexService.indexUUID(), indexService, true, "failed to create shard", e);
failAndRemoveShard(shardRouting, indexService, true, "failed to create shard", e);
return;
}
}
@@ -616,7 +618,7 @@
threadPool.generic().execute(() -> {
try {
if (indexShard.recoverFromStore(nodes.localNode())) {
shardStateAction.shardStarted(shardRouting, indexMetaData.getIndexUUID(), "after recovery from store", SHARD_STATE_ACTION_LISTENER);
shardStateAction.shardStarted(shardRouting, "after recovery from store", SHARD_STATE_ACTION_LISTENER);
}
} catch (Throwable t) {
handleRecoveryFailure(indexService, shardRouting, true, t);
@@ -634,7 +636,7 @@
final IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository(restoreSource.snapshotId().getRepository());
if (indexShard.restoreFromRepository(indexShardRepository, nodes.localNode())) {
restoreService.indexShardRestoreCompleted(restoreSource.snapshotId(), sId);
shardStateAction.shardStarted(shardRouting, indexMetaData.getIndexUUID(), "after recovery from repository", SHARD_STATE_ACTION_LISTENER);
shardStateAction.shardStarted(shardRouting, "after recovery from repository", SHARD_STATE_ACTION_LISTENER);
}
} catch (Throwable first) {
try {
@@ -704,7 +706,7 @@
@Override
public void onRecoveryDone(RecoveryState state) {
shardStateAction.shardStarted(shardRouting, indexMetaData.getIndexUUID(), "after recovery (replica) from node [" + state.getSourceNode() + "]", SHARD_STATE_ACTION_LISTENER);
shardStateAction.shardStarted(shardRouting, "after recovery (replica) from node [" + state.getSourceNode() + "]", SHARD_STATE_ACTION_LISTENER);
}
@Override
@@ -715,7 +717,7 @@
private void handleRecoveryFailure(IndexService indexService, ShardRouting shardRouting, boolean sendShardFailure, Throwable failure) {
synchronized (mutex) {
failAndRemoveShard(shardRouting, indexService.indexUUID(), indexService, sendShardFailure, "failed recovery", failure);
failAndRemoveShard(shardRouting, indexService, sendShardFailure, "failed recovery", failure);
}
}
@@ -736,7 +738,7 @@
}
private void failAndRemoveShard(ShardRouting shardRouting, String indexUUID, @Nullable IndexService indexService, boolean sendShardFailure, String message, @Nullable Throwable failure) {
private void failAndRemoveShard(ShardRouting shardRouting, @Nullable IndexService indexService, boolean sendShardFailure, String message, @Nullable Throwable failure) {
if (indexService != null && indexService.hasShard(shardRouting.getId())) {
// if the indexService is null we can't remove the shard; that's fine since we might have a failure
// when the index is removed and we have already removed the index service for that shard...
@@ -749,15 +751,15 @@
}
}
if (sendShardFailure) {
sendFailShard(shardRouting, indexUUID, message, failure);
sendFailShard(shardRouting, message, failure);
}
}
private void sendFailShard(ShardRouting shardRouting, String indexUUID, String message, @Nullable Throwable failure) {
private void sendFailShard(ShardRouting shardRouting, String message, @Nullable Throwable failure) {
try {
logger.warn("[{}] marking and sending shard failed due to [{}]", failure, shardRouting.shardId(), message);
failedShards.put(shardRouting.shardId(), shardRouting);
shardStateAction.shardFailed(shardRouting, indexUUID, message, failure, SHARD_STATE_ACTION_LISTENER);
shardStateAction.shardFailed(shardRouting, shardRouting, message, failure, SHARD_STATE_ACTION_LISTENER);
} catch (Throwable e1) {
logger.warn("[{}][{}] failed to mark shard as failed (because of [{}])", e1, shardRouting.getIndexName(), shardRouting.getId(), message);
}
@@ -770,7 +772,7 @@
final ShardRouting shardRouting = shardFailure.routing;
threadPool.generic().execute(() -> {
synchronized (mutex) {
failAndRemoveShard(shardRouting, shardFailure.indexUUID, indexService, true, "shard failure, reason [" + shardFailure.reason + "]", shardFailure.cause);
failAndRemoveShard(shardRouting, indexService, true, "shard failure, reason [" + shardFailure.reason + "]", shardFailure.cause);
}
});
}

ExceptionSerializationTests.java

@@ -24,6 +24,7 @@ import org.elasticsearch.action.TimestampParsingException;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.AbstractClientHeadersTestCase;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -591,7 +592,14 @@ public class ExceptionSerializationTests extends ESTestCase {
assertEquals("foo", e.getHeader("foo").get(0));
assertEquals("bar", e.getHeader("foo").get(1));
assertSame(status, e.status());
}
public void testNoLongerPrimaryShardException() throws IOException {
ShardId shardId = new ShardId(new Index(randomAsciiOfLength(4), randomAsciiOfLength(4)), randomIntBetween(0, Integer.MAX_VALUE));
String msg = randomAsciiOfLength(4);
ShardStateAction.NoLongerPrimaryShardException ex = serialize(new ShardStateAction.NoLongerPrimaryShardException(shardId, msg));
assertEquals(shardId, ex.getShardId());
assertEquals(msg, ex.getMessage());
}
public static class UnknownHeaderException extends ElasticsearchException {
@@ -776,6 +784,7 @@
ids.put(139, null);
ids.put(140, org.elasticsearch.discovery.Discovery.FailedToCommitClusterStateException.class);
ids.put(141, org.elasticsearch.index.query.QueryShardException.class);
ids.put(142, ShardStateAction.NoLongerPrimaryShardException.class);
Map<Class<? extends ElasticsearchException>, Integer> reverse = new HashMap<>();
for (Map.Entry<Integer, Class<? extends ElasticsearchException>> entry : ids.entrySet()) {

TransportReplicationActionTests.java

@@ -550,7 +550,7 @@
}
}
runReplicateTest(shardRoutingTable, assignedReplicas, totalShards);
runReplicateTest(state, shardRoutingTable, assignedReplicas, totalShards);
}
public void testReplicationWithShadowIndex() throws ExecutionException, InterruptedException {
@@ -581,18 +581,22 @@
totalShards++;
}
}
runReplicateTest(shardRoutingTable, assignedReplicas, totalShards);
runReplicateTest(state, shardRoutingTable, assignedReplicas, totalShards);
}
protected void runReplicateTest(IndexShardRoutingTable shardRoutingTable, int assignedReplicas, int totalShards) throws InterruptedException, ExecutionException {
protected void runReplicateTest(ClusterState state, IndexShardRoutingTable shardRoutingTable, int assignedReplicas, int totalShards) throws InterruptedException, ExecutionException {
final ShardIterator shardIt = shardRoutingTable.shardsIt();
final ShardId shardId = shardIt.shardId();
final Request request = new Request(shardId);
final PlainActionFuture<Response> listener = new PlainActionFuture<>();
logger.debug("expecting [{}] assigned replicas, [{}] total shards. using state: \n{}", assignedReplicas, totalShards, clusterService.state().prettyPrint());
Releasable reference = getOrCreateIndexShardOperationsCounter();
TransportReplicationAction.IndexShardReference reference = getOrCreateIndexShardOperationsCounter();
ShardRouting primaryShard = state.getRoutingTable().shardRoutingTable(shardId).primaryShard();
indexShardRouting.set(primaryShard);
assertIndexShardCounter(2);
// TODO: set a default timeout
TransportReplicationAction<Request, Request, Response>.ReplicationPhase replicationPhase =
@@ -755,6 +759,8 @@
// one replica to make sure replication is attempted
clusterService.setState(state(index, true,
ShardRoutingState.STARTED, ShardRoutingState.STARTED));
ShardRouting primaryShard = clusterService.state().routingTable().shardRoutingTable(shardId).primaryShard();
indexShardRouting.set(primaryShard);
logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
Request request = new Request(shardId).timeout("100ms");
PlainActionFuture<Response> listener = new PlainActionFuture<>();

ShardFailedClusterStateTaskExecutorTests.java

@@ -19,6 +19,7 @@
package org.elasticsearch.cluster.action.shard;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
@@ -28,6 +29,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardIterator;
@@ -38,6 +40,9 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.index.Index;
import org.elasticsearch.test.ESAllocationTestCase;
import org.junit.Before;
@@ -45,12 +50,15 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.not;
public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCase {
@@ -119,9 +127,25 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa
tasks.addAll(failingTasks);
tasks.addAll(nonExistentTasks);
ClusterStateTaskExecutor.BatchResult<ShardStateAction.ShardRoutingEntry> result = failingExecutor.execute(currentState, tasks);
Map<ShardStateAction.ShardRoutingEntry, Boolean> taskResultMap =
failingTasks.stream().collect(Collectors.toMap(Function.identity(), task -> false));
taskResultMap.putAll(nonExistentTasks.stream().collect(Collectors.toMap(Function.identity(), task -> true)));
Map<ShardStateAction.ShardRoutingEntry, ClusterStateTaskExecutor.TaskResult> taskResultMap =
failingTasks.stream().collect(Collectors.toMap(Function.identity(), task -> ClusterStateTaskExecutor.TaskResult.failure(new RuntimeException("simulated applyFailedShards failure"))));
taskResultMap.putAll(nonExistentTasks.stream().collect(Collectors.toMap(Function.identity(), task -> ClusterStateTaskExecutor.TaskResult.success())));
assertTaskResults(taskResultMap, result, currentState, false);
}
public void testIllegalShardFailureRequests() throws Exception {
String reason = "test illegal shard failure requests";
ClusterState currentState = createClusterStateWithStartedShards(reason);
List<ShardStateAction.ShardRoutingEntry> failingTasks = createExistingShards(currentState, reason);
List<ShardStateAction.ShardRoutingEntry> tasks = new ArrayList<>();
for (ShardStateAction.ShardRoutingEntry failingTask : failingTasks) {
tasks.add(new ShardStateAction.ShardRoutingEntry(failingTask.getShardRouting(), randomInvalidSourceShard(currentState, failingTask.getShardRouting()), failingTask.message, failingTask.failure));
}
Map<ShardStateAction.ShardRoutingEntry, ClusterStateTaskExecutor.TaskResult> taskResultMap =
tasks.stream().collect(Collectors.toMap(
Function.identity(),
task -> ClusterStateTaskExecutor.TaskResult.failure(new ShardStateAction.NoLongerPrimaryShardException(task.getShardRouting().shardId(), "source shard [" + task.sourceShardRouting + "] is neither the local allocation nor the primary allocation"))));
ClusterStateTaskExecutor.BatchResult<ShardStateAction.ShardRoutingEntry> result = executor.execute(currentState, tasks);
assertTaskResults(taskResultMap, result, currentState, false);
}
@@ -156,17 +180,22 @@
for (int i = 0; i < numberOfTasks; i++) {
shardsToFail.add(randomFrom(failures));
}
return toTasks(shardsToFail, indexUUID, reason);
return toTasks(currentState, shardsToFail, indexUUID, reason);
}
private List<ShardStateAction.ShardRoutingEntry> createNonExistentShards(ClusterState currentState, String reason) {
// add shards from a non-existent index
MetaData nonExistentMetaData =
MetaData.builder()
.put(IndexMetaData.builder("non-existent").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(numberOfReplicas))
.build();
RoutingTable routingTable = RoutingTable.builder().addAsNew(nonExistentMetaData.index("non-existent")).build();
String nonExistentIndexUUID = nonExistentMetaData.index("non-existent").getIndexUUID();
String nonExistentIndexUUID = "non-existent";
Index index = new Index("non-existent", nonExistentIndexUUID);
List<String> nodeIds = new ArrayList<>();
for (ObjectCursor<String> nodeId : currentState.nodes().getNodes().keys()) {
nodeIds.add(nodeId.value);
}
List<ShardRouting> nonExistentShards = new ArrayList<>();
nonExistentShards.add(nonExistentShardRouting(index, nodeIds, true));
for (int i = 0; i < numberOfReplicas; i++) {
nonExistentShards.add(nonExistentShardRouting(index, nodeIds, false));
}
List<ShardStateAction.ShardRoutingEntry> existingShards = createExistingShards(currentState, reason);
List<ShardStateAction.ShardRoutingEntry> shardsWithMismatchedAllocationIds = new ArrayList<>();
@@ -174,28 +203,32 @@
ShardRouting sr = existingShard.getShardRouting();
ShardRouting nonExistentShardRouting =
TestShardRouting.newShardRouting(sr.index(), sr.id(), sr.currentNodeId(), sr.relocatingNodeId(), sr.restoreSource(), sr.primary(), sr.state(), sr.version());
shardsWithMismatchedAllocationIds.add(new ShardStateAction.ShardRoutingEntry(nonExistentShardRouting, existingShard.indexUUID, existingShard.message, existingShard.failure));
shardsWithMismatchedAllocationIds.add(new ShardStateAction.ShardRoutingEntry(nonExistentShardRouting, nonExistentShardRouting, existingShard.message, existingShard.failure));
}
List<ShardStateAction.ShardRoutingEntry> tasks = new ArrayList<>();
tasks.addAll(toTasks(routingTable.allShards(), nonExistentIndexUUID, reason));
nonExistentShards.forEach(shard -> tasks.add(new ShardStateAction.ShardRoutingEntry(shard, shard, reason, new CorruptIndexException("simulated", nonExistentIndexUUID))));
tasks.addAll(shardsWithMismatchedAllocationIds);
return tasks;
}
private ShardRouting nonExistentShardRouting(Index index, List<String> nodeIds, boolean primary) {
return TestShardRouting.newShardRouting(index, 0, randomFrom(nodeIds), primary, randomFrom(ShardRoutingState.INITIALIZING, ShardRoutingState.RELOCATING, ShardRoutingState.STARTED), randomIntBetween(1, 8));
}
private static void assertTasksSuccessful(
List<ShardStateAction.ShardRoutingEntry> tasks,
ClusterStateTaskExecutor.BatchResult<ShardStateAction.ShardRoutingEntry> result,
ClusterState clusterState,
boolean clusterStateChanged
) {
Map<ShardStateAction.ShardRoutingEntry, Boolean> taskResultMap =
tasks.stream().collect(Collectors.toMap(Function.identity(), task -> true));
Map<ShardStateAction.ShardRoutingEntry, ClusterStateTaskExecutor.TaskResult> taskResultMap =
tasks.stream().collect(Collectors.toMap(Function.identity(), task -> ClusterStateTaskExecutor.TaskResult.success()));
assertTaskResults(taskResultMap, result, clusterState, clusterStateChanged);
}
private static void assertTaskResults(
Map<ShardStateAction.ShardRoutingEntry, Boolean> taskResultMap,
Map<ShardStateAction.ShardRoutingEntry, ClusterStateTaskExecutor.TaskResult> taskResultMap,
ClusterStateTaskExecutor.BatchResult<ShardStateAction.ShardRoutingEntry> result,
ClusterState clusterState,
boolean clusterStateChanged
@@ -203,24 +236,29 @@
// there should be as many task results as tasks
assertEquals(taskResultMap.size(), result.executionResults.size());
for (Map.Entry<ShardStateAction.ShardRoutingEntry, Boolean> entry : taskResultMap.entrySet()) {
for (Map.Entry<ShardStateAction.ShardRoutingEntry, ClusterStateTaskExecutor.TaskResult> entry : taskResultMap.entrySet()) {
// every task should have a corresponding task result
assertTrue(result.executionResults.containsKey(entry.getKey()));
// the task results are as expected
assertEquals(entry.getValue(), result.executionResults.get(entry.getKey()).isSuccess());
assertEquals(entry.getValue().isSuccess(), result.executionResults.get(entry.getKey()).isSuccess());
}
// every shard that we requested to be successfully failed is
// gone
List<ShardRouting> shards = clusterState.getRoutingTable().allShards();
for (Map.Entry<ShardStateAction.ShardRoutingEntry, Boolean> entry : taskResultMap.entrySet()) {
if (entry.getValue()) {
for (Map.Entry<ShardStateAction.ShardRoutingEntry, ClusterStateTaskExecutor.TaskResult> entry : taskResultMap.entrySet()) {
if (entry.getValue().isSuccess()) {
// the shard was successfully failed and so should not
// be in the routing table
for (ShardRouting shard : shards) {
if (entry.getKey().getShardRouting().allocationId() != null) {
assertThat(shard.allocationId(), not(equalTo(entry.getKey().getShardRouting().allocationId())));
}
}
} else {
// check we saw the expected failure
ClusterStateTaskExecutor.TaskResult actualResult = result.executionResults.get(entry.getKey());
assertThat(actualResult.getFailure(), instanceOf(entry.getValue().getFailure().getClass()));
assertThat(actualResult.getFailure().getMessage(), equalTo(entry.getValue().getFailure().getMessage()));
}
}
@@ -231,11 +269,49 @@
}
}
private static List<ShardStateAction.ShardRoutingEntry> toTasks(List<ShardRouting> shards, String indexUUID, String message) {
private static List<ShardStateAction.ShardRoutingEntry> toTasks(ClusterState currentState, List<ShardRouting> shards, String indexUUID, String message) {
return shards
.stream()
.map(shard -> new ShardStateAction.ShardRoutingEntry(shard, indexUUID, message, new CorruptIndexException("simulated", indexUUID)))
.map(shard -> new ShardStateAction.ShardRoutingEntry(shard, randomValidSourceShard(currentState, shard), message, new CorruptIndexException("simulated", indexUUID)))
.collect(Collectors.toList());
}
private static ShardRouting randomValidSourceShard(ClusterState currentState, ShardRouting shardRouting) {
// for the source shard to be valid, either the request is
// from the shard's own allocation, or the request is
// from the allocation holding the primary shard
if (randomBoolean()) {
// request from local node
return shardRouting;
} else {
// request from the node holding the primary shard, unless the
// shard does not exist, in which case there is no primary and we
// fall back to the local allocation
ShardRouting primaryShardRouting = primaryShard(currentState, shardRouting);
return primaryShardRouting != null ? primaryShardRouting : shardRouting;
}
}
private static ShardRouting randomInvalidSourceShard(ClusterState currentState, ShardRouting shardRouting) {
ShardRouting primaryShard = primaryShard(currentState, shardRouting);
Set<ShardRouting> shards =
currentState
.routingTable()
.allShards()
.stream()
.filter(shard -> !shard.isSameAllocation(shardRouting))
.filter(shard -> !shard.isSameAllocation(primaryShard))
.collect(Collectors.toSet());
if (!shards.isEmpty()) {
return randomSubsetOf(1, shards.toArray(new ShardRouting[0])).get(0);
} else {
return
TestShardRouting.newShardRouting(shardRouting.index(), shardRouting.id(), DiscoveryService.generateNodeId(Settings.EMPTY), randomBoolean(), randomFrom(ShardRoutingState.values()), shardRouting.version());
}
}
private static ShardRouting primaryShard(ClusterState currentState, ShardRouting shardRouting) {
IndexShardRoutingTable indexShard = currentState.getRoutingTable().shardRoutingTableOrNull(shardRouting.shardId());
return indexShard == null ? null : indexShard.primaryShard();
}
}

ShardStateActionTests.java

@@ -30,7 +30,9 @@ import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
@@ -128,13 +130,11 @@ public class ShardStateActionTests extends ESTestCase {
clusterService.setState(ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5)));
String indexUUID = clusterService.state().metaData().index(index).getIndexUUID();
AtomicBoolean success = new AtomicBoolean();
CountDownLatch latch = new CountDownLatch(1);
ShardRouting shardRouting = getRandomShardRouting(index);
shardStateAction.shardFailed(shardRouting, indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
shardStateAction.shardFailed(shardRouting, shardRouting, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
@Override
public void onSuccess() {
success.set(true);
@@ -174,15 +174,14 @@
noMasterBuilder.masterNodeId(null);
clusterService.setState(ClusterState.builder(clusterService.state()).nodes(noMasterBuilder));
String indexUUID = clusterService.state().metaData().index(index).getIndexUUID();
CountDownLatch latch = new CountDownLatch(1);
AtomicInteger retries = new AtomicInteger();
AtomicBoolean success = new AtomicBoolean();
setUpMasterRetryVerification(1, retries, latch, requestId -> {});
shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
ShardRouting failedShard = getRandomShardRouting(index);
shardStateAction.shardFailed(failedShard, failedShard, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
@Override
public void onSuccess() {
success.set(true);
@@ -208,8 +207,6 @@
clusterService.setState(ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5)));
String indexUUID = clusterService.state().metaData().index(index).getIndexUUID();
CountDownLatch latch = new CountDownLatch(1);
AtomicInteger retries = new AtomicInteger();
AtomicBoolean success = new AtomicBoolean();
@@ -232,7 +229,8 @@
final int numberOfRetries = randomIntBetween(1, 256);
setUpMasterRetryVerification(numberOfRetries, retries, latch, retryLoop);
shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
ShardRouting failedShard = getRandomShardRouting(index);
shardStateAction.shardFailed(failedShard, failedShard, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
@Override
public void onSuccess() {
success.set(true);
@@ -265,11 +263,10 @@
clusterService.setState(ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5)));
String indexUUID = clusterService.state().metaData().index(index).getIndexUUID();
AtomicBoolean failure = new AtomicBoolean();
shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
ShardRouting failedShard = getRandomShardRouting(index);
shardStateAction.shardFailed(failedShard, failedShard, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
@Override
public void onSuccess() {
failure.set(false);
@@ -295,15 +292,13 @@
clusterService.setState(ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5)));
String indexUUID = clusterService.state().metaData().index(index).getIndexUUID();
AtomicBoolean success = new AtomicBoolean();
CountDownLatch latch = new CountDownLatch(1);
ShardRouting failedShard = getRandomShardRouting(index);
RoutingTable routingTable = RoutingTable.builder(clusterService.state().getRoutingTable()).remove(index).build();
clusterService.setState(ClusterState.builder(clusterService.state()).routingTable(routingTable));
shardStateAction.shardFailed(failedShard, indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
shardStateAction.shardFailed(failedShard, failedShard, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
@Override
public void onSuccess() {
success.set(true);
@@ -325,6 +320,44 @@
assertTrue(success.get());
}
public void testNoLongerPrimaryShardException() throws InterruptedException {
final String index = "test";
clusterService.setState(ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5)));
ShardRouting failedShard = getRandomShardRouting(index);
String nodeId = randomFrom(clusterService.state().nodes().nodes().keys().toArray(String.class));
AtomicReference<Throwable> failure = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
ShardRouting sourceFailedShard = TestShardRouting.newShardRouting(failedShard.index(), failedShard.id(), nodeId, randomBoolean(), randomFrom(ShardRoutingState.values()), failedShard.version());
shardStateAction.shardFailed(failedShard, sourceFailedShard, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
@Override
public void onSuccess() {
failure.set(null);
latch.countDown();
}
@Override
public void onFailure(Throwable t) {
failure.set(t);
latch.countDown();
}
});
ShardStateAction.NoLongerPrimaryShardException catastrophicError =
new ShardStateAction.NoLongerPrimaryShardException(failedShard.shardId(), "source shard [" + sourceFailedShard + "] is neither the local allocation nor the primary allocation");
CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear();
transport.handleRemoteError(capturedRequests[0].requestId, catastrophicError);
latch.await();
assertNotNull(failure.get());
assertThat(failure.get(), instanceOf(ShardStateAction.NoLongerPrimaryShardException.class));
assertThat(failure.get().getMessage(), equalTo(catastrophicError.getMessage()));
}
private ShardRouting getRandomShardRouting(String index) {
IndexRoutingTable indexRoutingTable = clusterService.state().routingTable().index(index);
ShardsIterator shardsIterator = indexRoutingTable.randomAllActiveShardsIt();

DiscoveryWithServiceDisruptionsIT.java

@@ -56,7 +56,6 @@ import org.elasticsearch.discovery.zen.ping.ZenPing;
import org.elasticsearch.discovery.zen.ping.ZenPingService;
import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing;
import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.store.IndicesStoreIntegrationIT;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
@@ -905,7 +904,6 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
ShardRouting failedShard =
randomFrom(clusterService().state().getRoutingNodes().node(nonMasterNodeId).shardsWithState(ShardRoutingState.STARTED));
ShardStateAction service = internalCluster().getInstance(ShardStateAction.class, nonMasterNode);
String indexUUID = clusterService().state().metaData().index("test").getIndexUUID();
CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean success = new AtomicBoolean();
@@ -913,7 +911,7 @@
NetworkPartition networkPartition = addRandomIsolation(isolatedNode);
networkPartition.startDisrupting();
service.shardFailed(failedShard, indexUUID, "simulated", new CorruptIndexException("simulated", (String) null), new ShardStateAction.Listener() {
service.shardFailed(failedShard, failedShard, "simulated", new CorruptIndexException("simulated", (String) null), new ShardStateAction.Listener() {
@Override
public void onSuccess() {
success.set(true);