No master (startup / minimum_master_node) / not recovered blocks should cause proper failures on operations, closes #1589.

This commit is contained in:
Shay Banon 2012-01-06 23:38:41 +02:00
parent 93cce59a74
commit ec8b7c3e23
40 changed files with 627 additions and 151 deletions

View File

@ -20,14 +20,16 @@
package org.elasticsearch.action;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
/**
*
*/
public class UnavailableShardsException extends ElasticSearchException {
public UnavailableShardsException(ShardId shardId, String message) {
public UnavailableShardsException(@Nullable ShardId shardId, String message) {
super(buildMessage(shardId, message));
}
@ -37,4 +39,9 @@ public class UnavailableShardsException extends ElasticSearchException {
}
return "[" + shardId.index().name() + "][" + shardId.id() + "] " + message;
}
@Override
public RestStatus status() {
return RestStatus.SERVICE_UNAVAILABLE;
}
}

View File

@ -27,6 +27,8 @@ import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedE
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
@ -65,10 +67,20 @@ public class TransportBroadcastPingAction extends TransportBroadcastOperationAct
}
@Override
protected GroupShardsIterator shards(BroadcastPingRequest request, String[] concreteIndices, ClusterState clusterState) {
protected GroupShardsIterator shards(ClusterState clusterState, BroadcastPingRequest request, String[] concreteIndices) {
return clusterService.operationRouting().searchShards(clusterState, request.indices(), concreteIndices, request.queryHint(), null, null);
}
@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, BroadcastPingRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.READ);
}
@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, BroadcastPingRequest request, String[] concreteIndices) {
return null;
}
@Override
protected BroadcastPingResponse newResponse(BroadcastPingRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
int successfulShards = 0;

View File

@ -21,6 +21,9 @@ package org.elasticsearch.action.admin.cluster.ping.replication;
import org.elasticsearch.action.support.replication.TransportIndexReplicationOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.inject.Inject;
@ -85,4 +88,14 @@ public class TransportIndexReplicationPingAction extends TransportIndexReplicati
protected ShardReplicationPingRequest newShardRequestInstance(IndexReplicationPingRequest indexRequest, int shardId) {
return new ShardReplicationPingRequest(indexRequest, shardId);
}
@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, IndexReplicationPingRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.READ);
}
@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, IndexReplicationPingRequest request) {
return state.blocks().indexBlockedException(ClusterBlockLevel.READ, request.index());
}
}

View File

@ -22,6 +22,9 @@ package org.elasticsearch.action.admin.cluster.ping.replication;
import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.support.replication.TransportIndicesReplicationOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
@ -70,4 +73,14 @@ public class TransportReplicationPingAction extends TransportIndicesReplicationO
protected IndexReplicationPingRequest newIndexRequestInstance(ReplicationPingRequest request, String index, Set<String> routing) {
return new IndexReplicationPingRequest(request, index);
}
@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, ReplicationPingRequest replicationPingRequest) {
return state.blocks().globalBlockedException(ClusterBlockLevel.READ);
}
@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, ReplicationPingRequest replicationPingRequest, String[] concreteIndices) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.WRITE, concreteIndices);
}
}

View File

@ -23,6 +23,8 @@ import org.elasticsearch.action.support.replication.TransportShardReplicationOpe
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -81,6 +83,16 @@ public class TransportShardReplicationPingAction extends TransportShardReplicati
protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
}
@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, ShardReplicationPingRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.READ);
}
@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, ShardReplicationPingRequest request) {
return state.blocks().indexBlockedException(ClusterBlockLevel.READ, request.index());
}
@Override
protected ShardIterator shards(ClusterState clusterState, ShardReplicationPingRequest request) {
return clusterService.state().routingTable().index(request.index()).shard(request.shardId()).shardsIt();

View File

@ -24,6 +24,8 @@ import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.support.single.shard.TransportShardSingleOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -51,7 +53,17 @@ public class TransportSinglePingAction extends TransportShardSingleOperationActi
}
@Override
protected ShardIterator shards(ClusterState clusterState, SinglePingRequest request) throws ElasticSearchException {
protected ClusterBlockException checkGlobalBlock(ClusterState state, SinglePingRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.READ);
}
@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, SinglePingRequest singlePingRequest) {
return null;
}
@Override
protected ShardIterator shards(ClusterState state, SinglePingRequest request) throws ElasticSearchException {
return clusterService.operationRouting()
.getShards(clusterService.state(), request.index(), request.type, request.id, null, null);
}

View File

@ -32,6 +32,8 @@ import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.support.single.custom.TransportSingleCustomOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FastStringReader;
@ -87,13 +89,26 @@ public class TransportAnalyzeAction extends TransportSingleCustomOperationAction
}
@Override
protected ShardsIterator shards(ClusterState clusterState, AnalyzeRequest request) {
protected ClusterBlockException checkGlobalBlock(ClusterState state, AnalyzeRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.READ);
}
@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, AnalyzeRequest request) {
if (request.index() != null) {
request.index(state.metaData().concreteIndex(request.index()));
return state.blocks().indexBlockedException(ClusterBlockLevel.READ, request.index());
}
return null;
}
@Override
protected ShardsIterator shards(ClusterState state, AnalyzeRequest request) {
if (request.index() == null) {
// just execute locally....
return null;
}
request.index(clusterState.metaData().concreteIndex(request.index()));
return clusterState.routingTable().index(request.index()).randomAllActiveShardsIt();
return state.routingTable().index(request.index()).randomAllActiveShardsIt();
}
@Override

View File

