Merge pull request #13341 from brwe/shard-not-available-broadcast-replication

fix exception handling for unavailable shards in broadcast replicatio…
This commit is contained in:
Britta Weber 2015-09-07 14:47:55 +02:00
commit c5cb559014
3 changed files with 11 additions and 4 deletions

View File

@ -21,6 +21,7 @@ package org.elasticsearch.action.support;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.ShardNotFoundException;
@ -34,7 +35,8 @@ public class TransportActions {
if (actual instanceof ShardNotFoundException ||
actual instanceof IndexNotFoundException ||
actual instanceof IllegalIndexShardStateException ||
actual instanceof NoShardAvailableActionException) {
actual instanceof NoShardAvailableActionException ||
actual instanceof UnavailableShardsException) {
return true;
}
return false;

View File

@ -28,6 +28,7 @@ import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.action.support.broadcast.BroadcastRequest;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException;
@ -90,7 +91,7 @@ public abstract class TransportBroadcastReplicationAction<Request extends Broadc
int totalNumCopies = clusterState.getMetaData().index(shardId.index().getName()).getNumberOfReplicas() + 1;
ShardResponse shardResponse = newShardResponse();
ActionWriteResponse.ShardInfo.Failure[] failures;
if (ExceptionsHelper.unwrap(e, UnavailableShardsException.class) != null) {
if (TransportActions.isShardNotAvailableException(e)) {
failures = new ActionWriteResponse.ShardInfo.Failure[0];
} else {
ActionWriteResponse.ShardInfo.Failure failure = new ActionWriteResponse.ShardInfo.Failure(shardId.index().name(), shardId.id(), null, e, ExceptionsHelper.status(e), true);

View File

@ -21,6 +21,7 @@ package org.elasticsearch.action.support.replication;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
@ -94,13 +95,16 @@ public class BroadcastReplicationTests extends ESTestCase {
@Test
public void testNotStartedPrimary() throws InterruptedException, ExecutionException, IOException {
final String index = "test";
final ShardId shardId = new ShardId(index, 0);
clusterService.setState(state(index, randomBoolean(),
randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.UNASSIGNED, ShardRoutingState.UNASSIGNED));
logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
Future<BroadcastResponse> response = (broadcastReplicationAction.execute(new BroadcastRequest().indices(index)));
for (Tuple<ShardId, ActionListener<ActionWriteResponse>> shardRequests : broadcastReplicationAction.capturedShardRequests) {
shardRequests.v2().onFailure(new UnavailableShardsException(shardId, "test exception expected"));
if (randomBoolean()) {
shardRequests.v2().onFailure(new NoShardAvailableActionException(shardRequests.v1()));
} else {
shardRequests.v2().onFailure(new UnavailableShardsException(shardRequests.v1(), "test exception"));
}
}
response.get();
logger.info("total shards: {}, ", response.get().getTotalShards());