Inline TransportReplAct#createReplicatedOperation (#41197)

`TransportReplicationAction.AsyncPrimaryAction#createReplicatedOperation`
exists so it can be overridden in tests. This commit re-works these tests to
use a real `ReplicationOperation` and inlines the now-unnecessary method.

Relates #40706.
This commit is contained in:
David Turner 2019-04-16 13:36:29 +01:00
parent 10e58210a0
commit 8577bbd73b
3 changed files with 93 additions and 132 deletions

View File

@ -74,7 +74,7 @@ public class ReplicationOperation<
private final long primaryTerm;
// exposed for tests
final ActionListener<PrimaryResultT> resultListener;
private final ActionListener<PrimaryResultT> resultListener;
private volatile PrimaryResultT primaryResult = null;

View File

@ -358,37 +358,35 @@ public abstract class TransportReplicationAction<
});
} else {
setPhase(replicationTask, "primary");
createReplicatedOperation(primaryRequest.getRequest(),
ActionListener.wrap(result -> result.respond(
new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
if (syncGlobalCheckpointAfterOperation) {
final IndexShard shard = primaryShardReference.indexShard;
try {
shard.maybeSyncGlobalCheckpoint("post-operation");
} catch (final Exception e) {
// only log non-closed exceptions
if (ExceptionsHelper.unwrap(
e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
// intentionally swallow, a missed global checkpoint sync should not fail this operation
logger.info(
new ParameterizedMessage(
"{} failed to execute post-operation global checkpoint sync", shard.shardId()), e);
}
}
}
primaryShardReference.close(); // release shard operation lock before responding to caller
setPhase(replicationTask, "finished");
onCompletionListener.onResponse(response);
}
@Override
public void onFailure(Exception e) {
handleException(primaryShardReference, e);
final ActionListener<Response> referenceClosingListener = ActionListener.wrap(response -> {
primaryShardReference.close(); // release shard operation lock before responding to caller
setPhase(replicationTask, "finished");
onCompletionListener.onResponse(response);
}, e -> handleException(primaryShardReference, e));
final ActionListener<Response> globalCheckpointSyncingListener = ActionListener.wrap(response -> {
if (syncGlobalCheckpointAfterOperation) {
final IndexShard shard = primaryShardReference.indexShard;
try {
shard.maybeSyncGlobalCheckpoint("post-operation");
} catch (final Exception e) {
// only log non-closed exceptions
if (ExceptionsHelper.unwrap(
e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
// intentionally swallow, a missed global checkpoint sync should not fail this operation
logger.info(
new ParameterizedMessage(
"{} failed to execute post-operation global checkpoint sync", shard.shardId()), e);
}
}), e -> handleException(primaryShardReference, e)
), primaryShardReference).execute();
}
}
referenceClosingListener.onResponse(response);
}, referenceClosingListener::onFailure);
new ReplicationOperation<>(primaryRequest.getRequest(), primaryShardReference,
ActionListener.wrap(result -> result.respond(globalCheckpointSyncingListener), referenceClosingListener::onFailure),
newReplicasProxy(), logger, actionName, primaryRequest.getPrimaryTerm()).execute();
}
} catch (Exception e) {
handleException(primaryShardReference, e);
@ -406,12 +404,6 @@ public abstract class TransportReplicationAction<
onCompletionListener.onFailure(e);
}
protected ReplicationOperation<Request, ReplicaRequest, PrimaryResult<ReplicaRequest, Response>> createReplicatedOperation(
Request request, ActionListener<PrimaryResult<ReplicaRequest, Response>> listener,
PrimaryShardReference primaryShardReference) {
return new ReplicationOperation<>(request, primaryShardReference, listener,
newReplicasProxy(), logger, actionName, primaryRequest.getPrimaryTerm());
}
}
public static class PrimaryResult<ReplicaRequest extends ReplicationRequest<ReplicaRequest>,

View File

