From ec8b7c3e233703b7eb4908fae74c3c38c0cd9d82 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Fri, 6 Jan 2012 23:38:41 +0200 Subject: [PATCH] No master (startup / minimum_master_node) / not recovered blocks should cause proper failures on operations, closes #1589. --- .../action/UnavailableShardsException.java | 9 +- .../TransportBroadcastPingAction.java | 14 +- .../TransportIndexReplicationPingAction.java | 13 ++ .../TransportReplicationPingAction.java | 13 ++ .../TransportShardReplicationPingAction.java | 12 ++ .../single/TransportSinglePingAction.java | 14 +- .../analyze/TransportAnalyzeAction.java | 21 ++- .../TransportClearIndicesCacheAction.java | 15 ++- .../indices/flush/TransportFlushAction.java | 14 +- .../TransportGatewaySnapshotAction.java | 15 ++- .../optimize/TransportOptimizeAction.java | 14 +- .../refresh/TransportRefreshAction.java | 14 +- .../TransportIndicesSegmentsAction.java | 14 +- .../stats/TransportIndicesStatsAction.java | 15 ++- .../status/TransportIndicesStatusAction.java | 16 ++- .../query/TransportValidateQueryAction.java | 14 +- .../action/bulk/TransportBulkAction.java | 5 + .../action/bulk/TransportShardBulkAction.java | 10 +- .../action/count/TransportCountAction.java | 14 +- .../action/delete/TransportDeleteAction.java | 30 +++-- .../index/TransportIndexDeleteAction.java | 10 +- .../index/TransportShardDeleteAction.java | 10 +- .../TransportDeleteByQueryAction.java | 12 +- .../TransportIndexDeleteByQueryAction.java | 10 +- .../TransportShardDeleteByQueryAction.java | 10 +- .../action/get/TransportGetAction.java | 22 ++-- .../action/get/TransportMultiGetAction.java | 5 + .../get/TransportShardMultiGetAction.java | 18 ++- .../action/index/TransportIndexAction.java | 19 ++- .../percolate/TransportPercolateAction.java | 14 +- .../TransportBroadcastOperationAction.java | 23 ++-- ...nsportIndexReplicationOperationAction.java | 16 ++- ...portIndicesReplicationOperationAction.java | 17 ++- ...nsportShardReplicationOperationAction.java | 72 +++++++--- .../TransportSingleCustomOperationAction.java | 15 ++- ...ransportInstanceSingleOperationAction.java | 61 +++++---- .../TransportShardSingleOperationAction.java | 23 +++- .../action/update/TransportUpdateAction.java | 10 +- .../cluster/MinimumMasterNodesTests.java | 2 + .../cluster/NoMasterNodeTests.java | 123 ++++++++++++++++++ 40 files changed, 627 insertions(+), 151 deletions(-) create mode 100644 src/test/java/org/elasticsearch/test/integration/cluster/NoMasterNodeTests.java diff --git a/src/main/java/org/elasticsearch/action/UnavailableShardsException.java b/src/main/java/org/elasticsearch/action/UnavailableShardsException.java index 0ff6d95c3d3..30d3811dd04 100644 --- a/src/main/java/org/elasticsearch/action/UnavailableShardsException.java +++ b/src/main/java/org/elasticsearch/action/UnavailableShardsException.java @@ -20,14 +20,16 @@ package org.elasticsearch.action; import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.common.Nullable; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.rest.RestStatus; /** * */ public class UnavailableShardsException extends ElasticSearchException { - public UnavailableShardsException(ShardId shardId, String message) { + public UnavailableShardsException(@Nullable ShardId shardId, String message) { super(buildMessage(shardId, message)); } @@ -37,4 +39,9 @@ public class UnavailableShardsException extends ElasticSearchException { } return "[" + shardId.index().name() + "][" + shardId.id() + "] " + message; } + + @Override + public RestStatus status() { + return RestStatus.SERVICE_UNAVAILABLE; + } } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/ping/broadcast/TransportBroadcastPingAction.java b/src/main/java/org/elasticsearch/action/admin/cluster/ping/broadcast/TransportBroadcastPingAction.java index ec3eb79e38b..9f869cd295b 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/ping/broadcast/TransportBroadcastPingAction.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/ping/broadcast/TransportBroadcastPingAction.java @@ -27,6 +27,8 @@ import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedE import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.inject.Inject; @@ -65,10 +67,20 @@ public class TransportBroadcastPingAction extends TransportBroadcastOperationAct } @Override - protected GroupShardsIterator shards(BroadcastPingRequest request, String[] concreteIndices, ClusterState clusterState) { + protected GroupShardsIterator shards(ClusterState clusterState, BroadcastPingRequest request, String[] concreteIndices) { return clusterService.operationRouting().searchShards(clusterState, request.indices(), concreteIndices, request.queryHint(), null, null); } + @Override + protected ClusterBlockException checkGlobalBlock(ClusterState state, BroadcastPingRequest request) { + return state.blocks().globalBlockedException(ClusterBlockLevel.READ); + } + + @Override + protected ClusterBlockException checkRequestBlock(ClusterState state, BroadcastPingRequest request, String[] concreteIndices) { + return null; + } + @Override protected BroadcastPingResponse newResponse(BroadcastPingRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) { int successfulShards = 0; diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/ping/replication/TransportIndexReplicationPingAction.java b/src/main/java/org/elasticsearch/action/admin/cluster/ping/replication/TransportIndexReplicationPingAction.java index 65d0c903f53..267bffe3271 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/ping/replication/TransportIndexReplicationPingAction.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/ping/replication/TransportIndexReplicationPingAction.java @@ -21,6 +21,9 @@ package org.elasticsearch.action.admin.cluster.ping.replication; import org.elasticsearch.action.support.replication.TransportIndexReplicationOperationAction; import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.common.inject.Inject; @@ -85,4 +88,14 @@ public class TransportIndexReplicationPingAction extends TransportIndexReplicati protected ShardReplicationPingRequest newShardRequestInstance(IndexReplicationPingRequest indexRequest, int shardId) { return new ShardReplicationPingRequest(indexRequest, shardId); } + + @Override + protected ClusterBlockException checkGlobalBlock(ClusterState state, IndexReplicationPingRequest request) { + return state.blocks().globalBlockedException(ClusterBlockLevel.READ); + } + + @Override + protected ClusterBlockException checkRequestBlock(ClusterState state, IndexReplicationPingRequest request) { + return state.blocks().indexBlockedException(ClusterBlockLevel.READ, request.index()); + } } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/ping/replication/TransportReplicationPingAction.java b/src/main/java/org/elasticsearch/action/admin/cluster/ping/replication/TransportReplicationPingAction.java index a29eb9c7621..53a0e15c031 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/ping/replication/TransportReplicationPingAction.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/ping/replication/TransportReplicationPingAction.java @@ -22,6 +22,9 @@ package org.elasticsearch.action.admin.cluster.ping.replication; import org.elasticsearch.action.TransportActions; import org.elasticsearch.action.support.replication.TransportIndicesReplicationOperationAction; import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; @@ -70,4 +73,14 @@ public class TransportReplicationPingAction extends TransportIndicesReplicationO protected IndexReplicationPingRequest newIndexRequestInstance(ReplicationPingRequest request, String index, Set routing) { return new IndexReplicationPingRequest(request, index); } + + @Override + protected ClusterBlockException checkGlobalBlock(ClusterState state, ReplicationPingRequest replicationPingRequest) { + return state.blocks().globalBlockedException(ClusterBlockLevel.READ); + } + + @Override + protected ClusterBlockException checkRequestBlock(ClusterState state, ReplicationPingRequest replicationPingRequest, String[] concreteIndices) { + return state.blocks().indicesBlockedException(ClusterBlockLevel.WRITE, concreteIndices); + } } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/ping/replication/TransportShardReplicationPingAction.java b/src/main/java/org/elasticsearch/action/admin/cluster/ping/replication/TransportShardReplicationPingAction.java index b992491c60c..d6e53b0f432 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/ping/replication/TransportShardReplicationPingAction.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/ping/replication/TransportShardReplicationPingAction.java @@ -23,6 +23,8 @@ import org.elasticsearch.action.support.replication.TransportShardReplicationOpe import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -81,6 +83,16 @@ public class TransportShardReplicationPingAction extends TransportShardReplicati protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) { } + @Override + protected ClusterBlockException checkGlobalBlock(ClusterState state, ShardReplicationPingRequest request) { + return state.blocks().globalBlockedException(ClusterBlockLevel.READ); + } + + @Override + protected ClusterBlockException checkRequestBlock(ClusterState state, ShardReplicationPingRequest request) { + return state.blocks().indexBlockedException(ClusterBlockLevel.READ, request.index()); + } + @Override protected ShardIterator shards(ClusterState clusterState, ShardReplicationPingRequest request) { return clusterService.state().routingTable().index(request.index()).shard(request.shardId()).shardsIt(); diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/ping/single/TransportSinglePingAction.java b/src/main/java/org/elasticsearch/action/admin/cluster/ping/single/TransportSinglePingAction.java index 0c5191dbecd..69969593937 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/ping/single/TransportSinglePingAction.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/ping/single/TransportSinglePingAction.java @@ -24,6 +24,8 @@ import org.elasticsearch.action.TransportActions; import org.elasticsearch.action.support.single.shard.TransportShardSingleOperationAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -51,7 +53,17 @@ public class TransportSinglePingAction extends TransportShardSingleOperationActi } @Override - protected ShardIterator shards(ClusterState clusterState, SinglePingRequest request) throws ElasticSearchException { + protected ClusterBlockException checkGlobalBlock(ClusterState state, SinglePingRequest request) { + return state.blocks().globalBlockedException(ClusterBlockLevel.READ); + } + + @Override + protected ClusterBlockException checkRequestBlock(ClusterState state, SinglePingRequest singlePingRequest) { + return null; + } + + @Override + protected ShardIterator shards(ClusterState state, SinglePingRequest request) throws ElasticSearchException { return clusterService.operationRouting() .getShards(clusterService.state(), request.index(), request.type, request.id, null, null); } diff --git a/src/main/java/org/elasticsearch/action/admin/indices/analyze/TransportAnalyzeAction.java b/src/main/java/org/elasticsearch/action/admin/indices/analyze/TransportAnalyzeAction.java index aa93fb76c07..1a7667419c5 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/analyze/TransportAnalyzeAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/analyze/TransportAnalyzeAction.java @@ -32,6 +32,8 @@ import org.elasticsearch.action.TransportActions; import org.elasticsearch.action.support.single.custom.TransportSingleCustomOperationAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.FastStringReader; @@ -87,13 +89,26 @@ public class TransportAnalyzeAction extends TransportSingleCustomOperationAction } @Override - protected ShardsIterator shards(ClusterState clusterState, AnalyzeRequest request) { + protected ClusterBlockException checkGlobalBlock(ClusterState state, AnalyzeRequest request) { + return state.blocks().globalBlockedException(ClusterBlockLevel.READ); + } + + @Override + protected ClusterBlockException checkRequestBlock(ClusterState state, AnalyzeRequest request) { + if (request.index() != null) { + request.index(state.metaData().concreteIndex(request.index())); + return state.blocks().indexBlockedException(ClusterBlockLevel.READ, request.index()); + } + return null; + } + + @Override + protected ShardsIterator shards(ClusterState state, AnalyzeRequest request) { if (request.index() == null) { // just execute locally.... return null; } - request.index(clusterState.metaData().concreteIndex(request.index())); - return clusterState.routingTable().index(request.index()).randomAllActiveShardsIt(); + return state.routingTable().index(request.index()).randomAllActiveShardsIt(); } @Override diff --git a/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java b/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java index dcf4219eb48..767b594a199 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java @@ -27,6 +27,8 @@ import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedE import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.inject.Inject; @@ -160,7 +162,18 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastOperatio * The refresh request works against *all* shards. */ @Override - protected GroupShardsIterator shards(ClearIndicesCacheRequest request, String[] concreteIndices, ClusterState clusterState) { + protected GroupShardsIterator shards(ClusterState clusterState, ClearIndicesCacheRequest request, String[] concreteIndices) { return clusterState.routingTable().allActiveShardsGrouped(concreteIndices, true); } + + @Override + protected ClusterBlockException checkGlobalBlock(ClusterState state, ClearIndicesCacheRequest request) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA); + } + + @Override + protected ClusterBlockException checkRequestBlock(ClusterState state, ClearIndicesCacheRequest request, String[] concreteIndices) { + return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, concreteIndices); + } + } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java b/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java index 45269e8a3e4..05965014b3f 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java @@ -27,6 +27,8 @@ import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedE import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.inject.Inject; @@ -123,7 +125,17 @@ public class TransportFlushAction extends TransportBroadcastOperationAction> routingMap = clusterState.metaData().resolveSearchRouting(Integer.toString(ThreadLocalRandom.current().nextInt(1000)), request.indices()); return clusterService.operationRouting().searchShards(clusterState, request.indices(), concreteIndices, null, routingMap, "_local"); } @Override - protected void checkBlock(ValidateQueryRequest request, String[] concreteIndices, ClusterState state) { - for (String index : concreteIndices) { - state.blocks().indexBlocked(ClusterBlockLevel.READ, index); - } + protected ClusterBlockException checkGlobalBlock(ClusterState state, ValidateQueryRequest request) { + return state.blocks().globalBlockedException(ClusterBlockLevel.READ); + } + + @Override + protected ClusterBlockException checkRequestBlock(ClusterState state, ValidateQueryRequest countRequest, String[] concreteIndices) { + return state.blocks().indicesBlockedException(ClusterBlockLevel.READ, concreteIndices); } @Override diff --git a/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 84e37db2fcc..35dc277200d 100644 --- a/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -34,6 +34,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.BaseAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.GroupShardsIterator; @@ -138,6 +139,9 @@ public class TransportBulkAction extends BaseAction { private void executeBulk(final BulkRequest bulkRequest, final long startTime, final ActionListener listener) { ClusterState clusterState = clusterService.state(); + // TODO use timeout to wait here if its blocked... + clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.WRITE); + MetaData metaData = clusterState.metaData(); for (ActionRequest request : bulkRequest.requests) { if (request instanceof IndexRequest) { @@ -152,6 +156,7 @@ public class TransportBulkAction extends BaseAction { indexRequest.process(metaData, aliasOrIndex, mappingMd, allowIdGeneration); } else if (request instanceof DeleteRequest) { DeleteRequest deleteRequest = (DeleteRequest) request; + deleteRequest.routing(clusterState.metaData().resolveIndexRouting(deleteRequest.routing(), deleteRequest.index())); deleteRequest.index(clusterState.metaData().concreteIndex(deleteRequest.index())); } } diff --git a/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index d397be17531..006615f4c0d 100644 --- a/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -33,6 +33,7 @@ import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.routing.ShardIterator; @@ -107,8 +108,13 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation } @Override - protected void checkBlock(BulkShardRequest request, ClusterState state) { - state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, request.index()); + protected ClusterBlockException checkGlobalBlock(ClusterState state, BulkShardRequest request) { + return state.blocks().globalBlockedException(ClusterBlockLevel.WRITE); + } + + @Override + protected ClusterBlockException checkRequestBlock(ClusterState state, BulkShardRequest request) { + return state.blocks().indexBlockedException(ClusterBlockLevel.WRITE, request.index()); } @Override diff --git a/src/main/java/org/elasticsearch/action/count/TransportCountAction.java b/src/main/java/org/elasticsearch/action/count/TransportCountAction.java index a88fd542978..503351d4597 100644 --- a/src/main/java/org/elasticsearch/action/count/TransportCountAction.java +++ b/src/main/java/org/elasticsearch/action/count/TransportCountAction.java @@ -27,6 +27,7 @@ import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedE import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardRouting; @@ -89,16 +90,19 @@ public class TransportCountAction extends TransportBroadcastOperationAction> routingMap = clusterState.metaData().resolveSearchRouting(request.routing(), request.indices()); return clusterService.operationRouting().searchShards(clusterState, request.indices(), concreteIndices, request.queryHint(), routingMap, null); } @Override - protected void checkBlock(CountRequest request, String[] concreteIndices, ClusterState state) { - for (String index : concreteIndices) { - state.blocks().indexBlocked(ClusterBlockLevel.READ, index); - } + protected ClusterBlockException checkGlobalBlock(ClusterState state, CountRequest request) { + return state.blocks().globalBlockedException(ClusterBlockLevel.READ); + } + + @Override + protected ClusterBlockException checkRequestBlock(ClusterState state, CountRequest countRequest, String[] concreteIndices) { + return state.blocks().indicesBlockedException(ClusterBlockLevel.READ, concreteIndices); } @Override diff --git a/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java b/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java index a67e90b2388..4870e164297 100644 --- a/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java +++ b/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java @@ -33,6 +33,7 @@ import org.elasticsearch.action.support.replication.TransportShardReplicationOpe import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.routing.ShardIterator; @@ -47,8 +48,6 @@ import org.elasticsearch.transport.TransportService; /** * Performs the delete operation. - * - * */ public class TransportDeleteAction extends TransportShardReplicationOperationAction { @@ -98,13 +97,13 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct } } - private void innerExecute(final DeleteRequest request, final ActionListener listener) { - ClusterState clusterState = clusterService.state(); - request.routing(clusterState.metaData().resolveIndexRouting(request.routing(), request.index())); - request.index(clusterState.metaData().concreteIndex(request.index())); // we need to get the concrete index here... - if (clusterState.metaData().hasIndex(request.index())) { + @Override + protected boolean resolveRequest(final ClusterState state, final DeleteRequest request, final ActionListener listener) { + request.routing(state.metaData().resolveIndexRouting(request.routing(), request.index())); + request.index(state.metaData().concreteIndex(request.index())); + if (state.metaData().hasIndex(request.index())) { // check if routing is required, if so, do a broadcast delete - MappingMetaData mappingMd = clusterState.metaData().index(request.index()).mapping(request.type()); + MappingMetaData mappingMd = state.metaData().index(request.index()).mapping(request.type()); if (mappingMd != null && mappingMd.routing().required()) { if (request.routing() == null) { indexDeleteAction.execute(new IndexDeleteRequest(request), new ActionListener() { @@ -128,10 +127,14 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct listener.onFailure(e); } }); - return; + return false; } } } + return true; + } + + private void innerExecute(final DeleteRequest request, final ActionListener listener) { super.doExecute(request, listener); } @@ -161,8 +164,13 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct } @Override - protected void checkBlock(DeleteRequest request, ClusterState state) { - state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, request.index()); + protected ClusterBlockException checkGlobalBlock(ClusterState state, DeleteRequest request) { + return state.blocks().globalBlockedException(ClusterBlockLevel.WRITE); + } + + @Override + protected ClusterBlockException checkRequestBlock(ClusterState state, DeleteRequest request) { + return state.blocks().indexBlockedException(ClusterBlockLevel.WRITE, request.index()); } @Override diff --git a/src/main/java/org/elasticsearch/action/delete/index/TransportIndexDeleteAction.java b/src/main/java/org/elasticsearch/action/delete/index/TransportIndexDeleteAction.java index c0fb6dc4e1f..fc8d099fadd 100644 --- a/src/main/java/org/elasticsearch/action/delete/index/TransportIndexDeleteAction.java +++ b/src/main/java/org/elasticsearch/action/delete/index/TransportIndexDeleteAction.java @@ -22,6 +22,7 @@ package org.elasticsearch.action.delete.index; import org.elasticsearch.action.support.replication.TransportIndexReplicationOperationAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.common.inject.Inject; @@ -75,8 +76,13 @@ public class TransportIndexDeleteAction extends TransportIndexReplicationOperati } @Override - protected void checkBlock(IndexDeleteRequest request, ClusterState state) { - state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, request.index()); + protected ClusterBlockException checkGlobalBlock(ClusterState state, IndexDeleteRequest request) { + return state.blocks().globalBlockedException(ClusterBlockLevel.WRITE); + } + + @Override + protected ClusterBlockException checkRequestBlock(ClusterState state, IndexDeleteRequest request) { + return state.blocks().indexBlockedException(ClusterBlockLevel.WRITE, request.index()); } @Override diff --git a/src/main/java/org/elasticsearch/action/delete/index/TransportShardDeleteAction.java b/src/main/java/org/elasticsearch/action/delete/index/TransportShardDeleteAction.java index 9a0c016e8b1..a595da63c8a 100644 --- a/src/main/java/org/elasticsearch/action/delete/index/TransportShardDeleteAction.java +++ b/src/main/java/org/elasticsearch/action/delete/index/TransportShardDeleteAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.support.replication.TransportShardReplicationOpe import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardIterator; @@ -78,8 +79,13 @@ public class TransportShardDeleteAction extends TransportShardReplicationOperati } @Override - protected void checkBlock(ShardDeleteRequest request, ClusterState state) { - state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, request.index()); + protected ClusterBlockException checkGlobalBlock(ClusterState state, ShardDeleteRequest request) { + return state.blocks().globalBlockedException(ClusterBlockLevel.WRITE); + } + + @Override + protected ClusterBlockException checkRequestBlock(ClusterState state, ShardDeleteRequest request) { + return state.blocks().indexBlockedException(ClusterBlockLevel.WRITE, request.index()); } @Override diff --git a/src/main/java/org/elasticsearch/action/deletebyquery/TransportDeleteByQueryAction.java b/src/main/java/org/elasticsearch/action/deletebyquery/TransportDeleteByQueryAction.java index e8d523dfb3d..587b9548c25 100644 --- a/src/main/java/org/elasticsearch/action/deletebyquery/TransportDeleteByQueryAction.java +++ b/src/main/java/org/elasticsearch/action/deletebyquery/TransportDeleteByQueryAction.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.TransportActions; import org.elasticsearch.action.support.replication.TransportIndicesReplicationOperationAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -70,10 +71,13 @@ public class TransportDeleteByQueryAction extends TransportIndicesReplicationOpe } @Override - protected void checkBlock(DeleteByQueryRequest request, String[] concreteIndices, ClusterState state) { - for (String index : concreteIndices) { - state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, index); - } + protected ClusterBlockException checkGlobalBlock(ClusterState state, DeleteByQueryRequest replicationPingRequest) { + return state.blocks().globalBlockedException(ClusterBlockLevel.READ); + } + + @Override + protected ClusterBlockException checkRequestBlock(ClusterState state, DeleteByQueryRequest replicationPingRequest, String[] concreteIndices) { + return state.blocks().indicesBlockedException(ClusterBlockLevel.WRITE, concreteIndices); } @Override diff --git a/src/main/java/org/elasticsearch/action/deletebyquery/TransportIndexDeleteByQueryAction.java b/src/main/java/org/elasticsearch/action/deletebyquery/TransportIndexDeleteByQueryAction.java index 6c52fe0b64a..b815b38ee6d 100644 --- a/src/main/java/org/elasticsearch/action/deletebyquery/TransportIndexDeleteByQueryAction.java +++ b/src/main/java/org/elasticsearch/action/deletebyquery/TransportIndexDeleteByQueryAction.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.TransportActions; import org.elasticsearch.action.support.replication.TransportIndexReplicationOperationAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.common.inject.Inject; @@ -73,8 +74,13 @@ public class TransportIndexDeleteByQueryAction extends TransportIndexReplication } @Override - protected void checkBlock(IndexDeleteByQueryRequest request, ClusterState state) { - state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, request.index()); + protected ClusterBlockException checkGlobalBlock(ClusterState state, IndexDeleteByQueryRequest request) { + return state.blocks().globalBlockedException(ClusterBlockLevel.WRITE); + } + + @Override + protected ClusterBlockException checkRequestBlock(ClusterState state, IndexDeleteByQueryRequest request) { + return state.blocks().indexBlockedException(ClusterBlockLevel.WRITE, request.index()); } @Override diff --git a/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java b/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java index 8738aa17b48..40ae24e4c01 100644 --- a/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java +++ b/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.support.replication.TransportShardReplicationOpe import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardIterator; @@ -79,8 +80,13 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication } @Override - protected void checkBlock(ShardDeleteByQueryRequest request, ClusterState state) { - state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, request.index()); + protected ClusterBlockException checkGlobalBlock(ClusterState state, ShardDeleteByQueryRequest request) { + return state.blocks().globalBlockedException(ClusterBlockLevel.WRITE); + } + + @Override + protected ClusterBlockException checkRequestBlock(ClusterState state, ShardDeleteByQueryRequest request) { + return state.blocks().indexBlockedException(ClusterBlockLevel.WRITE, request.index()); } @Override diff --git a/src/main/java/org/elasticsearch/action/get/TransportGetAction.java b/src/main/java/org/elasticsearch/action/get/TransportGetAction.java index e1d1eba6adc..87e4ed0710c 100644 --- a/src/main/java/org/elasticsearch/action/get/TransportGetAction.java +++ b/src/main/java/org/elasticsearch/action/get/TransportGetAction.java @@ -20,13 +20,12 @@ package org.elasticsearch.action.get; import org.elasticsearch.ElasticSearchException; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.TransportActions; import org.elasticsearch.action.support.single.shard.TransportShardSingleOperationAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -67,26 +66,29 @@ public class TransportGetAction extends TransportShardSingleOperationAction listener) { + protected void resolveRequest(ClusterState state, GetRequest request) { if (request.realtime == null) { request.realtime = this.realtime; } // update the routing (request#index here is possibly an alias) - MetaData metaData = clusterService.state().metaData(); - request.routing(metaData.resolveIndexRouting(request.routing(), request.index())); - - super.doExecute(request, listener); + request.routing(state.metaData().resolveIndexRouting(request.routing(), request.index())); + request.index(state.metaData().concreteIndex(request.index())); } @Override diff --git a/src/main/java/org/elasticsearch/action/get/TransportMultiGetAction.java b/src/main/java/org/elasticsearch/action/get/TransportMultiGetAction.java index 775a9632af3..3819bdb69a5 100644 --- a/src/main/java/org/elasticsearch/action/get/TransportMultiGetAction.java +++ b/src/main/java/org/elasticsearch/action/get/TransportMultiGetAction.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.TransportActions; import org.elasticsearch.action.support.BaseAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.shard.ShardId; @@ -55,9 +56,13 @@ public class TransportMultiGetAction extends BaseAction listener) { ClusterState clusterState = clusterService.state(); + + clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ); + Map shardRequests = new HashMap(); for (int i = 0; i < request.items.size(); i++) { MultiGetRequest.Item item = request.items.get(i); + item.routing(clusterState.metaData().resolveIndexRouting(item.routing(), item.index())); item.index(clusterState.metaData().concreteIndex(item.index())); ShardId shardId = clusterService.operationRouting() .getShards(clusterState, item.index(), item.type(), item.id(), item.routing(), null).shardId(); diff --git a/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java b/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java index 8296d18e2ed..f3561fa7db2 100644 --- a/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java +++ b/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java @@ -21,11 +21,11 @@ package org.elasticsearch.action.get; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.TransportActions; import org.elasticsearch.action.support.single.shard.TransportShardSingleOperationAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.common.inject.Inject; @@ -74,22 +74,28 @@ public class TransportShardMultiGetAction extends TransportShardSingleOperationA } @Override - protected void checkBlock(MultiGetShardRequest request, ClusterState state) { - state.blocks().indexBlockedRaiseException(ClusterBlockLevel.READ, request.index()); + protected ClusterBlockException checkGlobalBlock(ClusterState state, MultiGetShardRequest request) { + return state.blocks().globalBlockedException(ClusterBlockLevel.READ); } @Override - protected ShardIterator shards(ClusterState clusterState, MultiGetShardRequest request) { + protected ClusterBlockException checkRequestBlock(ClusterState state, MultiGetShardRequest request) { + return state.blocks().indexBlockedException(ClusterBlockLevel.READ, request.index()); + } + + @Override + protected ShardIterator shards(ClusterState state, MultiGetShardRequest request) { return clusterService.operationRouting() .getShards(clusterService.state(), request.index(), request.shardId(), request.preference()); } @Override - protected void doExecute(MultiGetShardRequest request, ActionListener listener) { + protected void resolveRequest(ClusterState state, MultiGetShardRequest request) { if (request.realtime == null) { request.realtime = this.realtime; } - super.doExecute(request, listener); + // no need to set concrete index and routing here, it has already been set by the multi get action on the item + //request.index(state.metaData().concreteIndex(request.index())); } @Override diff --git a/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index f1ed9f5d846..7eab00a4fb3 100644 --- a/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -31,6 +31,7 @@ import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; @@ -63,8 +64,6 @@ import java.util.concurrent.TimeUnit; * Defaults to true. *
  • allowIdGeneration: If the id is set not, should it be generated. Defaults to true. * - * - * */ public class TransportIndexAction extends TransportShardReplicationOperationAction { @@ -92,6 +91,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi @Override protected void doExecute(final IndexRequest request, final ActionListener listener) { + // if we don't have a master, we don't have metadata, that's fine, let it find a master using create index API if (autoCreateIndex && !clusterService.state().metaData().hasConcreteIndex(request.index())) { request.beforeLocalFork(); // we fork on another thread... createIndexAction.execute(new CreateIndexRequest(request.index()).cause("auto(index api)"), new ActionListener() { @@ -119,7 +119,8 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi } } - private void innerExecute(final IndexRequest request, final ActionListener listener) { + @Override + protected boolean resolveRequest(ClusterState state, IndexRequest request, ActionListener indexResponseActionListener) { MetaData metaData = clusterService.state().metaData(); String aliasOrIndex = request.index(); request.index(metaData.concreteIndex(request.index())); @@ -128,7 +129,10 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi mappingMd = metaData.index(request.index()).mapping(request.type()); } request.process(metaData, aliasOrIndex, mappingMd, allowIdGeneration); + return true; + } + private void innerExecute(final IndexRequest request, final ActionListener listener) { super.doExecute(request, listener); } @@ -163,8 +167,13 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi } @Override - protected void checkBlock(IndexRequest request, ClusterState state) { - state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, request.index()); + protected ClusterBlockException checkGlobalBlock(ClusterState state, IndexRequest request) { + return state.blocks().globalBlockedException(ClusterBlockLevel.WRITE); + } + + @Override + protected ClusterBlockException checkRequestBlock(ClusterState state, IndexRequest request) { + return state.blocks().indexBlockedException(ClusterBlockLevel.WRITE, request.index()); } @Override diff --git a/src/main/java/org/elasticsearch/action/percolate/TransportPercolateAction.java b/src/main/java/org/elasticsearch/action/percolate/TransportPercolateAction.java index 5efe3f0fca7..560179eb7b8 100644 --- a/src/main/java/org/elasticsearch/action/percolate/TransportPercolateAction.java +++ b/src/main/java/org/elasticsearch/action/percolate/TransportPercolateAction.java @@ -24,6 +24,8 @@ import org.elasticsearch.action.TransportActions; import org.elasticsearch.action.support.single.custom.TransportSingleCustomOperationAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -68,9 +70,19 @@ public class TransportPercolateAction extends TransportSingleCustomOperationActi return TransportActions.PERCOLATE; } + @Override + protected ClusterBlockException checkGlobalBlock(ClusterState state, PercolateRequest request) { + return state.blocks().globalBlockedException(ClusterBlockLevel.READ); + } + + @Override + protected ClusterBlockException checkRequestBlock(ClusterState state, PercolateRequest request) { + request.index(state.metaData().concreteIndex(request.index())); + return state.blocks().indexBlockedException(ClusterBlockLevel.READ, request.index()); + } + @Override protected ShardsIterator shards(ClusterState clusterState, PercolateRequest request) { - request.index(clusterState.metaData().concreteIndex(request.index())); return clusterState.routingTable().index(request.index()).randomAllActiveShardsIt(); } diff --git a/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastOperationAction.java b/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastOperationAction.java index d67f33b22fa..32e45a13816 100644 --- a/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastOperationAction.java +++ b/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastOperationAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.BaseAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.GroupShardsIterator; @@ -88,7 +89,7 @@ public abstract class TransportBroadcastOperationAction listener) { this.request = request; this.listener = listener; clusterState = clusterService.state(); + ClusterBlockException blockException = checkGlobalBlock(clusterState, request); + if (blockException != null) { + throw blockException; + } // update to concrete indices - concreteIndices = clusterState.metaData().concreteIndices(request.indices(), false, true); - checkBlock(request, concreteIndices, clusterState); + String[] concreteIndices = clusterState.metaData().concreteIndices(request.indices(), false, true); + blockException = checkRequestBlock(clusterState, request, concreteIndices); + if (blockException != null) { + throw blockException; + } nodes = clusterState.nodes(); - shardsIts = shards(request, concreteIndices, clusterState); + shardsIts = shards(clusterState, request, concreteIndices); expectedOps = shardsIts.size(); - shardsResponses = new AtomicReferenceArray(expectedOps); } diff --git a/src/main/java/org/elasticsearch/action/support/replication/TransportIndexReplicationOperationAction.java b/src/main/java/org/elasticsearch/action/support/replication/TransportIndexReplicationOperationAction.java index 41ff5c31cc9..43a1536d398 100644 --- a/src/main/java/org/elasticsearch/action/support/replication/TransportIndexReplicationOperationAction.java +++ b/src/main/java/org/elasticsearch/action/support/replication/TransportIndexReplicationOperationAction.java @@ -26,6 +26,7 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.support.BaseAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.common.inject.Inject; @@ -60,12 +61,17 @@ public abstract class TransportIndexReplicationOperationAction listener) { - ClusterState clusterState = clusterService.state(); + ClusterBlockException blockException = checkGlobalBlock(clusterState, request); + if (blockException != null) { + throw blockException; + } // update to concrete index request.index(clusterState.metaData().concreteIndex(request.index())); - - checkBlock(request, clusterState); + blockException = checkRequestBlock(clusterState, request); + if (blockException != null) { + throw blockException; + } GroupShardsIterator groups; try { @@ -122,9 +128,9 @@ public abstract class TransportIndexReplicationOperationAction { diff --git a/src/main/java/org/elasticsearch/action/support/replication/TransportIndicesReplicationOperationAction.java b/src/main/java/org/elasticsearch/action/support/replication/TransportIndicesReplicationOperationAction.java index 41705e687a6..251d8ce9073 100644 --- a/src/main/java/org/elasticsearch/action/support/replication/TransportIndicesReplicationOperationAction.java +++ b/src/main/java/org/elasticsearch/action/support/replication/TransportIndicesReplicationOperationAction.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.support.BaseAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; @@ -65,12 +66,16 @@ public abstract class TransportIndicesReplicationOperationAction listener) { ClusterState clusterState = clusterService.state(); - + ClusterBlockException blockException = checkGlobalBlock(clusterState, request); + if (blockException != null) { + throw blockException; + } // get actual indices - String[] concreteIndices = clusterState.metaData().concreteIndices(request.indices()); - - checkBlock(request, concreteIndices, clusterState); + blockException = checkRequestBlock(clusterState, request, concreteIndices); + if (blockException != null) { + throw blockException; + } final AtomicInteger indexCounter = new AtomicInteger(); final AtomicInteger completionCounter = new AtomicInteger(concreteIndices.length); @@ -120,9 +125,9 @@ public abstract class TransportIndicesReplicationOperationAction { diff --git a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java index 1524612dc75..95e3b02f969 100644 --- a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java +++ b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java @@ -28,10 +28,12 @@ import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.TimeoutClusterStateListener; import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; @@ -42,7 +44,6 @@ import org.elasticsearch.index.IndexShardMissingException; import org.elasticsearch.index.engine.DocumentAlreadyExistsException; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.shard.IllegalIndexShardStateException; -import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.node.NodeClosedException; @@ -128,8 +129,17 @@ public abstract class TransportShardReplicationOperationAction listener) { + request.index(state.metaData().concreteIndex(request.index())); + return true; } protected TransportRequestOptions transportOptions() { @@ -317,12 +327,6 @@ public abstract class TransportShardReplicationOperationAction listener) { this.request = request; this.listener = listener; - - ClusterState clusterState = clusterService.state(); - - nodes = clusterState.nodes(); - - request.index(clusterState.metaData().concreteIndex(request.index())); - - checkBlock(request, clusterState); } public void start() { @@ -128,11 +122,26 @@ public abstract class TransportInstanceSingleOperationAction