@ -27,6 +27,8 @@ import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedE
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
@ -160,7 +162,18 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastOperatio
* The refresh request works against *all* shards.
*/
@Override
protected GroupShardsIterator shards(ClearIndicesCacheRequest request, String[] concreteIndices, ClusterState clusterState) {
protected GroupShardsIterator shards(ClusterState clusterState, ClearIndicesCacheRequest request, String[] concreteIndices) {
return clusterState.routingTable().allActiveShardsGrouped(concreteIndices, true);
}
@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, ClearIndicesCacheRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA);
}
@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, ClearIndicesCacheRequest request, String[] concreteIndices) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, concreteIndices);
}
}

View File

@ -27,6 +27,8 @@ import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedE
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
@ -123,7 +125,17 @@ public class TransportFlushAction extends TransportBroadcastOperationAction<Flus
* The refresh request works against *all* shards.
*/
@Override
protected GroupShardsIterator shards(FlushRequest request, String[] concreteIndices, ClusterState clusterState) {
protected GroupShardsIterator shards(ClusterState clusterState, FlushRequest request, String[] concreteIndices) {
return clusterState.routingTable().allActiveShardsGrouped(concreteIndices, true);
}
@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, FlushRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA);
}
@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, FlushRequest countRequest, String[] concreteIndices) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, concreteIndices);
}
}

View File

@ -28,6 +28,8 @@ import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedE
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
@ -123,7 +125,18 @@ public class TransportGatewaySnapshotAction extends TransportBroadcastOperationA
* The snapshot request works against all primary shards.
*/
@Override
protected GroupShardsIterator shards(GatewaySnapshotRequest request, String[] concreteIndices, ClusterState clusterState) {
protected GroupShardsIterator shards(ClusterState clusterState, GatewaySnapshotRequest request, String[] concreteIndices) {
return clusterState.routingTable().activePrimaryShardsGrouped(concreteIndices, true);
}
@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, GatewaySnapshotRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA);
}
@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, GatewaySnapshotRequest request, String[] concreteIndices) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, concreteIndices);
}
}

View File

@ -27,6 +27,8 @@ import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedE
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
@ -134,7 +136,17 @@ public class TransportOptimizeAction extends TransportBroadcastOperationAction<O
* The refresh request works against *all* shards.
*/
@Override
protected GroupShardsIterator shards(OptimizeRequest request, String[] concreteIndices, ClusterState clusterState) {
protected GroupShardsIterator shards(ClusterState clusterState, OptimizeRequest request, String[] concreteIndices) {
return clusterState.routingTable().allActiveShardsGrouped(concreteIndices, true);
}
@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, OptimizeRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA);
}
@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, OptimizeRequest request, String[] concreteIndices) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, concreteIndices);
}
}

View File

@ -28,6 +28,8 @@ import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedE
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
@ -143,7 +145,17 @@ public class TransportRefreshAction extends TransportBroadcastOperationAction<Re
* The refresh request works against *all* shards.
*/
@Override
protected GroupShardsIterator shards(RefreshRequest request, String[] concreteIndices, ClusterState clusterState) {
protected GroupShardsIterator shards(ClusterState clusterState, RefreshRequest request, String[] concreteIndices) {
return clusterState.routingTable().allAssignedShardsGrouped(concreteIndices, true);
}
@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, RefreshRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA);
}
@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, RefreshRequest countRequest, String[] concreteIndices) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, concreteIndices);
}
}

View File

@ -28,6 +28,8 @@ import org.elasticsearch.action.support.broadcast.BroadcastShardOperationRequest
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
@ -84,10 +86,20 @@ public class TransportIndicesSegmentsAction extends TransportBroadcastOperationA
* Segments goes across *all* active shards.
*/
@Override
protected GroupShardsIterator shards(IndicesSegmentsRequest request, String[] concreteIndices, ClusterState clusterState) {
protected GroupShardsIterator shards(ClusterState clusterState, IndicesSegmentsRequest request, String[] concreteIndices) {
return clusterState.routingTable().allActiveShardsGrouped(concreteIndices, true);
}
@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, IndicesSegmentsRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA);
}
@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, IndicesSegmentsRequest countRequest, String[] concreteIndices) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, concreteIndices);
}
@Override
protected IndicesSegmentResponse newResponse(IndicesSegmentsRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
int successfulShards = 0;

View File

@ -29,6 +29,8 @@ import org.elasticsearch.action.support.broadcast.BroadcastShardOperationRequest
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
@ -84,10 +86,21 @@ public class TransportIndicesStatsAction extends TransportBroadcastOperationActi
* Status goes across *all* shards.
*/
@Override
protected GroupShardsIterator shards(IndicesStatsRequest request, String[] concreteIndices, ClusterState clusterState) {
protected GroupShardsIterator shards(ClusterState clusterState, IndicesStatsRequest request, String[] concreteIndices) {
return clusterState.routingTable().allAssignedShardsGrouped(concreteIndices, true);
}
@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, IndicesStatsRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA);
}
@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, IndicesStatsRequest request, String[] concreteIndices) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, concreteIndices);
}
@Override
protected IndicesStats newResponse(IndicesStatsRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
int successfulShards = 0;

View File

@ -28,6 +28,8 @@ import org.elasticsearch.action.support.broadcast.BroadcastShardOperationRequest
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
@ -93,8 +95,18 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
* Status goes across *all* shards.
*/
@Override
protected GroupShardsIterator shards(IndicesStatusRequest request, String[] concreteIndices, ClusterState clusterState) {
return clusterState.routingTable().allAssignedShardsGrouped(concreteIndices, true);
protected GroupShardsIterator shards(ClusterState state, IndicesStatusRequest request, String[] concreteIndices) {
return state.routingTable().allAssignedShardsGrouped(concreteIndices, true);
}
@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, IndicesStatusRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA);
}
@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, IndicesStatusRequest countRequest, String[] concreteIndices) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, concreteIndices);
}
@Override

View File

