fix exception handling for unavailable shards in broadcast replication action

Before #13068 refresh and flush ignored all exceptions that matched
TransportActions.isShardNotAvailableException(e) and this should not change.
In addition, refresh and flush which are based on broadcast replication
might now get UnavailableShardsException from TransportReplicationAction if a shard
is unavailable and this is not caught by TransportActions.isShardNotAvailableException(e).
This must be ignored as well.
This commit is contained in:
Britta Weber 2015-09-04 09:34:23 +02:00
parent c5b39ce85e
commit 41a2375c9e
3 changed files with 11 additions and 4 deletions

View File

@ -21,6 +21,7 @@ package org.elasticsearch.action.support;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.ShardNotFoundException;
@ -34,7 +35,8 @@ public class TransportActions {
if (actual instanceof ShardNotFoundException ||
actual instanceof IndexNotFoundException ||
actual instanceof IllegalIndexShardStateException ||
actual instanceof NoShardAvailableActionException) {
actual instanceof NoShardAvailableActionException ||
actual instanceof UnavailableShardsException) {
return true;
}
return false;

View File

@ -28,6 +28,7 @@ import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.action.support.broadcast.BroadcastRequest;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException;
@ -90,7 +91,7 @@ public abstract class TransportBroadcastReplicationAction<Request extends Broadc
int totalNumCopies = clusterState.getMetaData().index(shardId.index().getName()).getNumberOfReplicas() + 1;
ShardResponse shardResponse = newShardResponse();
ActionWriteResponse.ShardInfo.Failure[] failures;
if (ExceptionsHelper.unwrap(e, UnavailableShardsException.class) != null) {
if (TransportActions.isShardNotAvailableException(e)) {
failures = new ActionWriteResponse.ShardInfo.Failure[0];
} else {
ActionWriteResponse.ShardInfo.Failure failure = new ActionWriteResponse.ShardInfo.Failure(shardId.index().name(), shardId.id(), null, e, ExceptionsHelper.status(e), true);

View File

@ -21,6 +21,7 @@ package org.elasticsearch.action.support.replication;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
@ -103,13 +104,16 @@ public class BroadcastReplicationTests extends ESTestCase {
@Test
public void testNotStartedPrimary() throws InterruptedException, ExecutionException, IOException {
final String index = "test";
final ShardId shardId = new ShardId(index, 0);
clusterService.setState(state(index, randomBoolean(),
randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.UNASSIGNED, ShardRoutingState.UNASSIGNED));
logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
Future<BroadcastResponse> response = (broadcastReplicationAction.execute(new BroadcastRequest().indices(index)));
for (Tuple<ShardId, ActionListener<ActionWriteResponse>> shardRequests : broadcastReplicationAction.capturedShardRequests) {
shardRequests.v2().onFailure(new UnavailableShardsException(shardId, "test exception expected"));
if (randomBoolean()) {
shardRequests.v2().onFailure(new NoShardAvailableActionException(shardRequests.v1()));
} else {
shardRequests.v2().onFailure(new UnavailableShardsException(shardRequests.v1(), "test exception"));
}
}
response.get();
logger.info("total shards: {}, ", response.get().getTotalShards());