@ -143,6 +143,7 @@ public class TransportReplicationActionTests extends ESTestCase {
if (requestOrWrappedRequest instanceof TransportReplicationAction.ConcreteShardRequest) {
requestOrWrappedRequest = ((TransportReplicationAction.ConcreteShardRequest<?>)requestOrWrappedRequest).getRequest();
}
//noinspection unchecked
return (R) requestOrWrappedRequest;
}
@ -209,7 +210,7 @@ public class TransportReplicationActionTests extends ESTestCase {
setState(clusterService, ClusterState.builder(clusterService.state()).blocks(blocks).build());
}
public void testBlocksInReroutePhase() throws Exception {
public void testBlocksInReroutePhase() {
final ClusterBlock nonRetryableBlock =
new ClusterBlock(1, "non retryable", false, true, false, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL);
final ClusterBlock retryableBlock =
@ -290,7 +291,6 @@ public class TransportReplicationActionTests extends ESTestCase {
TestAction testActionWithNoBlocks = new TestAction(Settings.EMPTY, "internal:testActionWithNoBlocks", transportService,
clusterService, shardStateAction, threadPool);
listener = new PlainActionFuture<>();
TestAction.ReroutePhase reroutePhase = testActionWithNoBlocks.new ReroutePhase(task, requestWithTimeout, listener);
reroutePhase.run();
assertListenerThrows("should fail with an IndexNotFoundException when no blocks", listener, IndexNotFoundException.class);
@ -350,7 +350,7 @@ public class TransportReplicationActionTests extends ESTestCase {
assertEquals(0, count.get());
}
public void testNotStartedPrimary() throws InterruptedException, ExecutionException {
public void testNotStartedPrimary() {
final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0);
// no replicas in oder to skip the replication part
@ -399,7 +399,7 @@ public class TransportReplicationActionTests extends ESTestCase {
* This test checks that replication request is not routed back from relocation target to relocation source in case of
* stale index routing table on relocation target.
*/
public void testNoRerouteOnStaleClusterState() throws InterruptedException, ExecutionException {
public void testNoRerouteOnStaleClusterState() {
final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0);
ClusterState state = state(index, true, ShardRoutingState.RELOCATING);
@ -441,7 +441,7 @@ public class TransportReplicationActionTests extends ESTestCase {
assertIndexShardCounter(0);
}
public void testUnknownIndexOrShardOnReroute() throws InterruptedException {
public void testUnknownIndexOrShardOnReroute() {
final String index = "test";
// no replicas in oder to skip the replication part
setState(clusterService, state(index, true,
@ -462,10 +462,9 @@ public class TransportReplicationActionTests extends ESTestCase {
reroutePhase.run();
assertListenerThrows("must throw shard not found exception", listener, ShardNotFoundException.class);
assertFalse(request.isRetrySet.get()); //TODO I'd have expected this to be true but we fail too early?
}
public void testClosedIndexOnReroute() throws InterruptedException {
public void testClosedIndexOnReroute() {
final String index = "test";
// no replicas in oder to skip the replication part
ClusterStateChanges clusterStateChanges = new ClusterStateChanges(xContentRegistry(), threadPool);
@ -488,7 +487,7 @@ public class TransportReplicationActionTests extends ESTestCase {
assertFalse(request.isRetrySet.get());
}
public void testStalePrimaryShardOnReroute() throws InterruptedException {
public void testStalePrimaryShardOnReroute() {
final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0);
// no replicas in order to skip the replication part
@ -596,23 +595,17 @@ public class TransportReplicationActionTests extends ESTestCase {
}
final TransportReplicationAction.ConcreteShardRequest<Request> primaryRequest
= new TransportReplicationAction.ConcreteShardRequest<>(request, primaryShard.allocationId().getId(), primaryTerm);
action.new AsyncPrimaryAction(primaryRequest, listener, task) {
new TestAction(Settings.EMPTY, "internal:testAction2", transportService, clusterService, shardStateAction, threadPool) {
@Override
protected ReplicationOperation<Request, Request, TransportReplicationAction.PrimaryResult<Request, TestResponse>>
createReplicatedOperation(
Request request,
ActionListener<TransportReplicationAction.PrimaryResult<Request, TestResponse>> actionListener,
TransportReplicationAction<Request, Request, TestResponse>.PrimaryShardReference primaryShardReference) {
return new NoopReplicationOperation(request, actionListener, primaryTerm) {
@Override
public void execute() throws Exception {
assertPhase(task, "primary");
assertFalse(executed.getAndSet(true));
super.execute();
}
};
protected void shardOperationOnPrimary(Request shardRequest, IndexShard primary,
ActionListener<PrimaryResult<Request, TestResponse>> listener) {
assertPhase(task, "primary");
assertFalse(executed.getAndSet(true));
super.shardOperationOnPrimary(shardRequest, primary, listener);
}
}.run();
}.new AsyncPrimaryAction(primaryRequest, listener, task).run();
if (executeOnPrimary) {
assertTrue(executed.get());
assertTrue(listener.isDone());
@ -626,9 +619,12 @@ public class TransportReplicationActionTests extends ESTestCase {
transport.capturedRequestsByTargetNode().get(primaryShard.relocatingNodeId());
assertThat(requests, notNullValue());
assertThat(requests.size(), equalTo(1));
assertThat("primary request was not delegated to relocation target", requests.get(0).action, equalTo("internal:testAction[p]"));
assertThat("primary term not properly set on primary delegation",
((TransportReplicationAction.ConcreteShardRequest<Request>)requests.get(0).request).getPrimaryTerm(), equalTo(primaryTerm));
assertThat("primary request was not delegated to relocation target",
requests.get(0).action, equalTo("internal:testAction2[p]"));
//noinspection unchecked
final TransportReplicationAction.ConcreteShardRequest<Request> concreteShardRequest
= (TransportReplicationAction.ConcreteShardRequest<Request>) requests.get(0).request;
assertThat("primary term not properly set on primary delegation", concreteShardRequest.getPrimaryTerm(), equalTo(primaryTerm));
assertPhase(task, "primary_delegation");
transport.handleResponse(requests.get(0).requestId, new TestResponse());
assertTrue(listener.isDone());
@ -638,7 +634,7 @@ public class TransportReplicationActionTests extends ESTestCase {
}
}
public void testPrimaryPhaseExecutesDelegatedRequestOnRelocationTarget() throws Exception {
public void testPrimaryPhaseExecutesDelegatedRequestOnRelocationTarget() {
final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0);
ClusterState state = state(index, true, ShardRoutingState.RELOCATING);
@ -654,34 +650,24 @@ public class TransportReplicationActionTests extends ESTestCase {
AtomicBoolean executed = new AtomicBoolean();
final TransportReplicationAction.ConcreteShardRequest<Request> primaryRequest
= new TransportReplicationAction.ConcreteShardRequest<>(request, primaryShard.allocationId().getRelocationId(), primaryTerm);
action.new AsyncPrimaryAction(primaryRequest, listener, task) {
@Override
protected ReplicationOperation<Request, Request, TransportReplicationAction.PrimaryResult<Request, TestResponse>>
createReplicatedOperation(
Request request,
ActionListener<TransportReplicationAction.PrimaryResult<Request, TestResponse>> actionListener,
TransportReplicationAction<Request, Request, TestResponse>.PrimaryShardReference primaryShardReference) {
return new NoopReplicationOperation(request, actionListener, primaryTerm) {
@Override
public void execute() throws Exception {
assertPhase(task, "primary");
assertFalse(executed.getAndSet(true));
super.execute();
}
};
}
new TestAction(Settings.EMPTY, "internal:testAction2", transportService, clusterService, shardStateAction, threadPool) {
@Override
public void onFailure(Exception e) {
throw new RuntimeException(e);
protected void shardOperationOnPrimary(Request shardRequest, IndexShard primary,
ActionListener<PrimaryResult<Request, TestResponse>> listener) {
assertPhase(task, "primary");
assertFalse(executed.getAndSet(true));
super.shardOperationOnPrimary(shardRequest, primary, listener);
}
}.run();
}.new AsyncPrimaryAction(primaryRequest, listener, task).run();
assertThat(executed.get(), equalTo(true));
assertPhase(task, "finished");
assertFalse(request.isRetrySet.get());
assertTrue(listener.isDone());
listener.actionGet(); // throws no exception
}
public void testPrimaryReference() throws Exception {
public void testPrimaryReference() {
final IndexShard shard = mock(IndexShard.class);
AtomicBoolean closed = new AtomicBoolean();
@ -789,6 +775,7 @@ public class TransportReplicationActionTests extends ESTestCase {
inSyncIds,
shardRoutingTable.getAllAllocationIds()));
doAnswer(invocation -> {
//noinspection unchecked
((ActionListener<Releasable>)invocation.getArguments()[0]).onResponse(() -> {});
return null;
}).when(shard).acquirePrimaryOperationPermit(any(), anyString(), anyObject());
@ -805,6 +792,7 @@ public class TransportReplicationActionTests extends ESTestCase {
action.handlePrimaryRequest(concreteShardRequest, createTransportChannel(listener), null);
CapturingTransport.CapturedRequest[] requestsToReplicas = transport.capturedRequests();
assertThat(requestsToReplicas, arrayWithSize(1));
//noinspection unchecked
assertThat(((TransportReplicationAction.ConcreteShardRequest<Request>) requestsToReplicas[0].request).getPrimaryTerm(),
equalTo(primaryTerm));
}
@ -821,47 +809,38 @@ public class TransportReplicationActionTests extends ESTestCase {
Request request = new Request(shardId);
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
ReplicationTask task = maybeTask();
int i = randomInt(3);
final boolean throwExceptionOnCreation = i == 1;
final boolean throwExceptionOnRun = i == 2;
final boolean respondWithError = i == 3;
int i = randomInt(2);
final boolean throwExceptionOnRun = i == 1;
final boolean respondWithError = i == 2;
final TransportReplicationAction.ConcreteShardRequest<Request> primaryRequest
= new TransportReplicationAction.ConcreteShardRequest<>(request, primaryShard.allocationId().getId(), primaryTerm);
action.new AsyncPrimaryAction(primaryRequest, listener, task) {
new TestAction(Settings.EMPTY, "internal:testAction2", transportService, clusterService, shardStateAction, threadPool) {
@Override
protected ReplicationOperation<Request, Request, TransportReplicationAction.PrimaryResult<Request, TestResponse>>
createReplicatedOperation(
Request request,
ActionListener<TransportReplicationAction.PrimaryResult<Request, TestResponse>> actionListener,
TransportReplicationAction<Request, Request, TestResponse>.PrimaryShardReference primaryShardReference) {
protected void shardOperationOnPrimary(Request shardRequest, IndexShard primary,
ActionListener<PrimaryResult<Request, TestResponse>> listener) {
assertIndexShardCounter(1);
if (throwExceptionOnCreation) {
throw new ElasticsearchException("simulated exception, during createReplicatedOperation");
if (throwExceptionOnRun) {
throw new ElasticsearchException("simulated exception, during shardOperationOnPrimary");
} else if (respondWithError) {
listener.onFailure(new ElasticsearchException("simulated exception, as a response"));
} else {
super.shardOperationOnPrimary(request, primary, listener);
}
return new NoopReplicationOperation(request, actionListener, primaryTerm) {
@Override
public void execute() throws Exception {
assertIndexShardCounter(1);
assertPhase(task, "primary");
if (throwExceptionOnRun) {
throw new ElasticsearchException("simulated exception, during performOnPrimary");
} else if (respondWithError) {
this.resultListener.onFailure(new ElasticsearchException("simulated exception, as a response"));
} else {
super.execute();
}
}
};
}
}.run();
}.new AsyncPrimaryAction(primaryRequest, listener, task).run();
assertIndexShardCounter(0);
assertTrue(listener.isDone());
assertPhase(task, "finished");
try {
listener.get();
if (throwExceptionOnRun || respondWithError) {
fail("expected exception, but none was thrown");
}
} catch (ExecutionException e) {
if (throwExceptionOnCreation || throwExceptionOnRun || respondWithError) {
if (throwExceptionOnRun || respondWithError) {
Throwable cause = e.getCause();
assertThat(cause, instanceOf(ElasticsearchException.class));
assertThat(cause.getMessage(), containsString("simulated"));
@ -871,7 +850,7 @@ public class TransportReplicationActionTests extends ESTestCase {
}
}
public void testReplicasCounter() throws Exception {
public void testReplicasCounter() {
final ShardId shardId = new ShardId("test", "_na_", 0);
final ClusterState state = state(shardId.getIndexName(), true, ShardRoutingState.STARTED, ShardRoutingState.STARTED);
setState(clusterService, state);
@ -909,7 +888,7 @@ public class TransportReplicationActionTests extends ESTestCase {
* This test ensures that replication operations adhere to the {@link IndexMetaData#SETTING_WAIT_FOR_ACTIVE_SHARDS} setting
* when the request is using the default value for waitForActiveShards.
*/
public void testDefaultWaitForActiveShardsUsesIndexSetting() throws Exception {
public void testDefaultWaitForActiveShardsUsesIndexSetting() {
final String indexName = "test";
final ShardId shardId = new ShardId(indexName, "_na_", 0);
@ -1167,9 +1146,9 @@ public class TransportReplicationActionTests extends ESTestCase {
}
public static class Request extends ReplicationRequest<Request> {
public AtomicBoolean processedOnPrimary = new AtomicBoolean();
public AtomicInteger processedOnReplicas = new AtomicInteger();
public AtomicBoolean isRetrySet = new AtomicBoolean(false);
AtomicBoolean processedOnPrimary = new AtomicBoolean();
AtomicInteger processedOnReplicas = new AtomicInteger();
AtomicBoolean isRetrySet = new AtomicBoolean(false);
Request(StreamInput in) throws IOException {
super(in);
@ -1284,6 +1263,7 @@ public class TransportReplicationActionTests extends ESTestCase {
return indexService;
}
@SuppressWarnings("unchecked")
private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService) {
final IndexShard indexShard = mock(IndexShard.class);
when(indexShard.shardId()).thenReturn(shardId);
@ -1319,23 +1299,12 @@ public class TransportReplicationActionTests extends ESTestCase {
doThrow(new AssertionError("failed shard is not supported")).when(indexShard).failShard(anyString(), any(Exception.class));
when(indexShard.getPendingPrimaryTerm()).thenAnswer(i ->
clusterService.state().metaData().getIndexSafe(shardId.getIndex()).primaryTerm(shardId.id()));
ReplicationGroup replicationGroup = mock(ReplicationGroup.class);
when(indexShard.getReplicationGroup()).thenReturn(replicationGroup);
return indexShard;
}
class NoopReplicationOperation extends ReplicationOperation<Request, Request, TestAction.PrimaryResult<Request, TestResponse>> {
NoopReplicationOperation(Request request, ActionListener<TestAction.PrimaryResult<Request, TestResponse>> listener,
long primaryTerm) {
super(request, null, listener, null, TransportReplicationActionTests.this.logger, "noop", primaryTerm);
}
@Override
public void execute() throws Exception {
// Using the diamond operator (<>) prevents Eclipse from being able to compile this code
this.resultListener.onResponse(new TransportReplicationAction.PrimaryResult<Request, TestResponse>(null, new TestResponse()));
}
}
/**
* Transport channel that is needed for replica operation testing.
*/
@ -1348,12 +1317,12 @@ public class TransportReplicationActionTests extends ESTestCase {
}
@Override
public void sendResponse(TransportResponse response) throws IOException {
public void sendResponse(TransportResponse response) {
listener.onResponse(((TestResponse) response));
}
@Override
public void sendResponse(Exception exception) throws IOException {
public void sendResponse(Exception exception) {
listener.onFailure(exception);
}