Revert "[RCI] Check blocks while having index shard permit in TransportReplicationAction (#35332)"

This reverts commit 31567cefb4.
This commit is contained in:
Tanguy Leroux 2018-11-16 15:38:40 +01:00
parent 76b77dbaa4
commit d3d7c0158e
2 changed files with 138 additions and 236 deletions

View File

@ -235,39 +235,9 @@ public abstract class TransportReplicationAction<
return TransportRequestOptions.EMPTY; return TransportRequestOptions.EMPTY;
} }
private String concreteIndex(final ClusterState state, final ReplicationRequest request) {
return resolveIndex() ? indexNameExpressionResolver.concreteSingleIndex(state, request).getName() : request.index();
}
private ClusterBlockException blockExceptions(final ClusterState state, final String indexName) {
ClusterBlockLevel globalBlockLevel = globalBlockLevel();
if (globalBlockLevel != null) {
ClusterBlockException blockException = state.blocks().globalBlockedException(globalBlockLevel);
if (blockException != null) {
return blockException;
}
}
ClusterBlockLevel indexBlockLevel = indexBlockLevel();
if (indexBlockLevel != null) {
ClusterBlockException blockException = state.blocks().indexBlockedException(indexBlockLevel, indexName);
if (blockException != null) {
return blockException;
}
}
return null;
}
protected boolean retryPrimaryException(final Throwable e) { protected boolean retryPrimaryException(final Throwable e) {
return e.getClass() == ReplicationOperation.RetryOnPrimaryException.class return e.getClass() == ReplicationOperation.RetryOnPrimaryException.class
|| TransportActions.isShardNotAvailableException(e) || TransportActions.isShardNotAvailableException(e);
|| isRetryableClusterBlockException(e);
}
boolean isRetryableClusterBlockException(final Throwable e) {
if (e instanceof ClusterBlockException) {
return ((ClusterBlockException) e).retryable();
}
return false;
} }
protected class OperationTransportHandler implements TransportRequestHandler<Request> { protected class OperationTransportHandler implements TransportRequestHandler<Request> {
@ -340,15 +310,6 @@ public abstract class TransportReplicationAction<
@Override @Override
public void onResponse(PrimaryShardReference primaryShardReference) { public void onResponse(PrimaryShardReference primaryShardReference) {
try { try {
final ClusterState clusterState = clusterService.state();
final IndexMetaData indexMetaData = clusterState.metaData().getIndexSafe(primaryShardReference.routingEntry().index());
final ClusterBlockException blockException = blockExceptions(clusterState, indexMetaData.getIndex().getName());
if (blockException != null) {
logger.trace("cluster is blocked, action failed on primary", blockException);
throw blockException;
}
if (primaryShardReference.isRelocated()) { if (primaryShardReference.isRelocated()) {
primaryShardReference.close(); // release shard operation lock as soon as possible primaryShardReference.close(); // release shard operation lock as soon as possible
setPhase(replicationTask, "primary_delegation"); setPhase(replicationTask, "primary_delegation");
@ -362,7 +323,7 @@ public abstract class TransportReplicationAction<
response.readFrom(in); response.readFrom(in);
return response; return response;
}; };
DiscoveryNode relocatingNode = clusterState.nodes().get(primary.relocatingNodeId()); DiscoveryNode relocatingNode = clusterService.state().nodes().get(primary.relocatingNodeId());
transportService.sendRequest(relocatingNode, transportPrimaryAction, transportService.sendRequest(relocatingNode, transportPrimaryAction,
new ConcreteShardRequest<>(request, primary.allocationId().getRelocationId(), primaryTerm), new ConcreteShardRequest<>(request, primary.allocationId().getRelocationId(), primaryTerm),
transportOptions, transportOptions,
@ -735,17 +696,12 @@ public abstract class TransportReplicationAction<
protected void doRun() { protected void doRun() {
setPhase(task, "routing"); setPhase(task, "routing");
final ClusterState state = observer.setAndGetObservedState(); final ClusterState state = observer.setAndGetObservedState();
final String concreteIndex = concreteIndex(state, request); if (handleBlockExceptions(state)) {
final ClusterBlockException blockException = blockExceptions(state, concreteIndex); return;
if (blockException != null) {
if (blockException.retryable()) {
logger.trace("cluster is blocked, scheduling a retry", blockException);
retry(blockException);
} else {
finishAsFailed(blockException);
} }
} else {
// request does not have a shardId yet, we need to pass the concrete index to resolve shardId // request does not have a shardId yet, we need to pass the concrete index to resolve shardId
final String concreteIndex = concreteIndex(state);
final IndexMetaData indexMetaData = state.metaData().index(concreteIndex); final IndexMetaData indexMetaData = state.metaData().index(concreteIndex);
if (indexMetaData == null) { if (indexMetaData == null) {
retry(new IndexNotFoundException(concreteIndex)); retry(new IndexNotFoundException(concreteIndex));
@ -758,8 +714,7 @@ public abstract class TransportReplicationAction<
// resolve all derived request fields, so we can route and apply it // resolve all derived request fields, so we can route and apply it
resolveRequest(indexMetaData, request); resolveRequest(indexMetaData, request);
assert request.shardId() != null : "request shardId must be set in resolveRequest"; assert request.shardId() != null : "request shardId must be set in resolveRequest";
assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : "request waitForActiveShards must be set in resolveRequest";
"request waitForActiveShards must be set in resolveRequest";
final ShardRouting primary = primary(state); final ShardRouting primary = primary(state);
if (retryIfUnavailable(state, primary)) { if (retryIfUnavailable(state, primary)) {
@ -772,7 +727,6 @@ public abstract class TransportReplicationAction<
performRemoteAction(state, primary, node); performRemoteAction(state, primary, node);
} }
} }
}
private void performLocalAction(ClusterState state, ShardRouting primary, DiscoveryNode node, IndexMetaData indexMetaData) { private void performLocalAction(ClusterState state, ShardRouting primary, DiscoveryNode node, IndexMetaData indexMetaData) {
setPhase(task, "waiting_on_primary"); setPhase(task, "waiting_on_primary");
@ -822,11 +776,44 @@ public abstract class TransportReplicationAction<
return false; return false;
} }
private String concreteIndex(ClusterState state) {
return resolveIndex() ? indexNameExpressionResolver.concreteSingleIndex(state, request).getName() : request.index();
}
private ShardRouting primary(ClusterState state) { private ShardRouting primary(ClusterState state) {
IndexShardRoutingTable indexShard = state.getRoutingTable().shardRoutingTable(request.shardId()); IndexShardRoutingTable indexShard = state.getRoutingTable().shardRoutingTable(request.shardId());
return indexShard.primaryShard(); return indexShard.primaryShard();
} }
private boolean handleBlockExceptions(ClusterState state) {
ClusterBlockLevel globalBlockLevel = globalBlockLevel();
if (globalBlockLevel != null) {
ClusterBlockException blockException = state.blocks().globalBlockedException(globalBlockLevel);
if (blockException != null) {
handleBlockException(blockException);
return true;
}
}
ClusterBlockLevel indexBlockLevel = indexBlockLevel();
if (indexBlockLevel != null) {
ClusterBlockException blockException = state.blocks().indexBlockedException(indexBlockLevel, concreteIndex(state));
if (blockException != null) {
handleBlockException(blockException);
return true;
}
}
return false;
}
private void handleBlockException(ClusterBlockException blockException) {
if (blockException.retryable()) {
logger.trace("cluster is blocked, scheduling a retry", blockException);
retry(blockException);
} else {
finishAsFailed(blockException);
}
}
private void performAction(final DiscoveryNode node, final String action, final boolean isPrimaryAction, private void performAction(final DiscoveryNode node, final String action, final boolean isPrimaryAction,
final TransportRequest requestToPerform) { final TransportRequest requestToPerform) {
transportService.sendRequest(node, action, requestToPerform, transportOptions, new TransportResponseHandler<Response>() { transportService.sendRequest(node, action, requestToPerform, transportOptions, new TransportResponseHandler<Response>() {

View File

@ -89,7 +89,6 @@ import org.junit.BeforeClass;
import java.io.IOException; import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
@ -101,7 +100,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static java.util.Collections.singleton;
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state;
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithActivePrimary; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithActivePrimary;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS;
@ -110,11 +108,9 @@ import static org.elasticsearch.test.ClusterServiceUtils.setState;
import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.core.Is.is;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyLong;
@ -186,157 +182,70 @@ public class TransportReplicationActionTests extends ESTestCase {
threadPool = null; threadPool = null;
} }
private <T> T assertListenerThrows(String msg, PlainActionFuture<?> listener, Class<T> klass) { <T> void assertListenerThrows(String msg, PlainActionFuture<T> listener, Class<?> klass) throws InterruptedException {
ExecutionException exception = expectThrows(ExecutionException.class, msg, listener::get); try {
assertThat(exception.getCause(), instanceOf(klass)); listener.get();
@SuppressWarnings("unchecked") fail(msg);
final T cause = (T) exception.getCause(); } catch (ExecutionException ex) {
return cause; assertThat(ex.getCause(), instanceOf(klass));
}
} }
private void setStateWithBlock(final ClusterService clusterService, final ClusterBlock block, final boolean globalBlock) { public void testBlocks() throws ExecutionException, InterruptedException {
final ClusterBlocks.Builder blocks = ClusterBlocks.builder(); Request request = new Request();
if (globalBlock) { PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
blocks.addGlobalBlock(block); ReplicationTask task = maybeTask();
} else { TestAction action = new TestAction(Settings.EMPTY, "internal:testActionWithBlocks",
blocks.addIndexBlock("index", block);
}
setState(clusterService, ClusterState.builder(clusterService.state()).blocks(blocks).build());
}
public void testBlocksInReroutePhase() throws Exception {
final ClusterBlock nonRetryableBlock =
new ClusterBlock(1, "non retryable", false, true, false, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL);
final ClusterBlock retryableBlock =
new ClusterBlock(1, "retryable", true, true, false, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL);
final boolean globalBlock = randomBoolean();
final TestAction action = new TestAction(Settings.EMPTY, "internal:testActionWithBlocks",
transportService, clusterService, shardStateAction, threadPool) { transportService, clusterService, shardStateAction, threadPool) {
@Override @Override
protected ClusterBlockLevel globalBlockLevel() { protected ClusterBlockLevel globalBlockLevel() {
return globalBlock ? ClusterBlockLevel.WRITE : null; return ClusterBlockLevel.WRITE;
}
@Override
protected ClusterBlockLevel indexBlockLevel() {
return globalBlock == false ? ClusterBlockLevel.WRITE : null;
} }
}; };
setState(clusterService, ClusterStateCreationUtils.stateWithActivePrimary("index", true, 0)); ClusterBlocks.Builder block = ClusterBlocks.builder().addGlobalBlock(new ClusterBlock(1, "non retryable", false, true,
false, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
{ setState(clusterService, ClusterState.builder(clusterService.state()).blocks(block));
setStateWithBlock(clusterService, nonRetryableBlock, globalBlock);
Request request = globalBlock ? new Request() : new Request().index("index");
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
ReplicationTask task = maybeTask();
TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener); TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
reroutePhase.run(); reroutePhase.run();
assertListenerThrows("primary phase should fail operation", listener, ClusterBlockException.class);
ClusterBlockException exception =
assertListenerThrows("primary action should fail operation", listener, ClusterBlockException.class);
assertThat(((ClusterBlockException) exception.unwrapCause()).blocks().iterator().next(), is(nonRetryableBlock));
assertPhase(task, "failed"); assertPhase(task, "failed");
}
{
setStateWithBlock(clusterService, retryableBlock, globalBlock);
Request requestWithTimeout = (globalBlock ? new Request() : new Request().index("index")).timeout("5ms"); block = ClusterBlocks.builder()
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>(); .addGlobalBlock(new ClusterBlock(1, "retryable", true, true, false, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
ReplicationTask task = maybeTask(); setState(clusterService, ClusterState.builder(clusterService.state()).blocks(block));
listener = new PlainActionFuture<>();
TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, requestWithTimeout, listener); reroutePhase = action.new ReroutePhase(task, new Request().timeout("5ms"), listener);
reroutePhase.run(); reroutePhase.run();
ClusterBlockException exception =
assertListenerThrows("failed to timeout on retryable block", listener, ClusterBlockException.class); assertListenerThrows("failed to timeout on retryable block", listener, ClusterBlockException.class);
assertThat(((ClusterBlockException) exception.unwrapCause()).blocks().iterator().next(), is(retryableBlock));
assertPhase(task, "failed"); assertPhase(task, "failed");
assertTrue(requestWithTimeout.isRetrySet.get()); assertFalse(request.isRetrySet.get());
}
{
setStateWithBlock(clusterService, retryableBlock, globalBlock);
Request request = globalBlock ? new Request() : new Request().index("index"); listener = new PlainActionFuture<>();
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>(); reroutePhase = action.new ReroutePhase(task, request = new Request(), listener);
ReplicationTask task = maybeTask();
TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
reroutePhase.run(); reroutePhase.run();
assertFalse("primary phase should wait on retryable block", listener.isDone()); assertFalse("primary phase should wait on retryable block", listener.isDone());
assertPhase(task, "waiting_for_retry"); assertPhase(task, "waiting_for_retry");
assertTrue(request.isRetrySet.get()); assertTrue(request.isRetrySet.get());
setStateWithBlock(clusterService, nonRetryableBlock, globalBlock); block = ClusterBlocks.builder().addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, false,
RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
ClusterBlockException exception = assertListenerThrows("primary phase should fail operation when moving from a retryable " + setState(clusterService, ClusterState.builder(clusterService.state()).blocks(block));
"block to a non-retryable one", listener, ClusterBlockException.class); assertListenerThrows("primary phase should fail operation when moving from a retryable block to a non-retryable one", listener,
assertThat(((ClusterBlockException) exception.unwrapCause()).blocks().iterator().next(), is(nonRetryableBlock)); ClusterBlockException.class);
assertIndexShardUninitialized(); assertIndexShardUninitialized();
}
{
Request requestWithTimeout = new Request().index("unknown").setShardId(new ShardId("unknown", "_na_", 0)).timeout("5ms");
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
ReplicationTask task = maybeTask();
TestAction testActionWithNoBlocks = new TestAction(Settings.EMPTY, "internal:testActionWithNoBlocks", transportService, action = new TestAction(Settings.EMPTY, "internal:testActionWithNoBlocks", transportService, clusterService, shardStateAction,
clusterService, shardStateAction, threadPool); threadPool) {
listener = new PlainActionFuture<>();
TestAction.ReroutePhase reroutePhase = testActionWithNoBlocks.new ReroutePhase(task, requestWithTimeout, listener);
reroutePhase.run();
assertListenerThrows("should fail with an IndexNotFoundException when no blocks", listener, IndexNotFoundException.class);
}
}
public void testBlocksInPrimaryAction() {
final boolean globalBlock = randomBoolean();
final TestAction actionWithBlocks =
new TestAction(Settings.EMPTY, "internal:actionWithBlocks", transportService, clusterService, shardStateAction, threadPool) {
@Override @Override
protected ClusterBlockLevel globalBlockLevel() { protected ClusterBlockLevel globalBlockLevel() {
return globalBlock ? ClusterBlockLevel.WRITE : null; return null;
}
@Override
protected ClusterBlockLevel indexBlockLevel() {
return globalBlock == false ? ClusterBlockLevel.WRITE : null;
} }
}; };
listener = new PlainActionFuture<>();
final String index = "index"; reroutePhase = action.new ReroutePhase(task, new Request().timeout("5ms"), listener);
final ShardId shardId = new ShardId(index, "_na_", 0); reroutePhase.run();
setState(clusterService, stateWithActivePrimary(index, true, randomInt(5))); assertListenerThrows("should fail with an IndexNotFoundException when no blocks checked", listener, IndexNotFoundException.class);
final ClusterBlocks.Builder block = ClusterBlocks.builder();
if (globalBlock) {
block.addGlobalBlock(new ClusterBlock(randomIntBetween(1, 16), "test global block", randomBoolean(), randomBoolean(),
randomBoolean(), RestStatus.BAD_REQUEST, ClusterBlockLevel.ALL));
} else {
block.addIndexBlock(index, new ClusterBlock(randomIntBetween(1, 16), "test index block", randomBoolean(), randomBoolean(),
randomBoolean(), RestStatus.FORBIDDEN, ClusterBlockLevel.READ_WRITE));
}
setState(clusterService, ClusterState.builder(clusterService.state()).blocks(block));
final ClusterState clusterState = clusterService.state();
final String targetAllocationID = clusterState.getRoutingTable().shardRoutingTable(shardId).primaryShard().allocationId().getId();
final long primaryTerm = clusterState.metaData().index(index).primaryTerm(shardId.id());
final Request request = new Request(shardId);
final ReplicationTask task = maybeTask();
final PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
final TransportReplicationAction.AsyncPrimaryAction asyncPrimaryActionWithBlocks =
actionWithBlocks.new AsyncPrimaryAction(request, targetAllocationID, primaryTerm, createTransportChannel(listener), task);
asyncPrimaryActionWithBlocks.run();
final ExecutionException exception = expectThrows(ExecutionException.class, listener::get);
assertThat(exception.getCause(), instanceOf(ClusterBlockException.class));
assertThat(exception.getCause(), hasToString(containsString("test " + (globalBlock ? "global" : "index") + " block")));
assertPhase(task, "finished");
} }
public void assertIndexShardUninitialized() { public void assertIndexShardUninitialized() {
@ -468,12 +377,21 @@ public class TransportReplicationActionTests extends ESTestCase {
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>(); PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
ReplicationTask task = maybeTask(); ReplicationTask task = maybeTask();
ClusterBlockLevel indexBlockLevel = randomBoolean() ? ClusterBlockLevel.WRITE : null;
TestAction action = new TestAction(Settings.EMPTY, "internal:testActionWithBlocks", transportService, TestAction action = new TestAction(Settings.EMPTY, "internal:testActionWithBlocks", transportService,
clusterService, shardStateAction, threadPool); clusterService, shardStateAction, threadPool) {
@Override
protected ClusterBlockLevel indexBlockLevel() {
return indexBlockLevel;
}
};
TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener); TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
reroutePhase.run(); reroutePhase.run();
if (indexBlockLevel == ClusterBlockLevel.WRITE) {
assertListenerThrows("must throw block exception", listener, ClusterBlockException.class);
} else {
assertListenerThrows("must throw index closed exception", listener, IndexClosedException.class); assertListenerThrows("must throw index closed exception", listener, IndexClosedException.class);
}
assertPhase(task, "failed"); assertPhase(task, "failed");
assertFalse(request.isRetrySet.get()); assertFalse(request.isRetrySet.get());
} }
@ -764,12 +682,12 @@ public class TransportReplicationActionTests extends ESTestCase {
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>(); PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
final IndexShard shard = mockIndexShard(shardId, clusterService); final IndexShard shard = mock(IndexShard.class);
when(shard.getPendingPrimaryTerm()).thenReturn(primaryTerm); when(shard.getPendingPrimaryTerm()).thenReturn(primaryTerm);
when(shard.routingEntry()).thenReturn(routingEntry); when(shard.routingEntry()).thenReturn(routingEntry);
when(shard.isRelocatedPrimary()).thenReturn(false); when(shard.isRelocatedPrimary()).thenReturn(false);
IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().shardRoutingTable(shardId); IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().shardRoutingTable(shardId);
Set<String> inSyncIds = randomBoolean() ? singleton(routingEntry.allocationId().getId()) : Set<String> inSyncIds = randomBoolean() ? Collections.singleton(routingEntry.allocationId().getId()) :
clusterService.state().metaData().index(index).inSyncAllocationIds(0); clusterService.state().metaData().index(index).inSyncAllocationIds(0);
when(shard.getReplicationGroup()).thenReturn( when(shard.getReplicationGroup()).thenReturn(
new ReplicationGroup(shardRoutingTable, new ReplicationGroup(shardRoutingTable,
@ -1104,17 +1022,6 @@ public class TransportReplicationActionTests extends ESTestCase {
transportService.stop(); transportService.stop();
} }
public void testIsRetryableClusterBlockException() {
final TestAction action = new TestAction(Settings.EMPTY, "internal:testIsRetryableClusterBlockException", transportService,
clusterService, shardStateAction, threadPool);
assertFalse(action.isRetryableClusterBlockException(randomRetryPrimaryException(new ShardId("index", "_na_", 0))));
final boolean retryable = randomBoolean();
ClusterBlock randomBlock = new ClusterBlock(randomIntBetween(1, 16), "test", retryable, randomBoolean(),
randomBoolean(), randomFrom(RestStatus.values()), EnumSet.of(randomFrom(ClusterBlockLevel.values())));
assertEquals(retryable, action.isRetryableClusterBlockException(new ClusterBlockException(singleton(randomBlock))));
}
private void assertConcreteShardRequest(TransportRequest capturedRequest, Request expectedRequest, AllocationId expectedAllocationId) { private void assertConcreteShardRequest(TransportRequest capturedRequest, Request expectedRequest, AllocationId expectedAllocationId) {
final TransportReplicationAction.ConcreteShardRequest<?> concreteShardRequest = final TransportReplicationAction.ConcreteShardRequest<?> concreteShardRequest =
(TransportReplicationAction.ConcreteShardRequest<?>) capturedRequest; (TransportReplicationAction.ConcreteShardRequest<?>) capturedRequest;
@ -1208,6 +1115,15 @@ public class TransportReplicationActionTests extends ESTestCase {
Request::new, Request::new, ThreadPool.Names.SAME); Request::new, Request::new, ThreadPool.Names.SAME);
} }
TestAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, ShardStateAction shardStateAction,
ThreadPool threadPool, boolean withDocumentFailureOnPrimary, boolean withDocumentFailureOnReplica) {
super(settings, actionName, transportService, clusterService, mockIndicesService(clusterService), threadPool,
shardStateAction,
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(),
Request::new, Request::new, ThreadPool.Names.SAME);
}
@Override @Override
protected TestResponse newResponseInstance() { protected TestResponse newResponseInstance() {
return new TestResponse(); return new TestResponse();
@ -1267,7 +1183,6 @@ public class TransportReplicationActionTests extends ESTestCase {
private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService) { private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService) {
final IndexShard indexShard = mock(IndexShard.class); final IndexShard indexShard = mock(IndexShard.class);
when(indexShard.shardId()).thenReturn(shardId);
doAnswer(invocation -> { doAnswer(invocation -> {
ActionListener<Releasable> callback = (ActionListener<Releasable>) invocation.getArguments()[0]; ActionListener<Releasable> callback = (ActionListener<Releasable>) invocation.getArguments()[0];
count.incrementAndGet(); count.incrementAndGet();