Revert "[RCI] Check blocks while having index shard permit in TransportReplicationAction (#35332)"
This reverts commit 31567cefb4
.
This commit is contained in:
parent
76b77dbaa4
commit
d3d7c0158e
|
@ -235,39 +235,9 @@ public abstract class TransportReplicationAction<
|
||||||
return TransportRequestOptions.EMPTY;
|
return TransportRequestOptions.EMPTY;
|
||||||
}
|
}
|
||||||
|
|
||||||
private String concreteIndex(final ClusterState state, final ReplicationRequest request) {
|
|
||||||
return resolveIndex() ? indexNameExpressionResolver.concreteSingleIndex(state, request).getName() : request.index();
|
|
||||||
}
|
|
||||||
|
|
||||||
private ClusterBlockException blockExceptions(final ClusterState state, final String indexName) {
|
|
||||||
ClusterBlockLevel globalBlockLevel = globalBlockLevel();
|
|
||||||
if (globalBlockLevel != null) {
|
|
||||||
ClusterBlockException blockException = state.blocks().globalBlockedException(globalBlockLevel);
|
|
||||||
if (blockException != null) {
|
|
||||||
return blockException;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ClusterBlockLevel indexBlockLevel = indexBlockLevel();
|
|
||||||
if (indexBlockLevel != null) {
|
|
||||||
ClusterBlockException blockException = state.blocks().indexBlockedException(indexBlockLevel, indexName);
|
|
||||||
if (blockException != null) {
|
|
||||||
return blockException;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected boolean retryPrimaryException(final Throwable e) {
|
protected boolean retryPrimaryException(final Throwable e) {
|
||||||
return e.getClass() == ReplicationOperation.RetryOnPrimaryException.class
|
return e.getClass() == ReplicationOperation.RetryOnPrimaryException.class
|
||||||
|| TransportActions.isShardNotAvailableException(e)
|
|| TransportActions.isShardNotAvailableException(e);
|
||||||
|| isRetryableClusterBlockException(e);
|
|
||||||
}
|
|
||||||
|
|
||||||
boolean isRetryableClusterBlockException(final Throwable e) {
|
|
||||||
if (e instanceof ClusterBlockException) {
|
|
||||||
return ((ClusterBlockException) e).retryable();
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected class OperationTransportHandler implements TransportRequestHandler<Request> {
|
protected class OperationTransportHandler implements TransportRequestHandler<Request> {
|
||||||
|
@ -340,15 +310,6 @@ public abstract class TransportReplicationAction<
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(PrimaryShardReference primaryShardReference) {
|
public void onResponse(PrimaryShardReference primaryShardReference) {
|
||||||
try {
|
try {
|
||||||
final ClusterState clusterState = clusterService.state();
|
|
||||||
final IndexMetaData indexMetaData = clusterState.metaData().getIndexSafe(primaryShardReference.routingEntry().index());
|
|
||||||
|
|
||||||
final ClusterBlockException blockException = blockExceptions(clusterState, indexMetaData.getIndex().getName());
|
|
||||||
if (blockException != null) {
|
|
||||||
logger.trace("cluster is blocked, action failed on primary", blockException);
|
|
||||||
throw blockException;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (primaryShardReference.isRelocated()) {
|
if (primaryShardReference.isRelocated()) {
|
||||||
primaryShardReference.close(); // release shard operation lock as soon as possible
|
primaryShardReference.close(); // release shard operation lock as soon as possible
|
||||||
setPhase(replicationTask, "primary_delegation");
|
setPhase(replicationTask, "primary_delegation");
|
||||||
|
@ -362,7 +323,7 @@ public abstract class TransportReplicationAction<
|
||||||
response.readFrom(in);
|
response.readFrom(in);
|
||||||
return response;
|
return response;
|
||||||
};
|
};
|
||||||
DiscoveryNode relocatingNode = clusterState.nodes().get(primary.relocatingNodeId());
|
DiscoveryNode relocatingNode = clusterService.state().nodes().get(primary.relocatingNodeId());
|
||||||
transportService.sendRequest(relocatingNode, transportPrimaryAction,
|
transportService.sendRequest(relocatingNode, transportPrimaryAction,
|
||||||
new ConcreteShardRequest<>(request, primary.allocationId().getRelocationId(), primaryTerm),
|
new ConcreteShardRequest<>(request, primary.allocationId().getRelocationId(), primaryTerm),
|
||||||
transportOptions,
|
transportOptions,
|
||||||
|
@ -735,42 +696,35 @@ public abstract class TransportReplicationAction<
|
||||||
protected void doRun() {
|
protected void doRun() {
|
||||||
setPhase(task, "routing");
|
setPhase(task, "routing");
|
||||||
final ClusterState state = observer.setAndGetObservedState();
|
final ClusterState state = observer.setAndGetObservedState();
|
||||||
final String concreteIndex = concreteIndex(state, request);
|
if (handleBlockExceptions(state)) {
|
||||||
final ClusterBlockException blockException = blockExceptions(state, concreteIndex);
|
return;
|
||||||
if (blockException != null) {
|
}
|
||||||
if (blockException.retryable()) {
|
|
||||||
logger.trace("cluster is blocked, scheduling a retry", blockException);
|
// request does not have a shardId yet, we need to pass the concrete index to resolve shardId
|
||||||
retry(blockException);
|
final String concreteIndex = concreteIndex(state);
|
||||||
} else {
|
final IndexMetaData indexMetaData = state.metaData().index(concreteIndex);
|
||||||
finishAsFailed(blockException);
|
if (indexMetaData == null) {
|
||||||
}
|
retry(new IndexNotFoundException(concreteIndex));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
|
||||||
|
throw new IndexClosedException(indexMetaData.getIndex());
|
||||||
|
}
|
||||||
|
|
||||||
|
// resolve all derived request fields, so we can route and apply it
|
||||||
|
resolveRequest(indexMetaData, request);
|
||||||
|
assert request.shardId() != null : "request shardId must be set in resolveRequest";
|
||||||
|
assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : "request waitForActiveShards must be set in resolveRequest";
|
||||||
|
|
||||||
|
final ShardRouting primary = primary(state);
|
||||||
|
if (retryIfUnavailable(state, primary)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
|
||||||
|
if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
|
||||||
|
performLocalAction(state, primary, node, indexMetaData);
|
||||||
} else {
|
} else {
|
||||||
// request does not have a shardId yet, we need to pass the concrete index to resolve shardId
|
performRemoteAction(state, primary, node);
|
||||||
final IndexMetaData indexMetaData = state.metaData().index(concreteIndex);
|
|
||||||
if (indexMetaData == null) {
|
|
||||||
retry(new IndexNotFoundException(concreteIndex));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
|
|
||||||
throw new IndexClosedException(indexMetaData.getIndex());
|
|
||||||
}
|
|
||||||
|
|
||||||
// resolve all derived request fields, so we can route and apply it
|
|
||||||
resolveRequest(indexMetaData, request);
|
|
||||||
assert request.shardId() != null : "request shardId must be set in resolveRequest";
|
|
||||||
assert request.waitForActiveShards() != ActiveShardCount.DEFAULT :
|
|
||||||
"request waitForActiveShards must be set in resolveRequest";
|
|
||||||
|
|
||||||
final ShardRouting primary = primary(state);
|
|
||||||
if (retryIfUnavailable(state, primary)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
|
|
||||||
if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
|
|
||||||
performLocalAction(state, primary, node, indexMetaData);
|
|
||||||
} else {
|
|
||||||
performRemoteAction(state, primary, node);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -822,11 +776,44 @@ public abstract class TransportReplicationAction<
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private String concreteIndex(ClusterState state) {
|
||||||
|
return resolveIndex() ? indexNameExpressionResolver.concreteSingleIndex(state, request).getName() : request.index();
|
||||||
|
}
|
||||||
|
|
||||||
private ShardRouting primary(ClusterState state) {
|
private ShardRouting primary(ClusterState state) {
|
||||||
IndexShardRoutingTable indexShard = state.getRoutingTable().shardRoutingTable(request.shardId());
|
IndexShardRoutingTable indexShard = state.getRoutingTable().shardRoutingTable(request.shardId());
|
||||||
return indexShard.primaryShard();
|
return indexShard.primaryShard();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean handleBlockExceptions(ClusterState state) {
|
||||||
|
ClusterBlockLevel globalBlockLevel = globalBlockLevel();
|
||||||
|
if (globalBlockLevel != null) {
|
||||||
|
ClusterBlockException blockException = state.blocks().globalBlockedException(globalBlockLevel);
|
||||||
|
if (blockException != null) {
|
||||||
|
handleBlockException(blockException);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ClusterBlockLevel indexBlockLevel = indexBlockLevel();
|
||||||
|
if (indexBlockLevel != null) {
|
||||||
|
ClusterBlockException blockException = state.blocks().indexBlockedException(indexBlockLevel, concreteIndex(state));
|
||||||
|
if (blockException != null) {
|
||||||
|
handleBlockException(blockException);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handleBlockException(ClusterBlockException blockException) {
|
||||||
|
if (blockException.retryable()) {
|
||||||
|
logger.trace("cluster is blocked, scheduling a retry", blockException);
|
||||||
|
retry(blockException);
|
||||||
|
} else {
|
||||||
|
finishAsFailed(blockException);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void performAction(final DiscoveryNode node, final String action, final boolean isPrimaryAction,
|
private void performAction(final DiscoveryNode node, final String action, final boolean isPrimaryAction,
|
||||||
final TransportRequest requestToPerform) {
|
final TransportRequest requestToPerform) {
|
||||||
transportService.sendRequest(node, action, requestToPerform, transportOptions, new TransportResponseHandler<Response>() {
|
transportService.sendRequest(node, action, requestToPerform, transportOptions, new TransportResponseHandler<Response>() {
|
||||||
|
|
|
@ -89,7 +89,6 @@ import org.junit.BeforeClass;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.EnumSet;
|
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
|
@ -101,7 +100,6 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static java.util.Collections.singleton;
|
|
||||||
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state;
|
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state;
|
||||||
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithActivePrimary;
|
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithActivePrimary;
|
||||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS;
|
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS;
|
||||||
|
@ -110,11 +108,9 @@ import static org.elasticsearch.test.ClusterServiceUtils.setState;
|
||||||
import static org.hamcrest.CoreMatchers.containsString;
|
import static org.hamcrest.CoreMatchers.containsString;
|
||||||
import static org.hamcrest.Matchers.arrayWithSize;
|
import static org.hamcrest.Matchers.arrayWithSize;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
import static org.hamcrest.Matchers.hasToString;
|
|
||||||
import static org.hamcrest.Matchers.instanceOf;
|
import static org.hamcrest.Matchers.instanceOf;
|
||||||
import static org.hamcrest.Matchers.notNullValue;
|
import static org.hamcrest.Matchers.notNullValue;
|
||||||
import static org.hamcrest.Matchers.nullValue;
|
import static org.hamcrest.Matchers.nullValue;
|
||||||
import static org.hamcrest.core.Is.is;
|
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Matchers.anyInt;
|
import static org.mockito.Matchers.anyInt;
|
||||||
import static org.mockito.Matchers.anyLong;
|
import static org.mockito.Matchers.anyLong;
|
||||||
|
@ -186,157 +182,70 @@ public class TransportReplicationActionTests extends ESTestCase {
|
||||||
threadPool = null;
|
threadPool = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
private <T> T assertListenerThrows(String msg, PlainActionFuture<?> listener, Class<T> klass) {
|
<T> void assertListenerThrows(String msg, PlainActionFuture<T> listener, Class<?> klass) throws InterruptedException {
|
||||||
ExecutionException exception = expectThrows(ExecutionException.class, msg, listener::get);
|
try {
|
||||||
assertThat(exception.getCause(), instanceOf(klass));
|
listener.get();
|
||||||
@SuppressWarnings("unchecked")
|
fail(msg);
|
||||||
final T cause = (T) exception.getCause();
|
} catch (ExecutionException ex) {
|
||||||
return cause;
|
assertThat(ex.getCause(), instanceOf(klass));
|
||||||
}
|
|
||||||
|
|
||||||
private void setStateWithBlock(final ClusterService clusterService, final ClusterBlock block, final boolean globalBlock) {
|
|
||||||
final ClusterBlocks.Builder blocks = ClusterBlocks.builder();
|
|
||||||
if (globalBlock) {
|
|
||||||
blocks.addGlobalBlock(block);
|
|
||||||
} else {
|
|
||||||
blocks.addIndexBlock("index", block);
|
|
||||||
}
|
}
|
||||||
setState(clusterService, ClusterState.builder(clusterService.state()).blocks(blocks).build());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testBlocksInReroutePhase() throws Exception {
|
public void testBlocks() throws ExecutionException, InterruptedException {
|
||||||
final ClusterBlock nonRetryableBlock =
|
Request request = new Request();
|
||||||
new ClusterBlock(1, "non retryable", false, true, false, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL);
|
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
|
||||||
final ClusterBlock retryableBlock =
|
ReplicationTask task = maybeTask();
|
||||||
new ClusterBlock(1, "retryable", true, true, false, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL);
|
TestAction action = new TestAction(Settings.EMPTY, "internal:testActionWithBlocks",
|
||||||
|
transportService, clusterService, shardStateAction, threadPool) {
|
||||||
final boolean globalBlock = randomBoolean();
|
|
||||||
final TestAction action = new TestAction(Settings.EMPTY, "internal:testActionWithBlocks",
|
|
||||||
transportService, clusterService, shardStateAction, threadPool) {
|
|
||||||
@Override
|
@Override
|
||||||
protected ClusterBlockLevel globalBlockLevel() {
|
protected ClusterBlockLevel globalBlockLevel() {
|
||||||
return globalBlock ? ClusterBlockLevel.WRITE : null;
|
return ClusterBlockLevel.WRITE;
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected ClusterBlockLevel indexBlockLevel() {
|
|
||||||
return globalBlock == false ? ClusterBlockLevel.WRITE : null;
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
setState(clusterService, ClusterStateCreationUtils.stateWithActivePrimary("index", true, 0));
|
ClusterBlocks.Builder block = ClusterBlocks.builder().addGlobalBlock(new ClusterBlock(1, "non retryable", false, true,
|
||||||
|
false, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
|
||||||
{
|
|
||||||
setStateWithBlock(clusterService, nonRetryableBlock, globalBlock);
|
|
||||||
|
|
||||||
Request request = globalBlock ? new Request() : new Request().index("index");
|
|
||||||
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
|
|
||||||
ReplicationTask task = maybeTask();
|
|
||||||
|
|
||||||
TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
|
|
||||||
reroutePhase.run();
|
|
||||||
|
|
||||||
ClusterBlockException exception =
|
|
||||||
assertListenerThrows("primary action should fail operation", listener, ClusterBlockException.class);
|
|
||||||
assertThat(((ClusterBlockException) exception.unwrapCause()).blocks().iterator().next(), is(nonRetryableBlock));
|
|
||||||
assertPhase(task, "failed");
|
|
||||||
}
|
|
||||||
{
|
|
||||||
setStateWithBlock(clusterService, retryableBlock, globalBlock);
|
|
||||||
|
|
||||||
Request requestWithTimeout = (globalBlock ? new Request() : new Request().index("index")).timeout("5ms");
|
|
||||||
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
|
|
||||||
ReplicationTask task = maybeTask();
|
|
||||||
|
|
||||||
TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, requestWithTimeout, listener);
|
|
||||||
reroutePhase.run();
|
|
||||||
|
|
||||||
ClusterBlockException exception =
|
|
||||||
assertListenerThrows("failed to timeout on retryable block", listener, ClusterBlockException.class);
|
|
||||||
assertThat(((ClusterBlockException) exception.unwrapCause()).blocks().iterator().next(), is(retryableBlock));
|
|
||||||
assertPhase(task, "failed");
|
|
||||||
assertTrue(requestWithTimeout.isRetrySet.get());
|
|
||||||
}
|
|
||||||
{
|
|
||||||
setStateWithBlock(clusterService, retryableBlock, globalBlock);
|
|
||||||
|
|
||||||
Request request = globalBlock ? new Request() : new Request().index("index");
|
|
||||||
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
|
|
||||||
ReplicationTask task = maybeTask();
|
|
||||||
|
|
||||||
TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
|
|
||||||
reroutePhase.run();
|
|
||||||
|
|
||||||
assertFalse("primary phase should wait on retryable block", listener.isDone());
|
|
||||||
assertPhase(task, "waiting_for_retry");
|
|
||||||
assertTrue(request.isRetrySet.get());
|
|
||||||
|
|
||||||
setStateWithBlock(clusterService, nonRetryableBlock, globalBlock);
|
|
||||||
|
|
||||||
ClusterBlockException exception = assertListenerThrows("primary phase should fail operation when moving from a retryable " +
|
|
||||||
"block to a non-retryable one", listener, ClusterBlockException.class);
|
|
||||||
assertThat(((ClusterBlockException) exception.unwrapCause()).blocks().iterator().next(), is(nonRetryableBlock));
|
|
||||||
assertIndexShardUninitialized();
|
|
||||||
}
|
|
||||||
{
|
|
||||||
Request requestWithTimeout = new Request().index("unknown").setShardId(new ShardId("unknown", "_na_", 0)).timeout("5ms");
|
|
||||||
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
|
|
||||||
ReplicationTask task = maybeTask();
|
|
||||||
|
|
||||||
TestAction testActionWithNoBlocks = new TestAction(Settings.EMPTY, "internal:testActionWithNoBlocks", transportService,
|
|
||||||
clusterService, shardStateAction, threadPool);
|
|
||||||
listener = new PlainActionFuture<>();
|
|
||||||
TestAction.ReroutePhase reroutePhase = testActionWithNoBlocks.new ReroutePhase(task, requestWithTimeout, listener);
|
|
||||||
reroutePhase.run();
|
|
||||||
assertListenerThrows("should fail with an IndexNotFoundException when no blocks", listener, IndexNotFoundException.class);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testBlocksInPrimaryAction() {
|
|
||||||
final boolean globalBlock = randomBoolean();
|
|
||||||
|
|
||||||
final TestAction actionWithBlocks =
|
|
||||||
new TestAction(Settings.EMPTY, "internal:actionWithBlocks", transportService, clusterService, shardStateAction, threadPool) {
|
|
||||||
@Override
|
|
||||||
protected ClusterBlockLevel globalBlockLevel() {
|
|
||||||
return globalBlock ? ClusterBlockLevel.WRITE : null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected ClusterBlockLevel indexBlockLevel() {
|
|
||||||
return globalBlock == false ? ClusterBlockLevel.WRITE : null;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
final String index = "index";
|
|
||||||
final ShardId shardId = new ShardId(index, "_na_", 0);
|
|
||||||
setState(clusterService, stateWithActivePrimary(index, true, randomInt(5)));
|
|
||||||
|
|
||||||
final ClusterBlocks.Builder block = ClusterBlocks.builder();
|
|
||||||
if (globalBlock) {
|
|
||||||
block.addGlobalBlock(new ClusterBlock(randomIntBetween(1, 16), "test global block", randomBoolean(), randomBoolean(),
|
|
||||||
randomBoolean(), RestStatus.BAD_REQUEST, ClusterBlockLevel.ALL));
|
|
||||||
} else {
|
|
||||||
block.addIndexBlock(index, new ClusterBlock(randomIntBetween(1, 16), "test index block", randomBoolean(), randomBoolean(),
|
|
||||||
randomBoolean(), RestStatus.FORBIDDEN, ClusterBlockLevel.READ_WRITE));
|
|
||||||
}
|
|
||||||
setState(clusterService, ClusterState.builder(clusterService.state()).blocks(block));
|
setState(clusterService, ClusterState.builder(clusterService.state()).blocks(block));
|
||||||
|
TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
|
||||||
|
reroutePhase.run();
|
||||||
|
assertListenerThrows("primary phase should fail operation", listener, ClusterBlockException.class);
|
||||||
|
assertPhase(task, "failed");
|
||||||
|
|
||||||
final ClusterState clusterState = clusterService.state();
|
block = ClusterBlocks.builder()
|
||||||
final String targetAllocationID = clusterState.getRoutingTable().shardRoutingTable(shardId).primaryShard().allocationId().getId();
|
.addGlobalBlock(new ClusterBlock(1, "retryable", true, true, false, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
|
||||||
final long primaryTerm = clusterState.metaData().index(index).primaryTerm(shardId.id());
|
setState(clusterService, ClusterState.builder(clusterService.state()).blocks(block));
|
||||||
final Request request = new Request(shardId);
|
listener = new PlainActionFuture<>();
|
||||||
final ReplicationTask task = maybeTask();
|
reroutePhase = action.new ReroutePhase(task, new Request().timeout("5ms"), listener);
|
||||||
final PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
|
reroutePhase.run();
|
||||||
|
assertListenerThrows("failed to timeout on retryable block", listener, ClusterBlockException.class);
|
||||||
|
assertPhase(task, "failed");
|
||||||
|
assertFalse(request.isRetrySet.get());
|
||||||
|
|
||||||
final TransportReplicationAction.AsyncPrimaryAction asyncPrimaryActionWithBlocks =
|
listener = new PlainActionFuture<>();
|
||||||
actionWithBlocks.new AsyncPrimaryAction(request, targetAllocationID, primaryTerm, createTransportChannel(listener), task);
|
reroutePhase = action.new ReroutePhase(task, request = new Request(), listener);
|
||||||
asyncPrimaryActionWithBlocks.run();
|
reroutePhase.run();
|
||||||
|
assertFalse("primary phase should wait on retryable block", listener.isDone());
|
||||||
|
assertPhase(task, "waiting_for_retry");
|
||||||
|
assertTrue(request.isRetrySet.get());
|
||||||
|
|
||||||
final ExecutionException exception = expectThrows(ExecutionException.class, listener::get);
|
block = ClusterBlocks.builder().addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, false,
|
||||||
assertThat(exception.getCause(), instanceOf(ClusterBlockException.class));
|
RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
|
||||||
assertThat(exception.getCause(), hasToString(containsString("test " + (globalBlock ? "global" : "index") + " block")));
|
setState(clusterService, ClusterState.builder(clusterService.state()).blocks(block));
|
||||||
assertPhase(task, "finished");
|
assertListenerThrows("primary phase should fail operation when moving from a retryable block to a non-retryable one", listener,
|
||||||
|
ClusterBlockException.class);
|
||||||
|
assertIndexShardUninitialized();
|
||||||
|
|
||||||
|
action = new TestAction(Settings.EMPTY, "internal:testActionWithNoBlocks", transportService, clusterService, shardStateAction,
|
||||||
|
threadPool) {
|
||||||
|
@Override
|
||||||
|
protected ClusterBlockLevel globalBlockLevel() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
listener = new PlainActionFuture<>();
|
||||||
|
reroutePhase = action.new ReroutePhase(task, new Request().timeout("5ms"), listener);
|
||||||
|
reroutePhase.run();
|
||||||
|
assertListenerThrows("should fail with an IndexNotFoundException when no blocks checked", listener, IndexNotFoundException.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void assertIndexShardUninitialized() {
|
public void assertIndexShardUninitialized() {
|
||||||
|
@ -468,12 +377,21 @@ public class TransportReplicationActionTests extends ESTestCase {
|
||||||
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
|
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
|
||||||
ReplicationTask task = maybeTask();
|
ReplicationTask task = maybeTask();
|
||||||
|
|
||||||
|
ClusterBlockLevel indexBlockLevel = randomBoolean() ? ClusterBlockLevel.WRITE : null;
|
||||||
TestAction action = new TestAction(Settings.EMPTY, "internal:testActionWithBlocks", transportService,
|
TestAction action = new TestAction(Settings.EMPTY, "internal:testActionWithBlocks", transportService,
|
||||||
clusterService, shardStateAction, threadPool);
|
clusterService, shardStateAction, threadPool) {
|
||||||
|
@Override
|
||||||
|
protected ClusterBlockLevel indexBlockLevel() {
|
||||||
|
return indexBlockLevel;
|
||||||
|
}
|
||||||
|
};
|
||||||
TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
|
TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
|
||||||
reroutePhase.run();
|
reroutePhase.run();
|
||||||
assertListenerThrows("must throw index closed exception", listener, IndexClosedException.class);
|
if (indexBlockLevel == ClusterBlockLevel.WRITE) {
|
||||||
|
assertListenerThrows("must throw block exception", listener, ClusterBlockException.class);
|
||||||
|
} else {
|
||||||
|
assertListenerThrows("must throw index closed exception", listener, IndexClosedException.class);
|
||||||
|
}
|
||||||
assertPhase(task, "failed");
|
assertPhase(task, "failed");
|
||||||
assertFalse(request.isRetrySet.get());
|
assertFalse(request.isRetrySet.get());
|
||||||
}
|
}
|
||||||
|
@ -764,12 +682,12 @@ public class TransportReplicationActionTests extends ESTestCase {
|
||||||
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
|
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
|
||||||
|
|
||||||
|
|
||||||
final IndexShard shard = mockIndexShard(shardId, clusterService);
|
final IndexShard shard = mock(IndexShard.class);
|
||||||
when(shard.getPendingPrimaryTerm()).thenReturn(primaryTerm);
|
when(shard.getPendingPrimaryTerm()).thenReturn(primaryTerm);
|
||||||
when(shard.routingEntry()).thenReturn(routingEntry);
|
when(shard.routingEntry()).thenReturn(routingEntry);
|
||||||
when(shard.isRelocatedPrimary()).thenReturn(false);
|
when(shard.isRelocatedPrimary()).thenReturn(false);
|
||||||
IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().shardRoutingTable(shardId);
|
IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().shardRoutingTable(shardId);
|
||||||
Set<String> inSyncIds = randomBoolean() ? singleton(routingEntry.allocationId().getId()) :
|
Set<String> inSyncIds = randomBoolean() ? Collections.singleton(routingEntry.allocationId().getId()) :
|
||||||
clusterService.state().metaData().index(index).inSyncAllocationIds(0);
|
clusterService.state().metaData().index(index).inSyncAllocationIds(0);
|
||||||
when(shard.getReplicationGroup()).thenReturn(
|
when(shard.getReplicationGroup()).thenReturn(
|
||||||
new ReplicationGroup(shardRoutingTable,
|
new ReplicationGroup(shardRoutingTable,
|
||||||
|
@ -1104,17 +1022,6 @@ public class TransportReplicationActionTests extends ESTestCase {
|
||||||
transportService.stop();
|
transportService.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testIsRetryableClusterBlockException() {
|
|
||||||
final TestAction action = new TestAction(Settings.EMPTY, "internal:testIsRetryableClusterBlockException", transportService,
|
|
||||||
clusterService, shardStateAction, threadPool);
|
|
||||||
assertFalse(action.isRetryableClusterBlockException(randomRetryPrimaryException(new ShardId("index", "_na_", 0))));
|
|
||||||
|
|
||||||
final boolean retryable = randomBoolean();
|
|
||||||
ClusterBlock randomBlock = new ClusterBlock(randomIntBetween(1, 16), "test", retryable, randomBoolean(),
|
|
||||||
randomBoolean(), randomFrom(RestStatus.values()), EnumSet.of(randomFrom(ClusterBlockLevel.values())));
|
|
||||||
assertEquals(retryable, action.isRetryableClusterBlockException(new ClusterBlockException(singleton(randomBlock))));
|
|
||||||
}
|
|
||||||
|
|
||||||
private void assertConcreteShardRequest(TransportRequest capturedRequest, Request expectedRequest, AllocationId expectedAllocationId) {
|
private void assertConcreteShardRequest(TransportRequest capturedRequest, Request expectedRequest, AllocationId expectedAllocationId) {
|
||||||
final TransportReplicationAction.ConcreteShardRequest<?> concreteShardRequest =
|
final TransportReplicationAction.ConcreteShardRequest<?> concreteShardRequest =
|
||||||
(TransportReplicationAction.ConcreteShardRequest<?>) capturedRequest;
|
(TransportReplicationAction.ConcreteShardRequest<?>) capturedRequest;
|
||||||
|
@ -1208,6 +1115,15 @@ public class TransportReplicationActionTests extends ESTestCase {
|
||||||
Request::new, Request::new, ThreadPool.Names.SAME);
|
Request::new, Request::new, ThreadPool.Names.SAME);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TestAction(Settings settings, String actionName, TransportService transportService,
|
||||||
|
ClusterService clusterService, ShardStateAction shardStateAction,
|
||||||
|
ThreadPool threadPool, boolean withDocumentFailureOnPrimary, boolean withDocumentFailureOnReplica) {
|
||||||
|
super(settings, actionName, transportService, clusterService, mockIndicesService(clusterService), threadPool,
|
||||||
|
shardStateAction,
|
||||||
|
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(),
|
||||||
|
Request::new, Request::new, ThreadPool.Names.SAME);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected TestResponse newResponseInstance() {
|
protected TestResponse newResponseInstance() {
|
||||||
return new TestResponse();
|
return new TestResponse();
|
||||||
|
@ -1267,7 +1183,6 @@ public class TransportReplicationActionTests extends ESTestCase {
|
||||||
|
|
||||||
private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService) {
|
private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService) {
|
||||||
final IndexShard indexShard = mock(IndexShard.class);
|
final IndexShard indexShard = mock(IndexShard.class);
|
||||||
when(indexShard.shardId()).thenReturn(shardId);
|
|
||||||
doAnswer(invocation -> {
|
doAnswer(invocation -> {
|
||||||
ActionListener<Releasable> callback = (ActionListener<Releasable>) invocation.getArguments()[0];
|
ActionListener<Releasable> callback = (ActionListener<Releasable>) invocation.getArguments()[0];
|
||||||
count.incrementAndGet();
|
count.incrementAndGet();
|
||||||
|
|
Loading…
Reference in New Issue