From e64524c46fa3c9d36922eade57507d264a2d051d Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 3 Apr 2019 08:32:57 +0100 Subject: [PATCH] Remove some abstractions from `TransportReplicationAction` (#40706) `TransportReplicationAction` is a rather complex beast, and some of its concrete implementations do not need all of its features. More specifically, it (a) chases a primary around the cluster until it manages to pin it down and then (b) executes an action on that primary and all its replicas. There are some actions that are coordinated by the primary itself, meaning that there is no need for the chase-the-primary phases, and in the case of peer recovery retention leases and primary/replica resync it is important to bypass these first phases. This commit is a step towards separating the `TransportReplicationAction` into these two parts. It is a mostly mechanical sequence of steps to remove some abstractions that are no longer in use. --- .../refresh/TransportShardRefreshAction.java | 5 +- .../TransportResyncReplicationAction.java | 6 +- .../action/support/ChannelActionListener.java | 1 + .../TransportReplicationAction.java | 234 +++++------------- .../TransportReplicationActionTests.java | 38 +-- ...ReplicationAllPermitsAcquisitionTests.java | 39 +-- 6 files changed, 97 insertions(+), 226 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java index 0b5975cf025..df3ff16ff88 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java @@ -53,10 +53,11 @@ public class TransportShardRefreshAction } @Override - protected PrimaryResult shardOperationOnPrimary(BasicReplicationRequest shardRequest, IndexShard primary) { + protected PrimaryResult shardOperationOnPrimary( + BasicReplicationRequest shardRequest, IndexShard primary) { primary.refresh("api"); logger.trace("{} refresh request executed on primary", primary.shardId()); - return new PrimaryResult(shardRequest, new ReplicationResponse()); + return new PrimaryResult<>(shardRequest, new ReplicationResponse()); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index 3f09f00b9ac..15c6feae9fc 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -67,16 +67,16 @@ public class TransportResyncReplicationAction extends TransportWriteAction request, Supplier replicaRequest, String executor) { - transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new OperationTransportHandler()); + transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest); // we should never reject resync because of thread pool capacity on primary transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor, true, true, - new PrimaryOperationTransportHandler()); + this::handlePrimaryRequest); transportService.registerRequestHandler(transportReplicaAction, () -> new ConcreteReplicaRequest<>(replicaRequest), executor, true, true, - new ReplicaOperationTransportHandler()); + this::handleReplicaRequest); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/ChannelActionListener.java b/server/src/main/java/org/elasticsearch/action/support/ChannelActionListener.java index b23758758e2..b93298eefe8 100644 --- a/server/src/main/java/org/elasticsearch/action/support/ChannelActionListener.java +++ b/server/src/main/java/org/elasticsearch/action/support/ChannelActionListener.java @@ -55,6 +55,7 @@ public final class ChannelActionListener< try { channel.sendResponse(e); } catch (Exception e1) { + e1.addSuppressed(e); logger.warn(() -> new ParameterizedMessage( "Failed to send error response for action [{}] and request [{}]", actionName, request), e1); } diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index f0ba0a520bd..101c116c4ac 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -30,6 +30,7 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.client.transport.NoNodeAvailableException; @@ -70,10 +71,8 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.TransportChannel; -import org.elasticsearch.transport.TransportChannelResponseHandler; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; -import org.elasticsearch.transport.TransportRequestHandler; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportResponse.Empty; @@ -155,14 +154,12 @@ public abstract class TransportReplicationAction< protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier request, Supplier replicaRequest, String executor) { - transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new OperationTransportHandler()); + transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest); transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor, - new PrimaryOperationTransportHandler()); + this::handlePrimaryRequest); // we must never reject on because of thread pool capacity on replicas - transportService.registerRequestHandler(transportReplicaAction, - () -> new ConcreteReplicaRequest<>(replicaRequest), - executor, true, true, - new ReplicaOperationTransportHandler()); + transportService.registerRequestHandler( + transportReplicaAction, () -> new ConcreteReplicaRequest<>(replicaRequest), executor, true, true, this::handleReplicaRequest); } @Override @@ -272,71 +269,30 @@ public abstract class TransportReplicationAction< return false; } - protected class OperationTransportHandler implements TransportRequestHandler { - - public OperationTransportHandler() { - - } - - @Override - public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception { - execute(task, request, new ActionListener() { - @Override - public void onResponse(Response result) { - try { - channel.sendResponse(result); - } catch (Exception e) { - onFailure(e); - } - } - - @Override - public void onFailure(Exception e) { - try { - channel.sendResponse(e); - } catch (Exception inner) { - inner.addSuppressed(e); - logger.warn(() -> new ParameterizedMessage("Failed to send response for {}", actionName), inner); - } - } - }); - } + protected void handleOperationRequest(final Request request, final TransportChannel channel, Task task) { + execute(task, request, new ChannelActionListener<>(channel, actionName, request)); } - protected class PrimaryOperationTransportHandler implements TransportRequestHandler> { - - public PrimaryOperationTransportHandler() { - - } - - @Override - public void messageReceived(ConcreteShardRequest request, TransportChannel channel, Task task) { - new AsyncPrimaryAction(request.request, request.targetAllocationID, request.primaryTerm, channel, (ReplicationTask) task).run(); - } + protected void handlePrimaryRequest(final ConcreteShardRequest request, final TransportChannel channel, final Task task) { + new AsyncPrimaryAction( + request, new ChannelActionListener<>(channel, transportPrimaryAction, request), (ReplicationTask) task).run(); } class AsyncPrimaryAction extends AbstractRunnable { - - private final Request request; - // targetAllocationID of the shard this request is meant for - private final String targetAllocationID; - // primary term of the shard this request is meant for - private final long primaryTerm; - private final TransportChannel channel; + private final ActionListener onCompletionListener; private final ReplicationTask replicationTask; + private final ConcreteShardRequest primaryRequest; - AsyncPrimaryAction(Request request, String targetAllocationID, long primaryTerm, TransportChannel channel, + AsyncPrimaryAction(ConcreteShardRequest primaryRequest, ActionListener onCompletionListener, ReplicationTask replicationTask) { - this.request = request; - this.targetAllocationID = targetAllocationID; - this.primaryTerm = primaryTerm; - this.channel = channel; + this.primaryRequest = primaryRequest; + this.onCompletionListener = onCompletionListener; this.replicationTask = replicationTask; } @Override protected void doRun() throws Exception { - final ShardId shardId = request.shardId(); + final ShardId shardId = primaryRequest.getRequest().shardId(); final IndexShard indexShard = getIndexShard(shardId); final ShardRouting shardRouting = indexShard.routingEntry(); // we may end up here if the cluster state used to route the primary is so stale that the underlying @@ -346,17 +302,17 @@ public abstract class TransportReplicationAction< throw new ReplicationOperation.RetryOnPrimaryException(shardId, "actual shard is not a primary " + shardRouting); } final String actualAllocationId = shardRouting.allocationId().getId(); - if (actualAllocationId.equals(targetAllocationID) == false) { - throw new ShardNotFoundException(shardId, "expected allocation id [{}] but found [{}]", targetAllocationID, - actualAllocationId); + if (actualAllocationId.equals(primaryRequest.getTargetAllocationID()) == false) { + throw new ShardNotFoundException(shardId, "expected allocation id [{}] but found [{}]", + primaryRequest.getTargetAllocationID(), actualAllocationId); } final long actualTerm = indexShard.getPendingPrimaryTerm(); - if (actualTerm != primaryTerm) { - throw new ShardNotFoundException(shardId, "expected allocation id [{}] with term [{}] but found [{}]", targetAllocationID, - primaryTerm, actualTerm); + if (actualTerm != primaryRequest.getPrimaryTerm()) { + throw new ShardNotFoundException(shardId, "expected allocation id [{}] with term [{}] but found [{}]", + primaryRequest.getTargetAllocationID(), primaryRequest.getPrimaryTerm(), actualTerm); } - acquirePrimaryOperationPermit(indexShard, request, ActionListener.wrap( + acquirePrimaryOperationPermit(indexShard, primaryRequest.getRequest(), ActionListener.wrap( releasable -> runWithPrimaryShardReference(new PrimaryShardReference(indexShard, releasable)), this::onFailure )); @@ -388,11 +344,10 @@ public abstract class TransportReplicationAction< }; DiscoveryNode relocatingNode = clusterState.nodes().get(primary.relocatingNodeId()); transportService.sendRequest(relocatingNode, transportPrimaryAction, - new ConcreteShardRequest<>(request, primary.allocationId().getRelocationId(), primaryTerm), + new ConcreteShardRequest<>(primaryRequest.getRequest(), primary.allocationId().getRelocationId(), + primaryRequest.getPrimaryTerm()), transportOptions, - new TransportChannelResponseHandler(logger, channel, "rerouting indexing to target primary " + primary, - reader) { - + new ActionListenerResponseHandler(onCompletionListener, reader) { @Override public void handleResponse(Response response) { setPhase(replicationTask, "finished"); @@ -408,7 +363,7 @@ public abstract class TransportReplicationAction< } else { setPhase(replicationTask, "primary"); final ActionListener listener = createResponseListener(primaryShardReference); - createReplicatedOperation(request, + createReplicatedOperation(primaryRequest.getRequest(), ActionListener.wrap(result -> result.respond(listener), listener::onFailure), primaryShardReference) .execute(); @@ -422,12 +377,7 @@ public abstract class TransportReplicationAction< @Override public void onFailure(Exception e) { setPhase(replicationTask, "finished"); - try { - channel.sendResponse(e); - } catch (IOException inner) { - inner.addSuppressed(e); - logger.warn("failed to send response", inner); - } + onCompletionListener.onFailure(e); } private ActionListener createResponseListener(final PrimaryShardReference primaryShardReference) { @@ -452,22 +402,14 @@ public abstract class TransportReplicationAction< } primaryShardReference.close(); // release shard operation lock before responding to caller setPhase(replicationTask, "finished"); - try { - channel.sendResponse(response); - } catch (IOException e) { - onFailure(e); - } + onCompletionListener.onResponse(response); } @Override public void onFailure(Exception e) { primaryShardReference.close(); // release shard operation lock before responding to caller setPhase(replicationTask, "finished"); - try { - channel.sendResponse(e); - } catch (IOException e1) { - logger.warn("failed to send response", e); - } + onCompletionListener.onFailure(e); } }; } @@ -476,7 +418,7 @@ public abstract class TransportReplicationAction< Request request, ActionListener> listener, PrimaryShardReference primaryShardReference) { return new ReplicationOperation<>(request, primaryShardReference, listener, - newReplicasProxy(primaryTerm), logger, actionName); + newReplicasProxy(primaryRequest.getPrimaryTerm()), logger, actionName); } } @@ -545,24 +487,10 @@ public abstract class TransportReplicationAction< } } - public class ReplicaOperationTransportHandler implements TransportRequestHandler> { - - @Override - public void messageReceived( - final ConcreteReplicaRequest replicaRequest, - final TransportChannel channel, - final Task task) - throws Exception { - new AsyncReplicaAction( - replicaRequest.getRequest(), - replicaRequest.getTargetAllocationID(), - replicaRequest.getPrimaryTerm(), - replicaRequest.getGlobalCheckpoint(), - replicaRequest.getMaxSeqNoOfUpdatesOrDeletes(), - channel, - (ReplicationTask) task).run(); - } - + protected void handleReplicaRequest(final ConcreteReplicaRequest replicaRequest, + final TransportChannel channel, final Task task) { + new AsyncReplicaAction( + replicaRequest, new ChannelActionListener<>(channel, transportReplicaAction, replicaRequest), (ReplicationTask) task).run(); } public static class RetryOnReplicaException extends ElasticsearchException { @@ -578,13 +506,7 @@ public abstract class TransportReplicationAction< } private final class AsyncReplicaAction extends AbstractRunnable implements ActionListener { - private final ReplicaRequest request; - // allocation id of the replica this request is meant for - private final String targetAllocationID; - private final long primaryTerm; - private final long globalCheckpoint; - private final long maxSeqNoOfUpdatesOrDeletes; - private final TransportChannel channel; + private final ActionListener onCompletionListener; private final IndexShard replica; /** * The task on the node with the replica shard. @@ -593,23 +515,14 @@ public abstract class TransportReplicationAction< // important: we pass null as a timeout as failing a replica is // something we want to avoid at all costs private final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext()); + private final ConcreteReplicaRequest replicaRequest; - AsyncReplicaAction( - ReplicaRequest request, - String targetAllocationID, - long primaryTerm, - long globalCheckpoint, - long maxSeqNoOfUpdatesOrDeletes, - TransportChannel channel, - ReplicationTask task) { - this.request = request; - this.channel = channel; + AsyncReplicaAction(ConcreteReplicaRequest replicaRequest, ActionListener onCompletionListener, + ReplicationTask task) { + this.replicaRequest = replicaRequest; + this.onCompletionListener = onCompletionListener; this.task = task; - this.targetAllocationID = targetAllocationID; - this.primaryTerm = primaryTerm; - this.globalCheckpoint = globalCheckpoint; - this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes; - final ShardId shardId = request.shardId(); + final ShardId shardId = replicaRequest.getRequest().shardId(); assert shardId != null : "request shardId must be set"; this.replica = getIndexShard(shardId); } @@ -617,7 +530,7 @@ public abstract class TransportReplicationAction< @Override public void onResponse(Releasable releasable) { try { - final ReplicaResult replicaResult = shardOperationOnReplica(request, replica); + final ReplicaResult replicaResult = shardOperationOnReplica(replicaRequest.getRequest(), replica); releasable.close(); // release shard operation lock before responding to caller final TransportReplicationAction.ReplicaResponse response = new ReplicaResponse(replica.getLocalCheckpoint(), replica.getGlobalCheckpoint()); @@ -635,22 +548,17 @@ public abstract class TransportReplicationAction< () -> new ParameterizedMessage( "Retrying operation on replica, action [{}], request [{}]", transportReplicaAction, - request), + replicaRequest.getRequest()), e); - request.onRetry(); + replicaRequest.getRequest().onRetry(); observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override public void onNewClusterState(ClusterState state) { // Forking a thread on local node via transport service so that custom transport service have an // opportunity to execute custom logic before the replica operation begins - String extraMessage = "action [" + transportReplicaAction + "], request[" + request + "]"; - TransportChannelResponseHandler handler = - new TransportChannelResponseHandler<>(logger, channel, extraMessage, - (in) -> TransportResponse.Empty.INSTANCE); transportService.sendRequest(clusterService.localNode(), transportReplicaAction, - new ConcreteReplicaRequest<>(request, targetAllocationID, primaryTerm, - globalCheckpoint, maxSeqNoOfUpdatesOrDeletes), - handler); + replicaRequest, + new ActionListenerResponseHandler<>(onCompletionListener, in -> new ReplicaResponse())); } @Override @@ -669,25 +577,20 @@ public abstract class TransportReplicationAction< } protected void responseWithFailure(Exception e) { - try { - setPhase(task, "finished"); - channel.sendResponse(e); - } catch (IOException responseException) { - responseException.addSuppressed(e); - logger.warn(() -> new ParameterizedMessage( - "failed to send error message back to client for action [{}]", transportReplicaAction), responseException); - } + setPhase(task, "finished"); + onCompletionListener.onFailure(e); } @Override protected void doRun() throws Exception { setPhase(task, "replica"); final String actualAllocationId = this.replica.routingEntry().allocationId().getId(); - if (actualAllocationId.equals(targetAllocationID) == false) { - throw new ShardNotFoundException(this.replica.shardId(), "expected allocation id [{}] but found [{}]", targetAllocationID, - actualAllocationId); + if (actualAllocationId.equals(replicaRequest.getTargetAllocationID()) == false) { + throw new ShardNotFoundException(this.replica.shardId(), "expected allocation id [{}] but found [{}]", + replicaRequest.getTargetAllocationID(), actualAllocationId); } - acquireReplicaOperationPermit(replica, request, this, primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes); + acquireReplicaOperationPermit(replica, replicaRequest.getRequest(), this, replicaRequest.getPrimaryTerm(), + replicaRequest.getGlobalCheckpoint(), replicaRequest.getMaxSeqNoOfUpdatesOrDeletes()); } /** @@ -703,15 +606,12 @@ public abstract class TransportReplicationAction< @Override public void onResponse(Empty response) { if (logger.isTraceEnabled()) { - logger.trace("action [{}] completed on shard [{}] for request [{}]", transportReplicaAction, request.shardId(), - request); + logger.trace("action [{}] completed on shard [{}] for request [{}]", transportReplicaAction, + replicaRequest.getRequest().shardId(), + replicaRequest.getRequest()); } setPhase(task, "finished"); - try { - channel.sendResponse(replicaResponse); - } catch (Exception e) { - onFailure(e); - } + onCompletionListener.onResponse(replicaResponse); } @Override @@ -983,12 +883,13 @@ public abstract class TransportReplicationAction< replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onAcquired, executor, request); } - class ShardReference implements Releasable { + class PrimaryShardReference implements Releasable, + ReplicationOperation.Primary> { protected final IndexShard indexShard; private final Releasable operationLock; - ShardReference(IndexShard indexShard, Releasable operationLock) { + PrimaryShardReference(IndexShard indexShard, Releasable operationLock) { this.indexShard = indexShard; this.operationLock = operationLock; } @@ -1006,15 +907,6 @@ public abstract class TransportReplicationAction< return indexShard.routingEntry(); } - } - - class PrimaryShardReference extends ShardReference - implements ReplicationOperation.Primary> { - - PrimaryShardReference(IndexShard indexShard, Releasable operationLock) { - super(indexShard, operationLock); - } - public boolean isRelocated() { return indexShard.isRelocatedPrimary(); } @@ -1029,8 +921,8 @@ public abstract class TransportReplicationAction< } @Override - public PrimaryResult perform(Request request) throws Exception { - PrimaryResult result = shardOperationOnPrimary(request, indexShard); + public PrimaryResult perform(Request request) throws Exception { + PrimaryResult result = shardOperationOnPrimary(request, indexShard); assert result.replicaRequest() == null || result.finalFailure == null : "a replica request [" + result.replicaRequest() + "] with a primary failure [" + result.finalFailure + "]"; return result; diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index ffc9c2bf70a..9164d9e4184 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -331,8 +331,10 @@ public class TransportReplicationActionTests extends ESTestCase { final ReplicationTask task = maybeTask(); final PlainActionFuture listener = new PlainActionFuture<>(); + final TransportReplicationAction.ConcreteShardRequest primaryRequest + = new TransportReplicationAction.ConcreteShardRequest<>(request, targetAllocationID, primaryTerm); final TransportReplicationAction.AsyncPrimaryAction asyncPrimaryActionWithBlocks = - actionWithBlocks.new AsyncPrimaryAction(request, targetAllocationID, primaryTerm, createTransportChannel(listener), task); + actionWithBlocks.new AsyncPrimaryAction(primaryRequest, listener, task); asyncPrimaryActionWithBlocks.run(); final ExecutionException exception = expectThrows(ExecutionException.class, listener::get); @@ -589,7 +591,9 @@ public class TransportReplicationActionTests extends ESTestCase { isRelocated.set(true); executeOnPrimary = false; } - action.new AsyncPrimaryAction(request, primaryShard.allocationId().getId(), primaryTerm, createTransportChannel(listener), task) { + final TransportReplicationAction.ConcreteShardRequest primaryRequest + = new TransportReplicationAction.ConcreteShardRequest<>(request, primaryShard.allocationId().getId(), primaryTerm); + action.new AsyncPrimaryAction(primaryRequest, listener, task) { @Override protected ReplicationOperation> createReplicatedOperation( @@ -645,8 +649,9 @@ public class TransportReplicationActionTests extends ESTestCase { PlainActionFuture listener = new PlainActionFuture<>(); ReplicationTask task = maybeTask(); AtomicBoolean executed = new AtomicBoolean(); - action.new AsyncPrimaryAction(request, primaryShard.allocationId().getRelocationId(), primaryTerm, - createTransportChannel(listener), task) { + final TransportReplicationAction.ConcreteShardRequest primaryRequest + = new TransportReplicationAction.ConcreteShardRequest<>(request, primaryShard.allocationId().getRelocationId(), primaryTerm); + action.new AsyncPrimaryAction(primaryRequest, listener, task) { @Override protected ReplicationOperation> createReplicatedOperation( @@ -792,9 +797,7 @@ public class TransportReplicationActionTests extends ESTestCase { } }; - TransportReplicationAction.PrimaryOperationTransportHandler primaryPhase = - action.new PrimaryOperationTransportHandler(); - primaryPhase.messageReceived(concreteShardRequest, createTransportChannel(listener), null); + action.handlePrimaryRequest(concreteShardRequest, createTransportChannel(listener), null); CapturingTransport.CapturedRequest[] requestsToReplicas = transport.capturedRequests(); assertThat(requestsToReplicas, arrayWithSize(1)); assertThat(((TransportReplicationAction.ConcreteShardRequest) requestsToReplicas[0].request).getPrimaryTerm(), @@ -817,7 +820,9 @@ public class TransportReplicationActionTests extends ESTestCase { final boolean throwExceptionOnCreation = i == 1; final boolean throwExceptionOnRun = i == 2; final boolean respondWithError = i == 3; - action.new AsyncPrimaryAction(request, primaryShard.allocationId().getId(), primaryTerm, createTransportChannel(listener), task) { + final TransportReplicationAction.ConcreteShardRequest primaryRequest + = new TransportReplicationAction.ConcreteShardRequest<>(request, primaryShard.allocationId().getId(), primaryTerm); + action.new AsyncPrimaryAction(primaryRequest, listener, task) { @Override protected ReplicationOperation> createReplicatedOperation( @@ -880,9 +885,8 @@ public class TransportReplicationActionTests extends ESTestCase { return new ReplicaResult(); } }; - final TestAction.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler(); try { - replicaOperationTransportHandler.messageReceived( + action.handleReplicaRequest( new TransportReplicationAction.ConcreteReplicaRequest<>( new Request().setShardId(shardId), replicaRouting.allocationId().getId(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()), @@ -938,7 +942,7 @@ public class TransportReplicationActionTests extends ESTestCase { final boolean wrongAllocationId = randomBoolean(); final long requestTerm = wrongAllocationId && randomBoolean() ? primaryTerm : primaryTerm + randomIntBetween(1, 10); Request request = new Request(shardId).timeout("1ms"); - action.new PrimaryOperationTransportHandler().messageReceived( + action.handlePrimaryRequest( new TransportReplicationAction.ConcreteShardRequest<>(request, wrongAllocationId ? "_not_a_valid_aid_" : primary.allocationId().getId(), requestTerm), @@ -973,7 +977,7 @@ public class TransportReplicationActionTests extends ESTestCase { PlainActionFuture listener = new PlainActionFuture<>(); Request request = new Request(shardId).timeout("1ms"); - action.new ReplicaOperationTransportHandler().messageReceived( + action.handleReplicaRequest( new TransportReplicationAction.ConcreteReplicaRequest<>(request, "_not_a_valid_aid_", randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()), createTransportChannel(listener), maybeTask() @@ -1015,12 +1019,11 @@ public class TransportReplicationActionTests extends ESTestCase { return new ReplicaResult(); } }; - final TestAction.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler(); final PlainActionFuture listener = new PlainActionFuture<>(); final Request request = new Request().setShardId(shardId); final long checkpoint = randomNonNegativeLong(); final long maxSeqNoOfUpdatesOrDeletes = randomNonNegativeLong(); - replicaOperationTransportHandler.messageReceived( + action.handleReplicaRequest( new TransportReplicationAction.ConcreteReplicaRequest<>(request, replica.allocationId().getId(), primaryTerm, checkpoint, maxSeqNoOfUpdatesOrDeletes), createTransportChannel(listener), task); @@ -1084,12 +1087,11 @@ public class TransportReplicationActionTests extends ESTestCase { return new ReplicaResult(); } }; - final TestAction.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler(); final PlainActionFuture listener = new PlainActionFuture<>(); final Request request = new Request().setShardId(shardId); final long checkpoint = randomNonNegativeLong(); final long maxSeqNoOfUpdates = randomNonNegativeLong(); - replicaOperationTransportHandler.messageReceived( + action.handleReplicaRequest( new TransportReplicationAction.ConcreteReplicaRequest<>(request, replica.allocationId().getId(), primaryTerm, checkpoint, maxSeqNoOfUpdates), createTransportChannel(listener), task); @@ -1221,10 +1223,10 @@ public class TransportReplicationActionTests extends ESTestCase { } @Override - protected PrimaryResult shardOperationOnPrimary(Request shardRequest, IndexShard primary) throws Exception { + protected PrimaryResult shardOperationOnPrimary(Request shardRequest, IndexShard primary) { boolean executedBefore = shardRequest.processedOnPrimary.getAndSet(true); assert executedBefore == false : "request has already been executed on the primary"; - return new PrimaryResult(shardRequest, new TestResponse()); + return new PrimaryResult<>(shardRequest, new TestResponse()); } @Override diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java index 1cb1bfde34e..8fe204cee2c 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java @@ -203,8 +203,10 @@ public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTe actions[threadId] = singlePermitAction; Thread thread = new Thread(() -> { + final TransportReplicationAction.ConcreteShardRequest primaryRequest + = new TransportReplicationAction.ConcreteShardRequest<>(request(), allocationId(), primaryTerm()); TransportReplicationAction.AsyncPrimaryAction asyncPrimaryAction = - singlePermitAction.new AsyncPrimaryAction(request(), allocationId(), primaryTerm(), transportChannel(listener), null) { + singlePermitAction.new AsyncPrimaryAction(primaryRequest, listener, null) { @Override protected void doRun() throws Exception { if (delayed) { @@ -254,8 +256,10 @@ public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTe final PlainActionFuture allPermitFuture = new PlainActionFuture<>(); Thread thread = new Thread(() -> { + final TransportReplicationAction.ConcreteShardRequest primaryRequest + = new TransportReplicationAction.ConcreteShardRequest<>(request(), allocationId(), primaryTerm()); TransportReplicationAction.AsyncPrimaryAction asyncPrimaryAction = - allPermitsAction.new AsyncPrimaryAction(request(), allocationId(), primaryTerm(), transportChannel(allPermitFuture), null) { + allPermitsAction.new AsyncPrimaryAction(primaryRequest, allPermitFuture, null) { @Override void runWithPrimaryShardReference(final TransportReplicationAction.PrimaryShardReference reference) { assertEquals("All permits must be acquired", 0, reference.indexShard.getActiveOperationsCount()); @@ -407,9 +411,8 @@ public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTe final DiscoveryNode node, final ActionListener listener) { assertEquals("Replica is always assigned to node 2 in this test", clusterService.state().nodes().get("_node2"), node); - ReplicaOperationTransportHandler replicaOperationTransportHandler = new ReplicaOperationTransportHandler(); try { - replicaOperationTransportHandler.messageReceived(replicaRequest, new TransportChannel() { + handleReplicaRequest(replicaRequest, new TransportChannel() { @Override public String getProfileName() { return null; @@ -530,32 +533,4 @@ public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTe static class Response extends ReplicationResponse { } - - /** - * Transport channel that is needed for replica operation testing. - */ - public TransportChannel transportChannel(final PlainActionFuture listener) { - return new TransportChannel() { - - @Override - public String getProfileName() { - return ""; - } - - @Override - public void sendResponse(TransportResponse response) throws IOException { - listener.onResponse(((Response) response)); - } - - @Override - public void sendResponse(Exception exception) throws IOException { - listener.onFailure(exception); - } - - @Override - public String getChannelType() { - return "replica_test"; - } - }; - } }