Remove test-only customisation from TransReplAct (#40863)

The `getIndexShard()` and `sendReplicaRequest()` methods in
TransportReplicationAction are effectively only used to customise some
behaviour in tests. However there are other ways to do this that do not cause
such an obstacle to separating the TransportReplicationAction into its two
halves (see #40706).

This commit removes these customisation points and injects the test-only
behaviour using other techniques.
This commit is contained in:
David Turner 2019-04-05 08:50:44 +01:00
parent 669d72e47a
commit d8956d2601
5 changed files with 108 additions and 114 deletions

View File

@ -29,7 +29,6 @@ import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
@ -74,19 +73,6 @@ public class TransportResyncReplicationAction extends TransportWriteAction<Resyn
return new ResyncActionReplicasProxy(primaryTerm);
}
@Override
protected void sendReplicaRequest(
final ConcreteReplicaRequest<ResyncReplicationRequest> replicaRequest,
final DiscoveryNode node,
final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
super.sendReplicaRequest(replicaRequest, node, listener);
} else {
final long pre60NodeCheckpoint = SequenceNumbers.PRE_60_NODE_CHECKPOINT;
listener.onResponse(new ReplicaResponse(pre60NodeCheckpoint, pre60NodeCheckpoint));
}
}
@Override
protected ClusterBlockLevel globalBlockLevel() {
// resync should never be blocked because it's an internal action

View File

@ -619,7 +619,7 @@ public abstract class TransportReplicationAction<
}
}
protected IndexShard getIndexShard(final ShardId shardId) {
private IndexShard getIndexShard(final ShardId shardId) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
return indexService.getShard(shardId.id());
}
@ -1058,7 +1058,12 @@ public abstract class TransportReplicationAction<
}
final ConcreteReplicaRequest<ReplicaRequest> replicaRequest = new ConcreteReplicaRequest<>(
request, replica.allocationId().getId(), primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes);
sendReplicaRequest(replicaRequest, node, listener);
final ActionListenerResponseHandler<ReplicaResponse> handler = new ActionListenerResponseHandler<>(listener, in -> {
ReplicaResponse replicaResponse = new ReplicaResponse();
replicaResponse.readFrom(in);
return replicaResponse;
});
transportService.sendRequest(node, transportReplicaAction, replicaRequest, transportOptions, handler);
}
@Override
@ -1080,25 +1085,6 @@ public abstract class TransportReplicationAction<
}
}
/**
* Sends the specified replica request to the specified node.
*
* @param replicaRequest the replica request
* @param node the node to send the request to
* @param listener callback for handling the response or failure
*/
protected void sendReplicaRequest(
final ConcreteReplicaRequest<ReplicaRequest> replicaRequest,
final DiscoveryNode node,
final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
final ActionListenerResponseHandler<ReplicaResponse> handler = new ActionListenerResponseHandler<>(listener, in -> {
ReplicaResponse replicaResponse = new ReplicaResponse();
replicaResponse.readFrom(in);
return replicaResponse;
});
transportService.sendRequest(node, transportReplicaAction, replicaRequest, transportOptions, handler);
}
/** a wrapper class to encapsulate a request when being sent to a specific allocation id **/
public static class ConcreteShardRequest<R extends TransportRequest> extends TransportRequest {

View File

@ -22,16 +22,13 @@ package org.elasticsearch.index.seqno;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -103,19 +100,6 @@ public class GlobalCheckpointSyncAction extends TransportReplicationAction<
return new ReplicationResponse();
}
@Override
protected void sendReplicaRequest(
final ConcreteReplicaRequest<Request> replicaRequest,
final DiscoveryNode node,
final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
super.sendReplicaRequest(replicaRequest, node, listener);
} else {
final long pre60NodeCheckpoint = SequenceNumbers.PRE_60_NODE_CHECKPOINT;
listener.onResponse(new ReplicaResponse(pre60NodeCheckpoint, pre60NodeCheckpoint));
}
}
@Override
protected PrimaryResult<Request, ReplicationResponse> shardOperationOnPrimary(
final Request request, final IndexShard indexShard) throws Exception {

View File

@ -757,7 +757,7 @@ public class TransportReplicationActionTests extends ESTestCase {
assertEquals(0, shardFailedRequests.length);
}
public void testSeqNoIsSetOnPrimary() throws Exception {
public void testSeqNoIsSetOnPrimary() {
final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0);
// we use one replica to check the primary term was set on the operation and sent to the replica
@ -788,14 +788,14 @@ public class TransportReplicationActionTests extends ESTestCase {
return null;
}).when(shard).acquirePrimaryOperationPermit(any(), anyString(), anyObject());
TestAction action =
new TestAction(Settings.EMPTY, "internal:testSeqNoIsSetOnPrimary", transportService, clusterService, shardStateAction,
threadPool) {
@Override
protected IndexShard getIndexShard(ShardId shardId) {
return shard;
}
};
final IndexService indexService = mock(IndexService.class);
when(indexService.getShard(shard.shardId().id())).thenReturn(shard);
final IndicesService indicesService = mock(IndicesService.class);
when(indicesService.indexServiceSafe(shard.shardId().getIndex())).thenReturn(indexService);
TestAction action = new TestAction(Settings.EMPTY, "internal:testSeqNoIsSetOnPrimary", transportService, clusterService,
shardStateAction, threadPool, indicesService);
action.handlePrimaryRequest(concreteShardRequest, createTransportChannel(listener), null);
CapturingTransport.CapturedRequest[] requestsToReplicas = transport.capturedRequests();
@ -1207,11 +1207,16 @@ public class TransportReplicationActionTests extends ESTestCase {
private class TestAction extends TransportReplicationAction<Request, Request, TestResponse> {
TestAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, ShardStateAction shardStateAction,
ThreadPool threadPool) {
super(settings, actionName, transportService, clusterService, mockIndicesService(clusterService), threadPool,
this(settings, actionName, transportService, clusterService, shardStateAction, threadPool, mockIndicesService(clusterService));
}
TestAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, ShardStateAction shardStateAction,
ThreadPool threadPool, IndicesService indicesService) {
super(settings, actionName, transportService, clusterService, indicesService, threadPool,
shardStateAction,
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(),
Request::new, Request::new, ThreadPool.Names.SAME);
@ -1241,7 +1246,7 @@ public class TransportReplicationActionTests extends ESTestCase {
}
}
final IndicesService mockIndicesService(ClusterService clusterService) {
private IndicesService mockIndicesService(ClusterService clusterService) {
final IndicesService indicesService = mock(IndicesService.class);
when(indicesService.indexServiceSafe(any(Index.class))).then(invocation -> {
Index index = (Index)invocation.getArguments()[0];
@ -1261,7 +1266,7 @@ public class TransportReplicationActionTests extends ESTestCase {
return indicesService;
}
final IndexService mockIndexService(final IndexMetaData indexMetaData, ClusterService clusterService) {
private IndexService mockIndexService(final IndexMetaData indexMetaData, ClusterService clusterService) {
final IndexService indexService = mock(IndexService.class);
when(indexService.getShard(anyInt())).then(invocation -> {
int shard = (Integer) invocation.getArguments()[0];

View File

@ -41,18 +41,23 @@ import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.test.transport.MockTransport;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportMessageListener;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import org.junit.After;
import org.junit.Before;
@ -70,6 +75,7 @@ import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_INDEX_UUID;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
@ -78,6 +84,8 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_C
import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
import static org.elasticsearch.test.ClusterServiceUtils.setState;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasItem;
@ -85,6 +93,9 @@ import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
@ -163,7 +174,49 @@ public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTe
setState(clusterService, state.build());
final Settings transportSettings = Settings.builder().put("node.name", node1.getId()).build();
transportService = MockTransportService.createNewService(transportSettings, Version.CURRENT, threadPool, null);
MockTransport transport = new MockTransport() {
@Override
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
assertThat(action, allOf(startsWith("cluster:admin/test/"), endsWith("[r]")));
assertThat(node, equalTo(node2));
// node2 doesn't really exist, but we are performing some trickery in mockIndicesService() to pretend that node1 holds both
// the primary and the replica, so redirect the request back to node1.
transportService.sendRequest(transportService.getLocalNode(), action, request,
new TransportResponseHandler<TransportReplicationAction.ReplicaResponse>() {
@Override
public TransportReplicationAction.ReplicaResponse read(StreamInput in) throws IOException {
final TransportReplicationAction.ReplicaResponse replicaResponse
= new TransportReplicationAction.ReplicaResponse();
replicaResponse.readFrom(in);
return replicaResponse;
}
@SuppressWarnings("unchecked")
private TransportResponseHandler<TransportReplicationAction.ReplicaResponse> getResponseHandler() {
return (TransportResponseHandler<TransportReplicationAction.ReplicaResponse>)
getResponseHandlers().onResponseReceived(requestId, TransportMessageListener.NOOP_LISTENER);
}
@Override
public void handleResponse(TransportReplicationAction.ReplicaResponse response) {
getResponseHandler().handleResponse(response);
}
@Override
public void handleException(TransportException exp) {
getResponseHandler().handleException(exp);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
}
};
transportService = transport.createTransportService(transportSettings, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
bta -> node1, null, emptySet());
transportService.start();
transportService.acceptIncomingRequests();
shardStateAction = new ShardStateAction(clusterService, transportService, null, null, threadPool);
@ -198,7 +251,8 @@ public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTe
final PlainActionFuture<Response> listener = new PlainActionFuture<>();
futures[threadId] = listener;
final TestAction singlePermitAction = new SinglePermitWithBlocksAction(Settings.EMPTY, "internalSinglePermit[" + threadId + "]",
final TestAction singlePermitAction = new SinglePermitWithBlocksAction(Settings.EMPTY,
"cluster:admin/test/single_permit[" + threadId + "]",
transportService, clusterService, shardStateAction, threadPool, shardId, primary, replica, globalBlock);
actions[threadId] = singlePermitAction;
@ -251,8 +305,8 @@ public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTe
logger.trace("now starting the operation that acquires all permits and sets the block in the cluster state");
// An action which acquires all operation permits during execution and set a block
final TestAction allPermitsAction = new AllPermitsThenBlockAction(Settings.EMPTY, "internalAllPermits", transportService,
clusterService, shardStateAction, threadPool, shardId, primary, replica);
final TestAction allPermitsAction = new AllPermitsThenBlockAction(Settings.EMPTY, "cluster:admin/test/all_permits",
transportService, clusterService, shardStateAction, threadPool, shardId, primary, replica);
final PlainActionFuture<Response> allPermitFuture = new PlainActionFuture<>();
Thread thread = new Thread(() -> {
@ -299,6 +353,7 @@ public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTe
}
final Response allPermitsResponse = allPermitFuture.get();
assertSuccessfulOperation(allPermitsAction, allPermitsResponse);
for (int i = 0; i < numOperations; i++) {
@ -357,18 +412,21 @@ public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTe
protected final ShardId shardId;
protected final IndexShard primary;
protected final IndexShard replica;
protected final SetOnce<Boolean> executedOnPrimary = new SetOnce<>();
protected final SetOnce<Boolean> executedOnReplica = new SetOnce<>();
final SetOnce<Boolean> executedOnPrimary;
final SetOnce<Boolean> executedOnReplica = new SetOnce<>();
TestAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService,
ShardStateAction shardStateAction, ThreadPool threadPool, ShardId shardId, IndexShard primary, IndexShard replica) {
super(settings, actionName, transportService, clusterService, null, threadPool, shardStateAction,
ShardStateAction shardStateAction, ThreadPool threadPool, ShardId shardId, IndexShard primary, IndexShard replica,
SetOnce<Boolean> executedOnPrimary) {
super(settings, actionName, transportService, clusterService, mockIndicesService(shardId, executedOnPrimary, primary, replica),
threadPool, shardStateAction,
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(), Request::new, Request::new, ThreadPool.Names.SAME);
this.shardId = Objects.requireNonNull(shardId);
this.primary = Objects.requireNonNull(primary);
assertEquals(shardId, primary.shardId());
this.replica = Objects.requireNonNull(replica);
assertEquals(shardId, replica.shardId());
this.executedOnPrimary = executedOnPrimary;
}
@Override
@ -391,52 +449,25 @@ public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTe
@Override
protected ReplicaResult shardOperationOnReplica(Request shardRequest, IndexShard shard) throws Exception {
assertEquals("Replica is always assigned to node 2 in this test", clusterService.state().nodes().get("_node2").getId(),
shard.routingEntry().currentNodeId());
executedOnReplica.set(true);
// The TransportReplicationAction.getIndexShard() method is overridden for testing purpose but we double check here
// that the permit has been acquired on the replica shard
assertSame(replica, shard);
return new ReplicaResult();
}
}
@Override
protected IndexShard getIndexShard(final ShardId shardId) {
if (this.shardId.equals(shardId) == false) {
throw new AssertionError("shard id differs from " + shardId);
}
return (executedOnPrimary.get() == null) ? primary : replica;
}
private static IndicesService mockIndicesService(ShardId shardId, SetOnce<Boolean> executedOnPrimary, IndexShard primary,
IndexShard replica) {
final IndexService indexService = mock(IndexService.class);
when(indexService.getShard(shardId.id())).then(invocation -> (executedOnPrimary.get() == null) ? primary : replica);
@Override
protected void sendReplicaRequest(final ConcreteReplicaRequest<Request> replicaRequest,
final DiscoveryNode node,
final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
assertEquals("Replica is always assigned to node 2 in this test", clusterService.state().nodes().get("_node2"), node);
try {
handleReplicaRequest(replicaRequest, new TransportChannel() {
@Override
public String getProfileName() {
return null;
}
final IndicesService indicesService = mock(IndicesService.class);
when(indicesService.indexServiceSafe(shardId.getIndex())).then(invocation -> indexService);
@Override
public String getChannelType() {
return null;
}
@Override
public void sendResponse(TransportResponse response) throws IOException {
listener.onResponse((ReplicationOperation.ReplicaResponse) response);
}
@Override
public void sendResponse(Exception exception) throws IOException {
listener.onFailure(exception);
}
}, null);
} catch (Exception e) {
listener.onFailure(e);
}
}
return indicesService;
}
/**
@ -452,7 +483,8 @@ public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTe
SinglePermitWithBlocksAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService,
ShardStateAction shardStateAction, ThreadPool threadPool,
ShardId shardId, IndexShard primary, IndexShard replica, boolean globalBlock) {
super(settings, actionName, transportService, clusterService, shardStateAction, threadPool, shardId, primary, replica);
super(settings, actionName, transportService, clusterService, shardStateAction, threadPool, shardId, primary, replica,
new SetOnce<>());
this.globalBlock = globalBlock;
}
@ -497,7 +529,8 @@ public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTe
AllPermitsThenBlockAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService,
ShardStateAction shardStateAction, ThreadPool threadPool,
ShardId shardId, IndexShard primary, IndexShard replica) {
super(settings, actionName, transportService, clusterService, shardStateAction, threadPool, shardId, primary, replica);
super(settings, actionName, transportService, clusterService, shardStateAction, threadPool, shardId, primary, replica,
new SetOnce<>());
}
@Override