@ -28,6 +28,7 @@ import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedE
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
@ -91,17 +92,20 @@ public class TransportValidateQueryAction extends TransportBroadcastOperationAct
}
@Override
protected GroupShardsIterator shards(ValidateQueryRequest request, String[] concreteIndices, ClusterState clusterState) {
protected GroupShardsIterator shards(ClusterState clusterState, ValidateQueryRequest request, String[] concreteIndices) {
// Hard-code routing to limit request to a single shard, but still, randomize it...
Map<String, Set<String>> routingMap = clusterState.metaData().resolveSearchRouting(Integer.toString(ThreadLocalRandom.current().nextInt(1000)), request.indices());
return clusterService.operationRouting().searchShards(clusterState, request.indices(), concreteIndices, null, routingMap, "_local");
}
@Override
protected void checkBlock(ValidateQueryRequest request, String[] concreteIndices, ClusterState state) {
for (String index : concreteIndices) {
state.blocks().indexBlocked(ClusterBlockLevel.READ, index);
}
protected ClusterBlockException checkGlobalBlock(ClusterState state, ValidateQueryRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.READ);
}
@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, ValidateQueryRequest countRequest, String[] concreteIndices) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.READ, concreteIndices);
}
@Override

View File

@ -34,6 +34,7 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.BaseAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
@ -138,6 +139,9 @@ public class TransportBulkAction extends BaseAction<BulkRequest, BulkResponse> {
private void executeBulk(final BulkRequest bulkRequest, final long startTime, final ActionListener<BulkResponse> listener) {
ClusterState clusterState = clusterService.state();
// TODO use timeout to wait here if its blocked...
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.WRITE);
MetaData metaData = clusterState.metaData();
for (ActionRequest request : bulkRequest.requests) {
if (request instanceof IndexRequest) {
@ -152,6 +156,7 @@ public class TransportBulkAction extends BaseAction<BulkRequest, BulkResponse> {
indexRequest.process(metaData, aliasOrIndex, mappingMd, allowIdGeneration);
} else if (request instanceof DeleteRequest) {
DeleteRequest deleteRequest = (DeleteRequest) request;
deleteRequest.routing(clusterState.metaData().resolveIndexRouting(deleteRequest.routing(), deleteRequest.index()));
deleteRequest.index(clusterState.metaData().concreteIndex(deleteRequest.index()));
}
}

View File

@ -33,6 +33,7 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.ShardIterator;
@ -107,8 +108,13 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
}
@Override
protected void checkBlock(BulkShardRequest request, ClusterState state) {
state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, request.index());
protected ClusterBlockException checkGlobalBlock(ClusterState state, BulkShardRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.WRITE);
}
@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, BulkShardRequest request) {
return state.blocks().indexBlockedException(ClusterBlockLevel.WRITE, request.index());
}
@Override

View File

@ -27,6 +27,7 @@ import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedE
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
@ -89,16 +90,19 @@ public class TransportCountAction extends TransportBroadcastOperationAction<Coun
}
@Override
protected GroupShardsIterator shards(CountRequest request, String[] concreteIndices, ClusterState clusterState) {
protected GroupShardsIterator shards(ClusterState clusterState, CountRequest request, String[] concreteIndices) {
Map<String, Set<String>> routingMap = clusterState.metaData().resolveSearchRouting(request.routing(), request.indices());
return clusterService.operationRouting().searchShards(clusterState, request.indices(), concreteIndices, request.queryHint(), routingMap, null);
}
@Override
protected void checkBlock(CountRequest request, String[] concreteIndices, ClusterState state) {
for (String index : concreteIndices) {
state.blocks().indexBlocked(ClusterBlockLevel.READ, index);
}
protected ClusterBlockException checkGlobalBlock(ClusterState state, CountRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.READ);
}
@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, CountRequest countRequest, String[] concreteIndices) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.READ, concreteIndices);
}
@Override

View File

@ -33,6 +33,7 @@ import org.elasticsearch.action.support.replication.TransportShardReplicationOpe
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.ShardIterator;
@ -47,8 +48,6 @@ import org.elasticsearch.transport.TransportService;
/**
* Performs the delete operation.
*
*
*/
public class TransportDeleteAction extends TransportShardReplicationOperationAction<DeleteRequest, DeleteRequest, DeleteResponse> {
@ -98,13 +97,13 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
}
}
private void innerExecute(final DeleteRequest request, final ActionListener<DeleteResponse> listener) {
ClusterState clusterState = clusterService.state();
request.routing(clusterState.metaData().resolveIndexRouting(request.routing(), request.index()));
request.index(clusterState.metaData().concreteIndex(request.index())); // we need to get the concrete index here...
if (clusterState.metaData().hasIndex(request.index())) {
@Override
protected boolean resolveRequest(final ClusterState state, final DeleteRequest request, final ActionListener<DeleteResponse> listener) {
request.routing(state.metaData().resolveIndexRouting(request.routing(), request.index()));
request.index(state.metaData().concreteIndex(request.index()));
if (state.metaData().hasIndex(request.index())) {
// check if routing is required, if so, do a broadcast delete
MappingMetaData mappingMd = clusterState.metaData().index(request.index()).mapping(request.type());
MappingMetaData mappingMd = state.metaData().index(request.index()).mapping(request.type());
if (mappingMd != null && mappingMd.routing().required()) {
if (request.routing() == null) {
indexDeleteAction.execute(new IndexDeleteRequest(request), new ActionListener<IndexDeleteResponse>() {
@ -128,10 +127,14 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
listener.onFailure(e);
}
});
return;
return false;
}
}
}
return true;
}
private void innerExecute(final DeleteRequest request, final ActionListener<DeleteResponse> listener) {
super.doExecute(request, listener);
}
@ -161,8 +164,13 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
}
@Override
protected void checkBlock(DeleteRequest request, ClusterState state) {
state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, request.index());
protected ClusterBlockException checkGlobalBlock(ClusterState state, DeleteRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.WRITE);
}
@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, DeleteRequest request) {
return state.blocks().indexBlockedException(ClusterBlockLevel.WRITE, request.index());
}
@Override

