diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java index 0b5975cf025..df3ff16ff88 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java @@ -53,10 +53,11 @@ public class TransportShardRefreshAction } @Override - protected PrimaryResult shardOperationOnPrimary(BasicReplicationRequest shardRequest, IndexShard primary) { + protected PrimaryResult shardOperationOnPrimary( + BasicReplicationRequest shardRequest, IndexShard primary) { primary.refresh("api"); logger.trace("{} refresh request executed on primary", primary.shardId()); - return new PrimaryResult(shardRequest, new ReplicationResponse()); + return new PrimaryResult<>(shardRequest, new ReplicationResponse()); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index 3f09f00b9ac..15c6feae9fc 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -67,16 +67,16 @@ public class TransportResyncReplicationAction extends TransportWriteAction request, Supplier replicaRequest, String executor) { - transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new OperationTransportHandler()); + transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest); // we should never reject resync because of thread pool capacity on primary transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor, true, true, - new PrimaryOperationTransportHandler()); + this::handlePrimaryRequest); transportService.registerRequestHandler(transportReplicaAction, () -> new ConcreteReplicaRequest<>(replicaRequest), executor, true, true, - new ReplicaOperationTransportHandler()); + this::handleReplicaRequest); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/ChannelActionListener.java b/server/src/main/java/org/elasticsearch/action/support/ChannelActionListener.java index b23758758e2..b93298eefe8 100644 --- a/server/src/main/java/org/elasticsearch/action/support/ChannelActionListener.java +++ b/server/src/main/java/org/elasticsearch/action/support/ChannelActionListener.java @@ -55,6 +55,7 @@ public final class ChannelActionListener< try { channel.sendResponse(e); } catch (Exception e1) { + e1.addSuppressed(e); logger.warn(() -> new ParameterizedMessage( "Failed to send error response for action [{}] and request [{}]", actionName, request), e1); } diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index f0ba0a520bd..101c116c4ac 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -30,6 +30,7 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.client.transport.NoNodeAvailableException; @@ -70,10 +71,8 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.TransportChannel; -import org.elasticsearch.transport.TransportChannelResponseHandler; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; -import org.elasticsearch.transport.TransportRequestHandler; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportResponse.Empty; @@ -155,14 +154,12 @@ public abstract class TransportReplicationAction< protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier request, Supplier replicaRequest, String executor) { - transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new OperationTransportHandler()); + transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest); transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor, - new PrimaryOperationTransportHandler()); + this::handlePrimaryRequest); // we must never reject on because of thread pool capacity on replicas - transportService.registerRequestHandler(transportReplicaAction, - () -> new ConcreteReplicaRequest<>(replicaRequest), - executor, true, true, - new ReplicaOperationTransportHandler()); + transportService.registerRequestHandler( + transportReplicaAction, () -> new ConcreteReplicaRequest<>(replicaRequest), executor, true, true, this::handleReplicaRequest); } @Override @@ -272,71 +269,30 @@ public abstract class TransportReplicationAction< return false; } - protected class OperationTransportHandler implements TransportRequestHandler { - - public OperationTransportHandler() { - - } - - @Override - public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception { - execute(task, request, new ActionListener() { - @Override - public void onResponse(Response result) { - try { - channel.sendResponse(result); - } catch (Exception e) { - onFailure(e); - } - } - - @Override - public void onFailure(Exception e) { - try { - channel.sendResponse(e); - } catch (Exception inner) { - inner.addSuppressed(e); - logger.warn(() -> new ParameterizedMessage("Failed to send response for {}", actionName), inner); - } - } - }); - } + protected void handleOperationRequest(final Request request, final TransportChannel channel, Task task) { + execute(task, request, new ChannelActionListener<>(channel, actionName, request)); } - protected class PrimaryOperationTransportHandler implements TransportRequestHandler> { - - public PrimaryOperationTransportHandler() { - - } - - @Override - public void messageReceived(ConcreteShardRequest request, TransportChannel channel, Task task) { - new AsyncPrimaryAction(request.request, request.targetAllocationID, request.primaryTerm, channel, (ReplicationTask) task).run(); - } + protected void handlePrimaryRequest(final ConcreteShardRequest request, final TransportChannel channel, final Task task) { + new AsyncPrimaryAction( + request, new ChannelActionListener<>(channel, transportPrimaryAction, request), (ReplicationTask) task).run(); } class AsyncPrimaryAction extends AbstractRunnable { - - private final Request request; - // targetAllocationID of the shard this request is meant for - private final String targetAllocationID; - // primary term of the shard this request is meant for - private final long primaryTerm; - private final TransportChannel channel; + private final ActionListener onCompletionListener; private final ReplicationTask replicationTask; + private final ConcreteShardRequest primaryRequest; - AsyncPrimaryAction(Request request, String targetAllocationID, long primaryTerm, TransportChannel channel, + AsyncPrimaryAction(ConcreteShardRequest primaryRequest, ActionListener onCompletionListener, ReplicationTask replicationTask) { - this.request = request; - this.targetAllocationID = targetAllocationID; - this.primaryTerm = primaryTerm; - this.channel = channel; + this.primaryRequest = primaryRequest; + this.onCompletionListener = onCompletionListener; this.replicationTask = replicationTask; } @Override protected void doRun() throws Exception { - final ShardId shardId = request.shardId(); + final ShardId shardId = primaryRequest.getRequest().shardId(); final IndexShard indexShard = getIndexShard(shardId); final ShardRouting shardRouting = indexShard.routingEntry(); // we may end up here if the cluster state used to route the primary is so stale that the underlying @@ -346,17 +302,17 @@ public abstract class TransportReplicationAction< throw new ReplicationOperation.RetryOnPrimaryException(shardId, "actual shard is not a primary " + shardRouting); } final String actualAllocationId = shardRouting.allocationId().getId(); - if (actualAllocationId.equals(targetAllocationID) == false) { - throw new ShardNotFoundException(shardId, "expected allocation id [{}] but found [{}]", targetAllocationID, - actualAllocationId); + if (actualAllocationId.equals(primaryRequest.getTargetAllocationID()) == false) { + throw new ShardNotFoundException(shardId, "expected allocation id [{}] but found [{}]", + primaryRequest.getTargetAllocationID(), actualAllocationId); } final long actualTerm = indexShard.getPendingPrimaryTerm(); - if (actualTerm != primaryTerm) { - throw new ShardNotFoundException(shardId, "expected allocation id [{}] with term [{}] but found [{}]", targetAllocationID, - primaryTerm, actualTerm); + if (actualTerm != primaryRequest.getPrimaryTerm()) { + throw new ShardNotFoundException(shardId, "expected allocation id [{}] with term [{}] but found [{}]", + primaryRequest.getTargetAllocationID(), primaryRequest.getPrimaryTerm(), actualTerm); } - acquirePrimaryOperationPermit(indexShard, request, ActionListener.wrap( + acquirePrimaryOperationPermit(indexShard, primaryRequest.getRequest(), ActionListener.wrap( releasable -> runWithPrimaryShardReference(new PrimaryShardReference(indexShard, releasable)), this::onFailure )); @@ -388,11 +344,10 @@ public abstract class TransportReplicationAction< }; DiscoveryNode relocatingNode = clusterState.nodes().get(primary.relocatingNodeId()); transportService.sendRequest(relocatingNode, transportPrimaryAction, - new ConcreteShardRequest<>(request, primary.allocationId().getRelocationId(), primaryTerm), + new ConcreteShardRequest<>(primaryRequest.getRequest(), primary.allocationId().getRelocationId(), + primaryRequest.getPrimaryTerm()), transportOptions, - new TransportChannelResponseHandler(logger, channel, "rerouting indexing to target primary " + primary, - reader) { - + new ActionListenerResponseHandler(onCompletionListener, reader) { @Override public void handleResponse(Response response) { setPhase(replicationTask, "finished"); @@ -408,7 +363,7 @@ public abstract class TransportReplicationAction< } else { setPhase(replicationTask, "primary"); final ActionListener listener = createResponseListener(primaryShardReference); - createReplicatedOperation(request, + createReplicatedOperation(primaryRequest.getRequest(), ActionListener.wrap(result -> result.respond(listener), listener::onFailure), primaryShardReference) .execute(); @@ -422,12 +377,7 @@ public abstract class TransportReplicationAction< @Override public void onFailure(Exception e) { setPhase(replicationTask, "finished"); - try { - channel.sendResponse(e); - } catch (IOException inner) { - inner.addSuppressed(e); - logger.warn("failed to send response", inner); - } + onCompletionListener.onFailure(e); } private ActionListener createResponseListener(final PrimaryShardReference primaryShardReference) { @@ -452,22 +402,14 @@ public abstract class TransportReplicationAction< } primaryShardReference.close(); // release shard operation lock before responding to caller setPhase(replicationTask, "finished"); - try { - channel.sendResponse(response); - } catch (IOException e) { - onFailure(e); - } + onCompletionListener.onResponse(response); } @Override public void onFailure(Exception e) { primaryShardReference.close(); // release shard operation lock before responding to caller setPhase(replicationTask, "finished"); - try { - channel.sendResponse(e); - } catch (IOException e1) { - logger.warn("failed to send response", e); - } + onCompletionListener.onFailure(e); } }; } @@ -476,7 +418,7 @@ public abstract class TransportReplicationAction< Request request, ActionListener> listener, PrimaryShardReference primaryShardReference) { return new ReplicationOperation<>(request, primaryShardReference, listener, - newReplicasProxy(primaryTerm), logger, actionName); + newReplicasProxy(primaryRequest.getPrimaryTerm()), logger, actionName); } } @@ -545,24 +487,10 @@ public abstract class TransportReplicationAction< } } - public class ReplicaOperationTransportHandler implements TransportRequestHandler> { - - @Override - public void messageReceived( - final ConcreteReplicaRequest replicaRequest, - final TransportChannel channel, - final Task task) - throws Exception { - new AsyncReplicaAction( - replicaRequest.getRequest(), - replicaRequest.getTargetAllocationID(), - replicaRequest.getPrimaryTerm(), - replicaRequest.getGlobalCheckpoint(), - replicaRequest.getMaxSeqNoOfUpdatesOrDeletes(), - channel, - (ReplicationTask) task).run(); - } - + protected void handleReplicaRequest(final ConcreteReplicaRequest replicaRequest, + final TransportChannel channel, final Task task) { + new AsyncReplicaAction( + replicaRequest, new ChannelActionListener<>(channel, transportReplicaAction, replicaRequest), (ReplicationTask) task).run(); } public static class RetryOnReplicaException extends ElasticsearchException { @@ -578,13 +506,7 @@ public abstract class TransportReplicationAction< } private final class AsyncReplicaAction extends AbstractRunnable implements ActionListener { - private final ReplicaRequest request; - // allocation id of the replica this request is meant for - private final String targetAllocationID; - private final long primaryTerm; - private final long globalCheckpoint; - private final long maxSeqNoOfUpdatesOrDeletes; - private final TransportChannel channel; + private final ActionListener onCompletionListener; private final IndexShard replica; /** * The task on the node with the replica shard. @@ -593,23 +515,14 @@ public abstract class TransportReplicationAction< // important: we pass null as a timeout as failing a replica is // something we want to avoid at all costs private final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext()); + private final ConcreteReplicaRequest replicaRequest; - AsyncReplicaAction( - ReplicaRequest request, - String targetAllocationID, - long primaryTerm, - long globalCheckpoint, - long maxSeqNoOfUpdatesOrDeletes, - TransportChannel channel, - ReplicationTask task) { - this.request = request; - this.channel = channel; + AsyncReplicaAction(ConcreteReplicaRequest replicaRequest, ActionListener onCompletionListener, + ReplicationTask task) { + this.replicaRequest = replicaRequest; + this.onCompletionListener = onCompletionListener; this.task = task; - this.targetAllocationID = targetAllocationID; - this.primaryTerm = primaryTerm; - this.globalCheckpoint = globalCheckpoint; - this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes; - final ShardId shardId = request.shardId(); + final ShardId shardId = replicaRequest.getRequest().shardId(); assert shardId != null : "request shardId must be set"; this.replica = getIndexShard(shardId); } @@ -617,7 +530,7 @@ public abstract class TransportReplicationAction< @Override public void onResponse(Releasable releasable) { try { - final ReplicaResult replicaResult = shardOperationOnReplica(request, replica); + final ReplicaResult replicaResult = shardOperationOnReplica(replicaRequest.getRequest(), replica); releasable.close(); // release shard operation lock before responding to caller final TransportReplicationAction.ReplicaResponse response = new ReplicaResponse(replica.getLocalCheckpoint(), replica.getGlobalCheckpoint()); @@ -635,22 +548,17 @@ public abstract class TransportReplicationAction< () -> new ParameterizedMessage( "Retrying operation on replica, action [{}], request [{}]", transportReplicaAction, - request), + replicaRequest.getRequest()), e); - request.onRetry(); + replicaRequest.getRequest().onRetry(); observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override public void onNewClusterState(ClusterState state) { // Forking a thread on local node via transport service so that custom transport service have an // opportunity to execute custom logic before the replica operation begins - String extraMessage = "action [" + transportReplicaAction + "], request[" + request + "]"; - TransportChannelResponseHandler handler = - new TransportChannelResponseHandler<>(logger, channel, extraMessage, - (in) -> TransportResponse.Empty.INSTANCE); transportService.sendRequest(clusterService.localNode(), transportReplicaAction, - new ConcreteReplicaRequest<>(request, targetAllocationID, primaryTerm, - globalCheckpoint, maxSeqNoOfUpdatesOrDeletes), - handler); + replicaRequest, + new ActionListenerResponseHandler<>(onCompletionListener, in -> new ReplicaResponse())); } @Override @@ -669,25 +577,20 @@ public abstract class TransportReplicationAction< } protected void responseWithFailure(Exception e) { - try { - setPhase(task, "finished"); - channel.sendResponse(e); - } catch (IOException responseException) { - responseException.addSuppressed(e); - logger.warn(() -> new ParameterizedMessage( - "failed to send error message back to client for action [{}]", transportReplicaAction), responseException); - } + setPhase(task, "finished"); + onCompletionListener.onFailure(e); } @Override protected void doRun() throws Exception { setPhase(task, "replica"); final String actualAllocationId = this.replica.routingEntry().allocationId().getId(); - if (actualAllocationId.equals(targetAllocationID) == false) { - throw new ShardNotFoundException(this.replica.shardId(), "expected allocation id [{}] but found [{}]", targetAllocationID, - actualAllocationId); + if (actualAllocationId.equals(replicaRequest.getTargetAllocationID()) == false) { + throw new ShardNotFoundException(this.replica.shardId(), "expected allocation id [{}] but found [{}]", + replicaRequest.getTargetAllocationID(), actualAllocationId); } - acquireReplicaOperationPermit(replica, request, this, primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes); + acquireReplicaOperationPermit(replica, replicaRequest.getRequest(), this, replicaRequest.getPrimaryTerm(), + replicaRequest.getGlobalCheckpoint(), replicaRequest.getMaxSeqNoOfUpdatesOrDeletes()); } /** @@ -703,15 +606,12 @@ public abstract class TransportReplicationAction< @Override public void onResponse(Empty response) { if (logger.isTraceEnabled()) { - logger.trace("action [{}] completed on shard [{}] for request [{}]", transportReplicaAction, request.shardId(), - request); + logger.trace("action [{}] completed on shard [{}] for request [{}]", transportReplicaAction, + replicaRequest.getRequest().shardId(), + replicaRequest.getRequest()); } setPhase(task, "finished"); - try { - channel.sendResponse(replicaResponse); - } catch (Exception e) { - onFailure(e); - } + onCompletionListener.onResponse(replicaResponse); } @Override @@ -983,12 +883,13 @@ public abstract class TransportReplicationAction< replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onAcquired, executor, request); } - class ShardReference implements Releasable { + class PrimaryShardReference implements Releasable, + ReplicationOperation.Primary> { protected final IndexShard indexShard; private final Releasable operationLock; - ShardReference(IndexShard indexShard, Releasable operationLock) { + PrimaryShardReference(IndexShard indexShard, Releasable operationLock) { this.indexShard = indexShard; this.operationLock = operationLock; } @@ -1006,15 +907,6 @@ public abstract class TransportReplicationAction< return indexShard.routingEntry(); } - } - - class PrimaryShardReference extends ShardReference - implements ReplicationOperation.Primary> { - - PrimaryShardReference(IndexShard indexShard, Releasable operationLock) { - super(indexShard, operationLock); - } - public boolean isRelocated() { return indexShard.isRelocatedPrimary(); } @@ -1029,8 +921,8 @@ public abstract class TransportReplicationAction< } @Override - public PrimaryResult perform(Request request) throws Exception { - PrimaryResult result = shardOperationOnPrimary(request, indexShard); + public PrimaryResult perform(Request request) throws Exception { + PrimaryResult result = shardOperationOnPrimary(request, indexShard); assert result.replicaRequest() == null || result.finalFailure == null : "a replica request [" + result.replicaRequest() + "] with a primary failure [" + result.finalFailure + "]"; return result; diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index ffc9c2bf70a..9164d9e4184 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -331,8 +331,10 @@ public class TransportReplicationActionTests extends ESTestCase { final ReplicationTask task = maybeTask(); final PlainActionFuture listener = new PlainActionFuture<>(); + final TransportReplicationAction.ConcreteShardRequest primaryRequest + = new TransportReplicationAction.ConcreteShardRequest<>(request, targetAllocationID, primaryTerm); final TransportReplicationAction.AsyncPrimaryAction asyncPrimaryActionWithBlocks = - actionWithBlocks.new AsyncPrimaryAction(request, targetAllocationID, primaryTerm, createTransportChannel(listener), task); + actionWithBlocks.new AsyncPrimaryAction(primaryRequest, listener, task); asyncPrimaryActionWithBlocks.run(); final ExecutionException exception = expectThrows(ExecutionException.class, listener::get); @@ -589,7 +591,9 @@ public class TransportReplicationActionTests extends ESTestCase { isRelocated.set(true); executeOnPrimary = false; } - action.new AsyncPrimaryAction(request, primaryShard.allocationId().getId(), primaryTerm, createTransportChannel(listener), task) { + final TransportReplicationAction.ConcreteShardRequest primaryRequest + = new TransportReplicationAction.ConcreteShardRequest<>(request, primaryShard.allocationId().getId(), primaryTerm); + action.new AsyncPrimaryAction(primaryRequest, listener, task) { @Override protected ReplicationOperation> createReplicatedOperation( @@ -645,8 +649,9 @@ public class TransportReplicationActionTests extends ESTestCase { PlainActionFuture listener = new PlainActionFuture<>(); ReplicationTask task = maybeTask(); AtomicBoolean executed = new AtomicBoolean(); - action.new AsyncPrimaryAction(request, primaryShard.allocationId().getRelocationId(), primaryTerm, - createTransportChannel(listener), task) { + final TransportReplicationAction.ConcreteShardRequest primaryRequest + = new TransportReplicationAction.ConcreteShardRequest<>(request, primaryShard.allocationId().getRelocationId(), primaryTerm); + action.new AsyncPrimaryAction(primaryRequest, listener, task) { @Override protected ReplicationOperation> createReplicatedOperation( @@ -792,9 +797,7 @@ public class TransportReplicationActionTests extends ESTestCase { } }; - TransportReplicationAction.PrimaryOperationTransportHandler primaryPhase = - action.new PrimaryOperationTransportHandler(); - primaryPhase.messageReceived(concreteShardRequest, createTransportChannel(listener), null); + action.handlePrimaryRequest(concreteShardRequest, createTransportChannel(listener), null); CapturingTransport.CapturedRequest[] requestsToReplicas = transport.capturedRequests(); assertThat(requestsToReplicas, arrayWithSize(1)); assertThat(((TransportReplicationAction.ConcreteShardRequest) requestsToReplicas[0].request).getPrimaryTerm(), @@ -817,7 +820,9 @@ public class TransportReplicationActionTests extends ESTestCase { final boolean throwExceptionOnCreation = i == 1; final boolean throwExceptionOnRun = i == 2; final boolean respondWithError = i == 3; - action.new AsyncPrimaryAction(request, primaryShard.allocationId().getId(), primaryTerm, createTransportChannel(listener), task) { + final TransportReplicationAction.ConcreteShardRequest primaryRequest + = new TransportReplicationAction.ConcreteShardRequest<>(request, primaryShard.allocationId().getId(), primaryTerm); + action.new AsyncPrimaryAction(primaryRequest, listener, task) { @Override protected ReplicationOperation> createReplicatedOperation( @@ -880,9 +885,8 @@ public class TransportReplicationActionTests extends ESTestCase { return new ReplicaResult(); } }; - final TestAction.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler(); try { - replicaOperationTransportHandler.messageReceived( + action.handleReplicaRequest( new TransportReplicationAction.ConcreteReplicaRequest<>( new Request().setShardId(shardId), replicaRouting.allocationId().getId(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()), @@ -938,7 +942,7 @@ public class TransportReplicationActionTests extends ESTestCase { final boolean wrongAllocationId = randomBoolean(); final long requestTerm = wrongAllocationId && randomBoolean() ? primaryTerm : primaryTerm + randomIntBetween(1, 10); Request request = new Request(shardId).timeout("1ms"); - action.new PrimaryOperationTransportHandler().messageReceived( + action.handlePrimaryRequest( new TransportReplicationAction.ConcreteShardRequest<>(request, wrongAllocationId ? "_not_a_valid_aid_" : primary.allocationId().getId(), requestTerm), @@ -973,7 +977,7 @@ public class TransportReplicationActionTests extends ESTestCase { PlainActionFuture listener = new PlainActionFuture<>(); Request request = new Request(shardId).timeout("1ms"); - action.new ReplicaOperationTransportHandler().messageReceived( + action.handleReplicaRequest( new TransportReplicationAction.ConcreteReplicaRequest<>(request, "_not_a_valid_aid_", randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()), createTransportChannel(listener), maybeTask() @@ -1015,12 +1019,11 @@ public class TransportReplicationActionTests extends ESTestCase { return new ReplicaResult(); } }; - final TestAction.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler(); final PlainActionFuture listener = new PlainActionFuture<>(); final Request request = new Request().setShardId(shardId); final long checkpoint = randomNonNegativeLong(); final long maxSeqNoOfUpdatesOrDeletes = randomNonNegativeLong(); - replicaOperationTransportHandler.messageReceived( + action.handleReplicaRequest( new TransportReplicationAction.ConcreteReplicaRequest<>(request, replica.allocationId().getId(), primaryTerm, checkpoint, maxSeqNoOfUpdatesOrDeletes), createTransportChannel(listener), task); @@ -1084,12 +1087,11 @@ public class TransportReplicationActionTests extends ESTestCase { return new ReplicaResult(); } }; - final TestAction.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler(); final PlainActionFuture listener = new PlainActionFuture<>(); final Request request = new Request().setShardId(shardId); final long checkpoint = randomNonNegativeLong(); final long maxSeqNoOfUpdates = randomNonNegativeLong(); - replicaOperationTransportHandler.messageReceived( + action.handleReplicaRequest( new TransportReplicationAction.ConcreteReplicaRequest<>(request, replica.allocationId().getId(), primaryTerm, checkpoint, maxSeqNoOfUpdates), createTransportChannel(listener), task); @@ -1221,10 +1223,10 @@ public class TransportReplicationActionTests extends ESTestCase { } @Override - protected PrimaryResult shardOperationOnPrimary(Request shardRequest, IndexShard primary) throws Exception { + protected PrimaryResult shardOperationOnPrimary(Request shardRequest, IndexShard primary) { boolean executedBefore = shardRequest.processedOnPrimary.getAndSet(true); assert executedBefore == false : "request has already been executed on the primary"; - return new PrimaryResult(shardRequest, new TestResponse()); + return new PrimaryResult<>(shardRequest, new TestResponse()); } @Override diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java index 1cb1bfde34e..8fe204cee2c 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java @@ -203,8 +203,10 @@ public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTe actions[threadId] = singlePermitAction; Thread thread = new Thread(() -> { + final TransportReplicationAction.ConcreteShardRequest primaryRequest + = new TransportReplicationAction.ConcreteShardRequest<>(request(), allocationId(), primaryTerm()); TransportReplicationAction.AsyncPrimaryAction asyncPrimaryAction = - singlePermitAction.new AsyncPrimaryAction(request(), allocationId(), primaryTerm(), transportChannel(listener), null) { + singlePermitAction.new AsyncPrimaryAction(primaryRequest, listener, null) { @Override protected void doRun() throws Exception { if (delayed) { @@ -254,8 +256,10 @@ public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTe final PlainActionFuture allPermitFuture = new PlainActionFuture<>(); Thread thread = new Thread(() -> { + final TransportReplicationAction.ConcreteShardRequest primaryRequest + = new TransportReplicationAction.ConcreteShardRequest<>(request(), allocationId(), primaryTerm()); TransportReplicationAction.AsyncPrimaryAction asyncPrimaryAction = - allPermitsAction.new AsyncPrimaryAction(request(), allocationId(), primaryTerm(), transportChannel(allPermitFuture), null) { + allPermitsAction.new AsyncPrimaryAction(primaryRequest, allPermitFuture, null) { @Override void runWithPrimaryShardReference(final TransportReplicationAction.PrimaryShardReference reference) { assertEquals("All permits must be acquired", 0, reference.indexShard.getActiveOperationsCount()); @@ -407,9 +411,8 @@ public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTe final DiscoveryNode node, final ActionListener listener) { assertEquals("Replica is always assigned to node 2 in this test", clusterService.state().nodes().get("_node2"), node); - ReplicaOperationTransportHandler replicaOperationTransportHandler = new ReplicaOperationTransportHandler(); try { - replicaOperationTransportHandler.messageReceived(replicaRequest, new TransportChannel() { + handleReplicaRequest(replicaRequest, new TransportChannel() { @Override public String getProfileName() { return null; @@ -530,32 +533,4 @@ public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTe static class Response extends ReplicationResponse { } - - /** - * Transport channel that is needed for replica operation testing. - */ - public TransportChannel transportChannel(final PlainActionFuture listener) { - return new TransportChannel() { - - @Override - public String getProfileName() { - return ""; - } - - @Override - public void sendResponse(TransportResponse response) throws IOException { - listener.onResponse(((Response) response)); - } - - @Override - public void sendResponse(Exception exception) throws IOException { - listener.onFailure(exception); - } - - @Override - public String getChannelType() { - return "replica_test"; - } - }; - } }