Use correct block levels for TRA subclasses (#22224)

Subclasses of TransportReplicationAction can currently chose to implement block levels for which the request will be blocked.
- Refresh/Flush was using the block level METADATA_WRITE although they don't operate at the cluster meta data level (but more like shard level meta data which is not represented in the block levels). Their level has been changed to null so that they can operate freely in the presence of blocks.
- GlobChkptSync was using WRITE although it does not make any changes to the actual documents of a shard. The level has been changed to null so that it can operate freely in the presence of blocks.
The commit also adds a check for closed indices in TRA so that the right exception is thrown if refresh/flush/checkpoint syncing is attempted on a closed index (before it was throwing an IndexNotFoundException, now it's throwing IndexClosedException).
This commit is contained in:
Yannick Welsch 2016-12-19 14:36:58 +01:00 committed by GitHub
parent b2e93d2870
commit 1cabf66bd5
7 changed files with 86 additions and 79 deletions

View File

@ -64,16 +64,6 @@ public class TransportShardFlushAction extends TransportReplicationAction<ShardF
return new ReplicaResult();
}
@Override
protected ClusterBlockLevel globalBlockLevel() {
return ClusterBlockLevel.METADATA_WRITE;
}
@Override
protected ClusterBlockLevel indexBlockLevel() {
return ClusterBlockLevel.METADATA_WRITE;
}
@Override
protected boolean shouldExecuteReplication(Settings settings) {
return true;

View File

@ -67,16 +67,6 @@ public class TransportShardRefreshAction
return new ReplicaResult();
}
@Override
protected ClusterBlockLevel globalBlockLevel() {
return ClusterBlockLevel.METADATA_WRITE;
}
@Override
protected ClusterBlockLevel indexBlockLevel() {
return ClusterBlockLevel.METADATA_WRITE;
}
@Override
protected boolean shouldExecuteReplication(Settings settings) {
return true;

View File

@ -44,6 +44,7 @@ import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasable;
@ -58,6 +59,7 @@ import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.indices.IndexClosedException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
@ -183,17 +185,19 @@ public abstract class TransportReplicationAction<
protected abstract ReplicaResult shardOperationOnReplica(ReplicaRequest shardRequest, IndexShard replica) throws Exception;
/**
* Cluster level block to check before request execution
* Cluster level block to check before request execution. Returning null means that no blocks need to be checked.
*/
@Nullable
protected ClusterBlockLevel globalBlockLevel() {
return ClusterBlockLevel.WRITE;
return null;
}
/**
* Index level block to check before request execution
* Index level block to check before request execution. Returning null means that no blocks need to be checked.
*/
@Nullable
protected ClusterBlockLevel indexBlockLevel() {
return ClusterBlockLevel.WRITE;
return null;
}
/**
@ -643,6 +647,9 @@ public abstract class TransportReplicationAction<
retry(new IndexNotFoundException(concreteIndex));
return;
}
if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
throw new IndexClosedException(indexMetaData.getIndex());
}
// resolve all derived request fields, so we can route and apply it
resolveRequest(state.metaData(), indexMetaData, request);
@ -719,15 +726,21 @@ public abstract class TransportReplicationAction<
}
private boolean handleBlockExceptions(ClusterState state) {
ClusterBlockException blockException = state.blocks().globalBlockedException(globalBlockLevel());
if (blockException != null) {
handleBlockException(blockException);
return true;
ClusterBlockLevel globalBlockLevel = globalBlockLevel();
if (globalBlockLevel != null) {
ClusterBlockException blockException = state.blocks().globalBlockedException(globalBlockLevel);
if (blockException != null) {
handleBlockException(blockException);
return true;
}
}
blockException = state.blocks().indexBlockedException(indexBlockLevel(), concreteIndex(state));
if (blockException != null) {
handleBlockException(blockException);
return true;
ClusterBlockLevel indexBlockLevel = indexBlockLevel();
if (indexBlockLevel != null) {
ClusterBlockException blockException = state.blocks().indexBlockedException(indexBlockLevel, concreteIndex(state));
if (blockException != null) {
handleBlockException(blockException);
return true;
}
}
return false;
}

View File

@ -25,6 +25,7 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.WriteResponse;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
@ -184,6 +185,16 @@ public abstract class TransportWriteAction<
}
}
@Override
protected ClusterBlockLevel globalBlockLevel() {
return ClusterBlockLevel.WRITE;
}
@Override
protected ClusterBlockLevel indexBlockLevel() {
return ClusterBlockLevel.WRITE;
}
/**
* callback used by {@link AsyncAfterWriteAction} to notify that all post
* process actions have been executed

View File

@ -46,7 +46,7 @@ public class FlushBlocksIT extends ESIntegTestCase {
}
// Request is not blocked
for (String blockSetting : Arrays.asList(SETTING_BLOCKS_READ, SETTING_BLOCKS_WRITE)) {
for (String blockSetting : Arrays.asList(SETTING_BLOCKS_READ, SETTING_BLOCKS_WRITE, SETTING_READ_ONLY, SETTING_BLOCKS_METADATA)) {
try {
enableIndexBlock("test", blockSetting);
FlushResponse response = client().admin().indices().prepareFlush("test").execute().actionGet();
@ -56,28 +56,5 @@ public class FlushBlocksIT extends ESIntegTestCase {
disableIndexBlock("test", blockSetting);
}
}
// Request is blocked
for (String blockSetting : Arrays.asList(SETTING_READ_ONLY, SETTING_BLOCKS_METADATA)) {
try {
enableIndexBlock("test", blockSetting);
FlushResponse flushResponse = client().admin().indices().prepareFlush("test").get();
assertBlocked(flushResponse);
} finally {
disableIndexBlock("test", blockSetting);
}
}
// Flushing all indices is blocked when the cluster is read-only
try {
FlushResponse response = client().admin().indices().prepareFlush().execute().actionGet();
assertNoFailures(response);
assertThat(response.getSuccessfulShards(), equalTo(numShards.totalNumShards));
setClusterReadOnly(true);
assertBlocked(client().admin().indices().prepareFlush().get());
} finally {
setClusterReadOnly(false);
}
}
}

View File

@ -42,7 +42,7 @@ public class RefreshBlocksIT extends ESIntegTestCase {
NumShards numShards = getNumShards("test");
// Request is not blocked
for (String blockSetting : Arrays.asList(SETTING_BLOCKS_READ, SETTING_BLOCKS_WRITE)) {
for (String blockSetting : Arrays.asList(SETTING_BLOCKS_READ, SETTING_BLOCKS_WRITE, SETTING_READ_ONLY, SETTING_BLOCKS_METADATA)) {
try {
enableIndexBlock("test", blockSetting);
RefreshResponse response = client().admin().indices().prepareRefresh("test").execute().actionGet();
@ -52,27 +52,5 @@ public class RefreshBlocksIT extends ESIntegTestCase {
disableIndexBlock("test", blockSetting);
}
}
// Request is blocked
for (String blockSetting : Arrays.asList(SETTING_READ_ONLY, SETTING_BLOCKS_METADATA)) {
try {
enableIndexBlock("test", blockSetting);
assertBlocked(client().admin().indices().prepareRefresh("test").get());
} finally {
disableIndexBlock("test", blockSetting);
}
}
// Refreshing all indices is blocked when the cluster is read-only
try {
RefreshResponse response = client().admin().indices().prepareRefresh().execute().actionGet();
assertNoFailures(response);
assertThat(response.getSuccessfulShards(), equalTo(numShards.totalNumShards));
setClusterReadOnly(true);
assertBlocked(client().admin().indices().prepareRefresh().get());
} finally {
setClusterReadOnly(false);
}
}
}

View File

@ -22,6 +22,7 @@ package org.elasticsearch.action.support.replication;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.PlainActionFuture;
@ -60,7 +61,9 @@ import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.indices.IndexClosedException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.cluster.ClusterStateChanges;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
@ -184,6 +187,12 @@ public class TransportReplicationActionTests extends ESTestCase {
Request request = new Request();
PlainActionFuture<Response> listener = new PlainActionFuture<>();
ReplicationTask task = maybeTask();
Action action = new Action(Settings.EMPTY, "testActionWithBlocks", transportService, clusterService, shardStateAction, threadPool) {
@Override
protected ClusterBlockLevel globalBlockLevel() {
return ClusterBlockLevel.WRITE;
}
};
ClusterBlocks.Builder block = ClusterBlocks.builder()
.addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
@ -216,6 +225,17 @@ public class TransportReplicationActionTests extends ESTestCase {
assertListenerThrows("primary phase should fail operation when moving from a retryable block to a non-retryable one", listener,
ClusterBlockException.class);
assertIndexShardUninitialized();
action = new Action(Settings.EMPTY, "testActionWithNoBlocks", transportService, clusterService, shardStateAction, threadPool) {
@Override
protected ClusterBlockLevel globalBlockLevel() {
return null;
}
};
listener = new PlainActionFuture<>();
reroutePhase = action.new ReroutePhase(task, new Request().timeout("5ms"), listener);
reroutePhase.run();
assertListenerThrows("should fail with an IndexNotFoundException when no blocks checked", listener, IndexNotFoundException.class);
}
public void assertIndexShardUninitialized() {
@ -337,6 +357,34 @@ public class TransportReplicationActionTests extends ESTestCase {
}
public void testClosedIndexOnReroute() throws InterruptedException {
final String index = "test";
// no replicas in oder to skip the replication part
setState(clusterService,
new ClusterStateChanges().closeIndices(state(index, true, ShardRoutingState.UNASSIGNED), new CloseIndexRequest(index)));
logger.debug("--> using initial state:\n{}", clusterService.state());
Request request = new Request(new ShardId("test", "_na_", 0)).timeout("1ms");
PlainActionFuture<Response> listener = new PlainActionFuture<>();
ReplicationTask task = maybeTask();
ClusterBlockLevel indexBlockLevel = randomBoolean() ? ClusterBlockLevel.WRITE : null;
Action action = new Action(Settings.EMPTY, "testActionWithBlocks", transportService, clusterService, shardStateAction, threadPool) {
@Override
protected ClusterBlockLevel indexBlockLevel() {
return indexBlockLevel;
}
};
Action.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
reroutePhase.run();
if (indexBlockLevel == ClusterBlockLevel.WRITE) {
assertListenerThrows("must throw block exception", listener, ClusterBlockException.class);
} else {
assertListenerThrows("must throw index closed exception", listener, IndexClosedException.class);
}
assertPhase(task, "failed");
assertFalse(request.isRetrySet.get());
}
public void testStalePrimaryShardOnReroute() throws InterruptedException {
final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0);