View File

@ -22,6 +22,7 @@ package org.elasticsearch.action.delete.index;
import org.elasticsearch.action.support.replication.TransportIndexReplicationOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.common.inject.Inject;
@ -75,8 +76,13 @@ public class TransportIndexDeleteAction extends TransportIndexReplicationOperati
}
@Override
protected void checkBlock(IndexDeleteRequest request, ClusterState state) {
state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, request.index());
protected ClusterBlockException checkGlobalBlock(ClusterState state, IndexDeleteRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.WRITE);
}
@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, IndexDeleteRequest request) {
return state.blocks().indexBlockedException(ClusterBlockLevel.WRITE, request.index());
}
@Override

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.support.replication.TransportShardReplicationOpe
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
@ -78,8 +79,13 @@ public class TransportShardDeleteAction extends TransportShardReplicationOperati
}
@Override
protected void checkBlock(ShardDeleteRequest request, ClusterState state) {
state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, request.index());
protected ClusterBlockException checkGlobalBlock(ClusterState state, ShardDeleteRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.WRITE);
}
@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, ShardDeleteRequest request) {
return state.blocks().indexBlockedException(ClusterBlockLevel.WRITE, request.index());
}
@Override

View File

@ -23,6 +23,7 @@ import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.support.replication.TransportIndicesReplicationOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -70,10 +71,13 @@ public class TransportDeleteByQueryAction extends TransportIndicesReplicationOpe
}
@Override
protected void checkBlock(DeleteByQueryRequest request, String[] concreteIndices, ClusterState state) {
for (String index : concreteIndices) {
state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, index);
}
protected ClusterBlockException checkGlobalBlock(ClusterState state, DeleteByQueryRequest replicationPingRequest) {
return state.blocks().globalBlockedException(ClusterBlockLevel.READ);
}
@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, DeleteByQueryRequest replicationPingRequest, String[] concreteIndices) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.WRITE, concreteIndices);
}
@Override

View File

@ -23,6 +23,7 @@ import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.support.replication.TransportIndexReplicationOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.common.inject.Inject;
@ -73,8 +74,13 @@ public class TransportIndexDeleteByQueryAction extends TransportIndexReplication
}
@Override
protected void checkBlock(IndexDeleteByQueryRequest request, ClusterState state) {
state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, request.index());
protected ClusterBlockException checkGlobalBlock(ClusterState state, IndexDeleteByQueryRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.WRITE);
}
@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, IndexDeleteByQueryRequest request) {
return state.blocks().indexBlockedException(ClusterBlockLevel.WRITE, request.index());
}
@Override

View File

@ -25,6 +25,7 @@ import org.elasticsearch.action.support.replication.TransportShardReplicationOpe
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
@ -79,8 +80,13 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication
}
@Override
protected void checkBlock(ShardDeleteByQueryRequest request, ClusterState state) {
state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, request.index());
protected ClusterBlockException checkGlobalBlock(ClusterState state, ShardDeleteByQueryRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.WRITE);
}
@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, ShardDeleteByQueryRequest request) {
return state.blocks().indexBlockedException(ClusterBlockLevel.WRITE, request.index());
}
@Override

View File

@ -20,13 +20,12 @@
package org.elasticsearch.action.get;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.support.single.shard.TransportShardSingleOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -67,26 +66,29 @@ public class TransportGetAction extends TransportShardSingleOperationAction<GetR
}
@Override
protected void checkBlock(GetRequest request, ClusterState state) {
state.blocks().indexBlockedRaiseException(ClusterBlockLevel.READ, request.index());
protected ClusterBlockException checkGlobalBlock(ClusterState state, GetRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.READ);
}
@Override
protected ShardIterator shards(ClusterState clusterState, GetRequest request) {
protected ClusterBlockException checkRequestBlock(ClusterState state, GetRequest request) {
return state.blocks().indexBlockedException(ClusterBlockLevel.READ, request.index());
}
@Override
protected ShardIterator shards(ClusterState state, GetRequest request) {
return clusterService.operationRouting()
.getShards(clusterService.state(), request.index(), request.type(), request.id(), request.routing(), request.preference());
}
@Override
protected void doExecute(GetRequest request, ActionListener<GetResponse> listener) {
protected void resolveRequest(ClusterState state, GetRequest request) {
if (request.realtime == null) {
request.realtime = this.realtime;
}
// update the routing (request#index here is possibly an alias)
MetaData metaData = clusterService.state().metaData();
request.routing(metaData.resolveIndexRouting(request.routing(), request.index()));
super.doExecute(request, listener);
request.routing(state.metaData().resolveIndexRouting(request.routing(), request.index()));
request.index(state.metaData().concreteIndex(request.index()));
}
@Override

View File

@ -25,6 +25,7 @@ import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.support.BaseAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
@ -55,9 +56,13 @@ public class TransportMultiGetAction extends BaseAction<MultiGetRequest, MultiGe
@Override
protected void doExecute(final MultiGetRequest request, final ActionListener<MultiGetResponse> listener) {
ClusterState clusterState = clusterService.state();
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);
Map<ShardId, MultiGetShardRequest> shardRequests = new HashMap<ShardId, MultiGetShardRequest>();
for (int i = 0; i < request.items.size(); i++) {
MultiGetRequest.Item item = request.items.get(i);
item.routing(clusterState.metaData().resolveIndexRouting(item.routing(), item.index()));
item.index(clusterState.metaData().concreteIndex(item.index()));
ShardId shardId = clusterService.operationRouting()
.getShards(clusterState, item.index(), item.type(), item.id(), item.routing(), null).shardId();

View File

@ -21,11 +21,11 @@ package org.elasticsearch.action.get;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.support.single.shard.TransportShardSingleOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.inject.Inject;
@ -74,22 +74,28 @@ public class TransportShardMultiGetAction extends TransportShardSingleOperationA
}
@Override
protected void checkBlock(MultiGetShardRequest request, ClusterState state) {
state.blocks().indexBlockedRaiseException(ClusterBlockLevel.READ, request.index());
protected ClusterBlockException checkGlobalBlock(ClusterState state, MultiGetShardRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.READ);
}
@Override
protected ShardIterator shards(ClusterState clusterState, MultiGetShardRequest request) {
protected ClusterBlockException checkRequestBlock(ClusterState state, MultiGetShardRequest request) {
return state.blocks().indexBlockedException(ClusterBlockLevel.READ, request.index());
}
@Override
protected ShardIterator shards(ClusterState state, MultiGetShardRequest request) {
return clusterService.operationRouting()
.getShards(clusterService.state(), request.index(), request.shardId(), request.preference());
}
@Override
protected void doExecute(MultiGetShardRequest request, ActionListener<MultiGetShardResponse> listener) {
protected void resolveRequest(ClusterState state, MultiGetShardRequest request) {
if (request.realtime == null) {
request.realtime = this.realtime;
}
super.doExecute(request, listener);
// no need to set concrete index and routing here, it has already been set by the multi get action on the item
//request.index(state.metaData().concreteIndex(request.index()));
}
@Override

View File

@ -31,6 +31,7 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
@ -63,8 +64,6 @@ import java.util.concurrent.TimeUnit;
* Defaults to <tt>true</tt>.
* <li><b>allowIdGeneration</b>: If the id is set not, should it be generated. Defaults to <tt>true</tt>.
* </ul>
*
*
*/
public class TransportIndexAction extends TransportShardReplicationOperationAction<IndexRequest, IndexRequest, IndexResponse> {
@ -92,6 +91,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
@Override
protected void doExecute(final IndexRequest request, final ActionListener<IndexResponse> listener) {
// if we don't have a master, we don't have metadata, that's fine, let it find a master using create index API
if (autoCreateIndex && !clusterService.state().metaData().hasConcreteIndex(request.index())) {
request.beforeLocalFork(); // we fork on another thread...
createIndexAction.execute(new CreateIndexRequest(request.index()).cause("auto(index api)"), new ActionListener<CreateIndexResponse>() {
@ -119,7 +119,8 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
}
}
private void innerExecute(final IndexRequest request, final ActionListener<IndexResponse> listener) {
@Override
protected boolean resolveRequest(ClusterState state, IndexRequest request, ActionListener<IndexResponse> indexResponseActionListener) {
MetaData metaData = clusterService.state().metaData();
String aliasOrIndex = request.index();
request.index(metaData.concreteIndex(request.index()));
@ -128,7 +129,10 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
mappingMd = metaData.index(request.index()).mapping(request.type());
}
request.process(metaData, aliasOrIndex, mappingMd, allowIdGeneration);
return true;
}
private void innerExecute(final IndexRequest request, final ActionListener<IndexResponse> listener) {
super.doExecute(request, listener);
}
@ -163,8 +167,13 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
}
@Override
protected void checkBlock(IndexRequest request, ClusterState state) {
state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, request.index());
protected ClusterBlockException checkGlobalBlock(ClusterState state, IndexRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.WRITE);
}
@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, IndexRequest request) {
return state.blocks().indexBlockedException(ClusterBlockLevel.WRITE, request.index());
}
@Override

View File

@ -24,6 +24,8 @@ import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.support.single.custom.TransportSingleCustomOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -68,9 +70,19 @@ public class TransportPercolateAction extends TransportSingleCustomOperationActi
return TransportActions.PERCOLATE;
}
@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, PercolateRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.READ);
}
@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, PercolateRequest request) {
request.index(state.metaData().concreteIndex(request.index()));
return state.blocks().indexBlockedException(ClusterBlockLevel.READ, request.index());
}
@Override
protected ShardsIterator shards(ClusterState clusterState, PercolateRequest request) {
request.index(clusterState.metaData().concreteIndex(request.index()));
return clusterState.routingTable().index(request.index()).randomAllActiveShardsIt();
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.BaseAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
@ -88,7 +89,7 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
protected abstract ShardResponse shardOperation(ShardRequest request) throws ElasticSearchException;
protected abstract GroupShardsIterator shards(Request request, String[] concreteIndices, ClusterState clusterState);
protected abstract GroupShardsIterator shards(ClusterState clusterState, Request request, String[] concreteIndices);
protected boolean accumulateExceptions() {
return true;
@ -102,9 +103,9 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
return false;
}
protected void checkBlock(Request request, String[] indices, ClusterState state) {
protected abstract ClusterBlockException checkGlobalBlock(ClusterState state, Request request);
}
protected abstract ClusterBlockException checkRequestBlock(ClusterState state, Request request, String[] concreteIndices);
class AsyncBroadcastAction {
@ -126,23 +127,27 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
private final AtomicReferenceArray shardsResponses;
private final String[] concreteIndices;
AsyncBroadcastAction(Request request, ActionListener<Response> listener) {
this.request = request;
this.listener = listener;
clusterState = clusterService.state();
ClusterBlockException blockException = checkGlobalBlock(clusterState, request);
if (blockException != null) {
throw blockException;
}
// update to concrete indices
concreteIndices = clusterState.metaData().concreteIndices(request.indices(), false, true);
checkBlock(request, concreteIndices, clusterState);
String[] concreteIndices = clusterState.metaData().concreteIndices(request.indices(), false, true);
blockException = checkRequestBlock(clusterState, request, concreteIndices);
if (blockException != null) {
throw blockException;
}
nodes = clusterState.nodes();
shardsIts = shards(request, concreteIndices, clusterState);
shardsIts = shards(clusterState, request, concreteIndices);
expectedOps = shardsIts.size();
shardsResponses = new AtomicReferenceArray<Object>(expectedOps);
}

View File

@ -26,6 +26,7 @@ import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.BaseAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.inject.Inject;
@ -60,12 +61,17 @@ public abstract class TransportIndexReplicationOperationAction<Request extends I
@Override
protected void doExecute(final Request request, final ActionListener<Response> listener) {
ClusterState clusterState = clusterService.state();
ClusterBlockException blockException = checkGlobalBlock(clusterState, request);
if (blockException != null) {
throw blockException;
}
// update to concrete index
request.index(clusterState.metaData().concreteIndex(request.index()));
checkBlock(request, clusterState);
blockException = checkRequestBlock(clusterState, request);
if (blockException != null) {
throw blockException;
}
GroupShardsIterator groups;
try {
@ -122,9 +128,9 @@ public abstract class TransportIndexReplicationOperationAction<Request extends I
protected abstract boolean accumulateExceptions();
protected void checkBlock(Request request, ClusterState state) {
protected abstract ClusterBlockException checkGlobalBlock(ClusterState state, Request request);
}
protected abstract ClusterBlockException checkRequestBlock(ClusterState state, Request request);
private class TransportHandler extends BaseTransportRequestHandler<Request> {

View File

@ -25,6 +25,7 @@ import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.BaseAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
@ -65,12 +66,16 @@ public abstract class TransportIndicesReplicationOperationAction<Request extends
@Override
protected void doExecute(final Request request, final ActionListener<Response> listener) {
ClusterState clusterState = clusterService.state();
ClusterBlockException blockException = checkGlobalBlock(clusterState, request);
if (blockException != null) {
throw blockException;
}
// get actual indices
String[] concreteIndices = clusterState.metaData().concreteIndices(request.indices());
checkBlock(request, concreteIndices, clusterState);
blockException = checkRequestBlock(clusterState, request, concreteIndices);
if (blockException != null) {
throw blockException;
}
final AtomicInteger indexCounter = new AtomicInteger();
final AtomicInteger completionCounter = new AtomicInteger(concreteIndices.length);
@ -120,9 +125,9 @@ public abstract class TransportIndicesReplicationOperationAction<Request extends
protected abstract boolean accumulateExceptions();
protected void checkBlock(Request request, String[] concreteIndices, ClusterState state) {
protected abstract ClusterBlockException checkGlobalBlock(ClusterState state, Request request);
}
protected abstract ClusterBlockException checkRequestBlock(ClusterState state, Request request, String[] concreteIndices);
private class TransportHandler extends BaseTransportRequestHandler<Request> {

View File

@ -28,10 +28,12 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.TimeoutClusterStateListener;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
@ -42,7 +44,6 @@ import org.elasticsearch.index.IndexShardMissingException;
import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.NodeClosedException;
@ -128,8 +129,17 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
protected abstract boolean checkWriteConsistency();
protected void checkBlock(Request request, ClusterState state) {
protected abstract ClusterBlockException checkGlobalBlock(ClusterState state, Request request);
protected abstract ClusterBlockException checkRequestBlock(ClusterState state, Request request);
/**
* Resolves the request, by default, simply setting the concrete index (if its aliased one). If the resolve
* means a different execution, then return false here to indicate not to continue and execute this request.
*/
protected boolean resolveRequest(ClusterState state, Request request, ActionListener<Response> listener) {
request.index(state.metaData().concreteIndex(request.index()));
return true;
}
protected TransportRequestOptions transportOptions() {
@ -317,12 +327,6 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
this.request = request;
this.listener = listener;
// update to the concrete index
ClusterState clusterState = clusterService.state();
request.index(clusterState.metaData().concreteIndex(request.index()));
checkBlock(request, clusterState);
if (request.replicationType() != ReplicationType.DEFAULT) {
replicationType = request.replicationType();
} else {
@ -340,11 +344,30 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
public boolean start(final boolean fromClusterEvent) throws ElasticSearchException {
final ClusterState clusterState = clusterService.state();
nodes = clusterState.nodes();
if (!clusterState.routingTable().hasIndex(request.index())) {
retry(fromClusterEvent, null);
return false;
}
try {
ClusterBlockException blockException = checkGlobalBlock(clusterState, request);
if (blockException != null) {
if (blockException.retryable()) {
retry(fromClusterEvent, blockException);
return false;
} else {
throw blockException;
}
}
// check if we need to execute, and if not, return
if (!resolveRequest(clusterState, request, listener)) {
return true;
}
blockException = checkRequestBlock(clusterState, request);
if (blockException != null) {
if (blockException.retryable()) {
retry(fromClusterEvent, blockException);
return false;
} else {
throw blockException;
}
}
shardIt = shards(clusterState, request);
shardIt = shards(clusterState, request);
} catch (Exception e) {
listener.onFailure(e);
@ -353,7 +376,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
// no shardIt, might be in the case between index gateway recovery and shardIt initialization
if (shardIt.size() == 0) {
retry(fromClusterEvent, shardIt.shardId());
retry(fromClusterEvent, null);
return false;
}
@ -366,7 +389,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
continue;
}
if (!shard.active() || !nodes.nodeExists(shard.currentNodeId())) {
retry(fromClusterEvent, shard.shardId());
retry(fromClusterEvent, null);
return false;
}
@ -385,7 +408,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
if (shardIt.sizeActive() < requiredNumber) {
retry(fromClusterEvent, shard.shardId());
retry(fromClusterEvent, null);
return false;
}
}
@ -434,7 +457,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
primaryOperationStarted.set(false);
// we already marked it as started when we executed it (removed the listener) so pass false
// to re-add to the cluster listener
retry(false, shard.shardId());
retry(false, null);
} else {
listener.onFailure(exp);
}
@ -445,13 +468,13 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
// we should never get here, but here we go
if (!foundPrimary) {
retry(fromClusterEvent, shardIt.shardId());
retry(fromClusterEvent, null);
return false;
}
return true;
}
void retry(boolean fromClusterEvent, final ShardId shardId) {
void retry(boolean fromClusterEvent, @Nullable final Throwable failure) {
if (!fromClusterEvent) {
// make it threaded operation so we fork on the discovery listener thread
request.beforeLocalFork();
@ -487,8 +510,15 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
return;
}
clusterService.remove(this);
final UnavailableShardsException failure = new UnavailableShardsException(shardId, "[" + shardIt.size() + "] shardIt, [" + shardIt.sizeActive() + "] active : Timeout waiting for [" + timeValue + "], request: " + request.toString());
listener.onFailure(failure);
Throwable listenerFailure = failure;
if (listenerFailure == null) {
if (shardIt == null) {
listenerFailure = new UnavailableShardsException(null, "no available shards: Timeout waiting for [" + timeValue + "], request: " + request.toString());
} else {
listenerFailure = new UnavailableShardsException(shardIt.shardId(), "[" + shardIt.size() + "] shardIt, [" + shardIt.sizeActive() + "] active : Timeout waiting for [" + timeValue + "], request: " + request.toString());
}
}
listener.onFailure(listenerFailure);
}
});
}
@ -501,7 +531,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
} catch (Exception e) {
// shard has not been allocated yet, retry it here
if (retryPrimaryException(e)) {
retry(fromDiscoveryListener, shard.shardId());
retry(fromDiscoveryListener, null);
return;
}
if (e instanceof ElasticSearchException && ((ElasticSearchException) e).status() == RestStatus.CONFLICT) {

View File

@ -26,6 +26,7 @@ import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.support.BaseAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
@ -85,9 +86,9 @@ public abstract class TransportSingleCustomOperationAction<Request extends Singl
protected abstract Response newResponse();
protected void checkBlock(Request request, ClusterState state) {
protected abstract ClusterBlockException checkGlobalBlock(ClusterState state, Request request);
}
protected abstract ClusterBlockException checkRequestBlock(ClusterState state, Request request);
private class AsyncSingleAction {
@ -105,8 +106,14 @@ public abstract class TransportSingleCustomOperationAction<Request extends Singl
ClusterState clusterState = clusterService.state();
nodes = clusterState.nodes();
checkBlock(request, clusterState);
ClusterBlockException blockException = checkGlobalBlock(clusterState, request);
if (blockException != null) {
throw blockException;
}
blockException = checkRequestBlock(clusterState, request);
if (blockException != null) {
throw blockException;
}
this.shardsIt = shards(clusterState, request);
}

View File

@ -28,10 +28,12 @@ import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.TimeoutClusterStateListener;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.ShardId;
@ -79,9 +81,9 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
protected abstract Response newResponse();
protected void checkBlock(Request request, ClusterState state) {
protected abstract ClusterBlockException checkGlobalBlock(ClusterState state, Request request);
}
protected abstract ClusterBlockException checkRequestBlock(ClusterState state, Request request);
protected boolean retryOnFailure(Throwable e) {
return false;
@ -111,14 +113,6 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
private AsyncSingleAction(Request request, ActionListener<Response> listener) {
this.request = request;
this.listener = listener;
ClusterState clusterState = clusterService.state();
nodes = clusterState.nodes();
request.index(clusterState.metaData().concreteIndex(request.index()));
checkBlock(request, clusterState);
}
public void start() {
@ -128,11 +122,26 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
public boolean start(final boolean fromClusterEvent) throws ElasticSearchException {
final ClusterState clusterState = clusterService.state();
nodes = clusterState.nodes();
if (!clusterState.routingTable().hasIndex(request.index())) {
retry(fromClusterEvent);
return false;
}
try {
ClusterBlockException blockException = checkGlobalBlock(clusterState, request);
if (blockException != null) {
if (blockException.retryable()) {
retry(fromClusterEvent, blockException);
return false;
} else {
throw blockException;
}
}
request.index(clusterState.metaData().concreteIndex(request.index()));
blockException = checkRequestBlock(clusterState, request);
if (blockException != null) {
if (blockException.retryable()) {
retry(fromClusterEvent, blockException);
return false;
} else {
throw blockException;
}
}
shardIt = shards(clusterState, request);
} catch (Exception e) {
listener.onFailure(e);
@ -141,7 +150,7 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
// no shardIt, might be in the case between index gateway recovery and shardIt initialization
if (shardIt.size() == 0) {
retry(fromClusterEvent);
retry(fromClusterEvent, null);
return false;
}
@ -152,7 +161,7 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
assert shard != null;
if (!shard.active()) {
retry(fromClusterEvent);
retry(fromClusterEvent, null);
return false;
}
@ -170,7 +179,7 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
shardOperation(request, listener);
} catch (Exception e) {
if (retryOnFailure(e)) {
retry(fromClusterEvent);
retry(fromClusterEvent, null);
} else {
listener.onFailure(e);
}
@ -204,7 +213,7 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
operationStarted.set(false);
// we already marked it as started when we executed it (removed the listener) so pass false
// to re-add to the cluster listener
retry(false);
retry(false, null);
} else {
listener.onFailure(exp);
}
@ -214,7 +223,7 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
return true;
}
void retry(boolean fromClusterEvent) {
void retry(final boolean fromClusterEvent, final @Nullable Throwable failure) {
if (!fromClusterEvent) {
// make it threaded operation so we fork on the discovery listener thread
request.beforeLocalFork();
@ -249,13 +258,15 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
return;
}
clusterService.remove(this);
final UnavailableShardsException failure;
if (shardIt == null) {
failure = new UnavailableShardsException(new ShardId(request.index(), -1), "Timeout waiting for [" + timeValue + "], request: " + request.toString());
} else {
failure = new UnavailableShardsException(shardIt.shardId(), "[" + shardIt.size() + "] shardIt, [" + shardIt.sizeActive() + "] active : Timeout waiting for [" + timeValue + "], request: " + request.toString());
Throwable listenFailure = failure;
if (listenFailure == null) {
if (shardIt == null) {
listenFailure = new UnavailableShardsException(new ShardId(request.index(), -1), "Timeout waiting for [" + timeValue + "], request: " + request.toString());
} else {
listenFailure = new UnavailableShardsException(shardIt.shardId(), "[" + shardIt.size() + "] shardIt, [" + shardIt.sizeActive() + "] active : Timeout waiting for [" + timeValue + "], request: " + request.toString());
}
}
listener.onFailure(failure);
listener.onFailure(listenFailure);
}
});
}

View File

@ -26,6 +26,7 @@ import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.support.BaseAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.ShardIterator;
@ -81,11 +82,15 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
protected abstract Response newResponse();
protected void checkBlock(Request request, ClusterState state) {
protected abstract ClusterBlockException checkGlobalBlock(ClusterState state, Request request);
protected abstract ClusterBlockException checkRequestBlock(ClusterState state, Request request);
protected void resolveRequest(ClusterState state, Request request) {
request.index(state.metaData().concreteIndex(request.index()));
}
protected abstract ShardIterator shards(ClusterState clusterState, Request request) throws ElasticSearchException;
protected abstract ShardIterator shards(ClusterState state, Request request) throws ElasticSearchException;
class AsyncSingleAction {
@ -102,12 +107,16 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
this.listener = listener;
ClusterState clusterState = clusterService.state();
nodes = clusterState.nodes();
request.index(clusterState.metaData().concreteIndex(request.index()));
checkBlock(request, clusterState);
ClusterBlockException blockException = checkGlobalBlock(clusterState, request);
if (blockException != null) {
throw blockException;
}
resolveRequest(clusterState, request);
blockException = checkRequestBlock(clusterState, request);
if (blockException != null) {
throw blockException;
}
this.shardIt = shards(clusterState, request);
}

View File

@ -35,6 +35,7 @@ import org.elasticsearch.action.support.single.instance.TransportInstanceSingleO
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.PlainShardIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
@ -108,8 +109,13 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
}
@Override
protected void checkBlock(UpdateRequest request, ClusterState state) {
state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, request.index());
protected ClusterBlockException checkGlobalBlock(ClusterState state, UpdateRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.WRITE);
}
@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, UpdateRequest request) {
return state.blocks().indexBlockedException(ClusterBlockLevel.WRITE, request.index());
}
@Override

View File

@ -67,6 +67,7 @@ public class MinimumMasterNodesTests extends AbstractZenNodesTests {
Settings settings = settingsBuilder()
.put("discovery.type", "zen")
.put("discovery.zen.minimum_master_nodes", 2)
.put("discovery.zen.ping_timeout", "200ms")
.put("discovery.initial_state_timeout", "500ms")
@ -194,6 +195,7 @@ public class MinimumMasterNodesTests extends AbstractZenNodesTests {
Settings settings = settingsBuilder()
.put("discovery.type", "zen")
.put("discovery.zen.minimum_master_nodes", 3)
.put("discovery.zen.ping_timeout", "200ms")
.put("discovery.initial_state_timeout", "500ms")

View File

@ -0,0 +1,123 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.integration.cluster;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.node.Node;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.integration.AbstractNodesTests;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
/**
*/
public class NoMasterNodeTests extends AbstractNodesTests {
@AfterMethod
public void cleanAndCloseNodes() throws Exception {
closeAllNodes();
}
@Test
public void testNoMasterActions() throws Exception {
Settings settings = settingsBuilder()
.put("discovery.type", "zen")
.put("action.auto_create_index", false)
.put("discovery.zen.minimum_master_nodes", 2)
.put("discovery.zen.ping_timeout", "200ms")
.put("discovery.initial_state_timeout", "500ms")
.put("index.number_of_shards", 1)
.build();
TimeValue timeout = TimeValue.timeValueMillis(200);
Node node = startNode("node1", settings);
// start a second node, create an index, and then shut it down so we have no master block
Node node2 = startNode("node2", settings);
node.client().admin().indices().prepareCreate("test").execute().actionGet();
node2.close();
ClusterState state = node.client().admin().cluster().prepareState().setLocal(true).execute().actionGet().state();
assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(true));
try {
node.client().prepareGet("test", "type1", "1").execute().actionGet();
assert false;
} catch (ClusterBlockException e) {
assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
}
try {
node.client().prepareMultiGet().add("test", "type1", "1").execute().actionGet();
assert false;
} catch (ClusterBlockException e) {
assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
}
try {
node.client().preparePercolate("test", "type1").setSource(XContentFactory.jsonBuilder().startObject().endObject()).execute().actionGet();
assert false;
} catch (ClusterBlockException e) {
assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
}
long now = System.currentTimeMillis();
try {
node.client().prepareUpdate("test", "type1", "1").setScript("test script").setTimeout(timeout).execute().actionGet();
assert false;
} catch (ClusterBlockException e) {
assertThat(System.currentTimeMillis() - now, greaterThan(timeout.millis() - 50));
assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
}
try {
node.client().admin().indices().prepareAnalyze("test", "this is a test").execute().actionGet();
assert false;
} catch (ClusterBlockException e) {
assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
}
try {
node.client().prepareCount("test").execute().actionGet();
assert false;
} catch (ClusterBlockException e) {
assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
}
now = System.currentTimeMillis();
try {
node.client().prepareIndex("test", "type1", "1").setSource(XContentFactory.jsonBuilder().startObject().endObject()).setTimeout(timeout).execute().actionGet();
assert false;
} catch (ClusterBlockException e) {
assertThat(System.currentTimeMillis() - now, greaterThan(timeout.millis() - 50));
assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
}
}
}