Remove some abstractions from `TransportReplicationAction` (#40706)

`TransportReplicationAction` is a rather complex beast, and some of its
concrete implementations do not need all of its features. More specifically, it
(a) chases a primary around the cluster until it manages to pin it down and
then (b) executes an action on that primary and all its replicas. There are
some actions that are coordinated by the primary itself, meaning that there is
no need for the chase-the-primary phases, and in the case of peer recovery
retention leases and primary/replica resync it is important to bypass these
first phases.

This commit is a step towards separating the `TransportReplicationAction` into
these two parts. It is a mostly mechanical sequence of steps to remove some
abstractions that are no longer in use.
This commit is contained in:
David Turner 2019-04-03 08:32:57 +01:00
parent 4c8c4e5951
commit e64524c46f
6 changed files with 97 additions and 226 deletions

View File

@ -53,10 +53,11 @@ public class TransportShardRefreshAction
}
@Override
protected PrimaryResult shardOperationOnPrimary(BasicReplicationRequest shardRequest, IndexShard primary) {
protected PrimaryResult<BasicReplicationRequest, ReplicationResponse> shardOperationOnPrimary(
BasicReplicationRequest shardRequest, IndexShard primary) {
primary.refresh("api");
logger.trace("{} refresh request executed on primary", primary.shardId());
return new PrimaryResult(shardRequest, new ReplicationResponse());
return new PrimaryResult<>(shardRequest, new ReplicationResponse());
}
@Override

View File

@ -67,16 +67,16 @@ public class TransportResyncReplicationAction extends TransportWriteAction<Resyn
@Override
protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier<ResyncReplicationRequest> request,
Supplier<ResyncReplicationRequest> replicaRequest, String executor) {
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new OperationTransportHandler());
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest);
// we should never reject resync because of thread pool capacity on primary
transportService.registerRequestHandler(transportPrimaryAction,
() -> new ConcreteShardRequest<>(request),
executor, true, true,
new PrimaryOperationTransportHandler());
this::handlePrimaryRequest);
transportService.registerRequestHandler(transportReplicaAction,
() -> new ConcreteReplicaRequest<>(replicaRequest),
executor, true, true,
new ReplicaOperationTransportHandler());
this::handleReplicaRequest);
}
@Override

View File

@ -55,6 +55,7 @@ public final class ChannelActionListener<
try {
channel.sendResponse(e);
} catch (Exception e1) {
e1.addSuppressed(e);
logger.warn(() -> new ParameterizedMessage(
"Failed to send error response for action [{}] and request [{}]", actionName, request), e1);
}

View File

@ -30,6 +30,7 @@ import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.client.transport.NoNodeAvailableException;
@ -70,10 +71,8 @@ import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportChannelResponseHandler;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponse.Empty;
@ -155,14 +154,12 @@ public abstract class TransportReplicationAction<
protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier<Request> request,
Supplier<ReplicaRequest> replicaRequest, String executor) {
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new OperationTransportHandler());
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest);
transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor,
new PrimaryOperationTransportHandler());
this::handlePrimaryRequest);
// we must never reject on because of thread pool capacity on replicas
transportService.registerRequestHandler(transportReplicaAction,
() -> new ConcreteReplicaRequest<>(replicaRequest),
executor, true, true,
new ReplicaOperationTransportHandler());
transportService.registerRequestHandler(
transportReplicaAction, () -> new ConcreteReplicaRequest<>(replicaRequest), executor, true, true, this::handleReplicaRequest);
}
@Override
@ -272,71 +269,30 @@ public abstract class TransportReplicationAction<
return false;
}
protected class OperationTransportHandler implements TransportRequestHandler<Request> {
public OperationTransportHandler() {
}
@Override
public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
execute(task, request, new ActionListener<Response>() {
@Override
public void onResponse(Response result) {
try {
channel.sendResponse(result);
} catch (Exception e) {
onFailure(e);
}
}
@Override
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (Exception inner) {
inner.addSuppressed(e);
logger.warn(() -> new ParameterizedMessage("Failed to send response for {}", actionName), inner);
}
}
});
}
protected void handleOperationRequest(final Request request, final TransportChannel channel, Task task) {
execute(task, request, new ChannelActionListener<>(channel, actionName, request));
}
protected class PrimaryOperationTransportHandler implements TransportRequestHandler<ConcreteShardRequest<Request>> {
public PrimaryOperationTransportHandler() {
}
@Override
public void messageReceived(ConcreteShardRequest<Request> request, TransportChannel channel, Task task) {
new AsyncPrimaryAction(request.request, request.targetAllocationID, request.primaryTerm, channel, (ReplicationTask) task).run();
}
protected void handlePrimaryRequest(final ConcreteShardRequest<Request> request, final TransportChannel channel, final Task task) {
new AsyncPrimaryAction(
request, new ChannelActionListener<>(channel, transportPrimaryAction, request), (ReplicationTask) task).run();
}
class AsyncPrimaryAction extends AbstractRunnable {
private final Request request;
// targetAllocationID of the shard this request is meant for
private final String targetAllocationID;
// primary term of the shard this request is meant for
private final long primaryTerm;
private final TransportChannel channel;
private final ActionListener<Response> onCompletionListener;
private final ReplicationTask replicationTask;
private final ConcreteShardRequest<Request> primaryRequest;
AsyncPrimaryAction(Request request, String targetAllocationID, long primaryTerm, TransportChannel channel,
AsyncPrimaryAction(ConcreteShardRequest<Request> primaryRequest, ActionListener<Response> onCompletionListener,
ReplicationTask replicationTask) {
this.request = request;
this.targetAllocationID = targetAllocationID;
this.primaryTerm = primaryTerm;
this.channel = channel;
this.primaryRequest = primaryRequest;
this.onCompletionListener = onCompletionListener;
this.replicationTask = replicationTask;
}
@Override
protected void doRun() throws Exception {
final ShardId shardId = request.shardId();
final ShardId shardId = primaryRequest.getRequest().shardId();
final IndexShard indexShard = getIndexShard(shardId);
final ShardRouting shardRouting = indexShard.routingEntry();
// we may end up here if the cluster state used to route the primary is so stale that the underlying
@ -346,17 +302,17 @@ public abstract class TransportReplicationAction<
throw new ReplicationOperation.RetryOnPrimaryException(shardId, "actual shard is not a primary " + shardRouting);
}
final String actualAllocationId = shardRouting.allocationId().getId();
if (actualAllocationId.equals(targetAllocationID) == false) {
throw new ShardNotFoundException(shardId, "expected allocation id [{}] but found [{}]", targetAllocationID,
actualAllocationId);
if (actualAllocationId.equals(primaryRequest.getTargetAllocationID()) == false) {
throw new ShardNotFoundException(shardId, "expected allocation id [{}] but found [{}]",
primaryRequest.getTargetAllocationID(), actualAllocationId);
}
final long actualTerm = indexShard.getPendingPrimaryTerm();
if (actualTerm != primaryTerm) {
throw new ShardNotFoundException(shardId, "expected allocation id [{}] with term [{}] but found [{}]", targetAllocationID,
primaryTerm, actualTerm);
if (actualTerm != primaryRequest.getPrimaryTerm()) {
throw new ShardNotFoundException(shardId, "expected allocation id [{}] with term [{}] but found [{}]",
primaryRequest.getTargetAllocationID(), primaryRequest.getPrimaryTerm(), actualTerm);
}
acquirePrimaryOperationPermit(indexShard, request, ActionListener.wrap(
acquirePrimaryOperationPermit(indexShard, primaryRequest.getRequest(), ActionListener.wrap(
releasable -> runWithPrimaryShardReference(new PrimaryShardReference(indexShard, releasable)),
this::onFailure
));
@ -388,11 +344,10 @@ public abstract class TransportReplicationAction<
};
DiscoveryNode relocatingNode = clusterState.nodes().get(primary.relocatingNodeId());
transportService.sendRequest(relocatingNode, transportPrimaryAction,
new ConcreteShardRequest<>(request, primary.allocationId().getRelocationId(), primaryTerm),
new ConcreteShardRequest<>(primaryRequest.getRequest(), primary.allocationId().getRelocationId(),
primaryRequest.getPrimaryTerm()),
transportOptions,
new TransportChannelResponseHandler<Response>(logger, channel, "rerouting indexing to target primary " + primary,
reader) {
new ActionListenerResponseHandler<Response>(onCompletionListener, reader) {
@Override
public void handleResponse(Response response) {
setPhase(replicationTask, "finished");
@ -408,7 +363,7 @@ public abstract class TransportReplicationAction<
} else {
setPhase(replicationTask, "primary");
final ActionListener<Response> listener = createResponseListener(primaryShardReference);
createReplicatedOperation(request,
createReplicatedOperation(primaryRequest.getRequest(),
ActionListener.wrap(result -> result.respond(listener), listener::onFailure),
primaryShardReference)
.execute();
@ -422,12 +377,7 @@ public abstract class TransportReplicationAction<
@Override
public void onFailure(Exception e) {
setPhase(replicationTask, "finished");
try {
channel.sendResponse(e);
} catch (IOException inner) {
inner.addSuppressed(e);
logger.warn("failed to send response", inner);
}
onCompletionListener.onFailure(e);
}
private ActionListener<Response> createResponseListener(final PrimaryShardReference primaryShardReference) {
@ -452,22 +402,14 @@ public abstract class TransportReplicationAction<
}
primaryShardReference.close(); // release shard operation lock before responding to caller
setPhase(replicationTask, "finished");
try {
channel.sendResponse(response);
} catch (IOException e) {
onFailure(e);
}
onCompletionListener.onResponse(response);
}
@Override
public void onFailure(Exception e) {
primaryShardReference.close(); // release shard operation lock before responding to caller
setPhase(replicationTask, "finished");
try {
channel.sendResponse(e);
} catch (IOException e1) {
logger.warn("failed to send response", e);
}
onCompletionListener.onFailure(e);
}
};
}
@ -476,7 +418,7 @@ public abstract class TransportReplicationAction<
Request request, ActionListener<PrimaryResult<ReplicaRequest, Response>> listener,
PrimaryShardReference primaryShardReference) {
return new ReplicationOperation<>(request, primaryShardReference, listener,
newReplicasProxy(primaryTerm), logger, actionName);
newReplicasProxy(primaryRequest.getPrimaryTerm()), logger, actionName);
}
}
@ -545,24 +487,10 @@ public abstract class TransportReplicationAction<
}
}
public class ReplicaOperationTransportHandler implements TransportRequestHandler<ConcreteReplicaRequest<ReplicaRequest>> {
@Override
public void messageReceived(
final ConcreteReplicaRequest<ReplicaRequest> replicaRequest,
final TransportChannel channel,
final Task task)
throws Exception {
new AsyncReplicaAction(
replicaRequest.getRequest(),
replicaRequest.getTargetAllocationID(),
replicaRequest.getPrimaryTerm(),
replicaRequest.getGlobalCheckpoint(),
replicaRequest.getMaxSeqNoOfUpdatesOrDeletes(),
channel,
(ReplicationTask) task).run();
}
protected void handleReplicaRequest(final ConcreteReplicaRequest<ReplicaRequest> replicaRequest,
final TransportChannel channel, final Task task) {
new AsyncReplicaAction(
replicaRequest, new ChannelActionListener<>(channel, transportReplicaAction, replicaRequest), (ReplicationTask) task).run();
}
public static class RetryOnReplicaException extends ElasticsearchException {
@ -578,13 +506,7 @@ public abstract class TransportReplicationAction<
}
private final class AsyncReplicaAction extends AbstractRunnable implements ActionListener<Releasable> {
private final ReplicaRequest request;
// allocation id of the replica this request is meant for
private final String targetAllocationID;
private final long primaryTerm;
private final long globalCheckpoint;
private final long maxSeqNoOfUpdatesOrDeletes;
private final TransportChannel channel;
private final ActionListener<ReplicaResponse> onCompletionListener;
private final IndexShard replica;
/**
* The task on the node with the replica shard.
@ -593,23 +515,14 @@ public abstract class TransportReplicationAction<
// important: we pass null as a timeout as failing a replica is
// something we want to avoid at all costs
private final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext());
private final ConcreteReplicaRequest<ReplicaRequest> replicaRequest;
AsyncReplicaAction(
ReplicaRequest request,
String targetAllocationID,
long primaryTerm,
long globalCheckpoint,
long maxSeqNoOfUpdatesOrDeletes,
TransportChannel channel,
ReplicationTask task) {
this.request = request;
this.channel = channel;
AsyncReplicaAction(ConcreteReplicaRequest<ReplicaRequest> replicaRequest, ActionListener<ReplicaResponse> onCompletionListener,
ReplicationTask task) {
this.replicaRequest = replicaRequest;
this.onCompletionListener = onCompletionListener;
this.task = task;
this.targetAllocationID = targetAllocationID;
this.primaryTerm = primaryTerm;
this.globalCheckpoint = globalCheckpoint;
this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes;
final ShardId shardId = request.shardId();
final ShardId shardId = replicaRequest.getRequest().shardId();
assert shardId != null : "request shardId must be set";
this.replica = getIndexShard(shardId);
}
@ -617,7 +530,7 @@ public abstract class TransportReplicationAction<
@Override
public void onResponse(Releasable releasable) {
try {
final ReplicaResult replicaResult = shardOperationOnReplica(request, replica);
final ReplicaResult replicaResult = shardOperationOnReplica(replicaRequest.getRequest(), replica);
releasable.close(); // release shard operation lock before responding to caller
final TransportReplicationAction.ReplicaResponse response =
new ReplicaResponse(replica.getLocalCheckpoint(), replica.getGlobalCheckpoint());
@ -635,22 +548,17 @@ public abstract class TransportReplicationAction<
() -> new ParameterizedMessage(
"Retrying operation on replica, action [{}], request [{}]",
transportReplicaAction,
request),
replicaRequest.getRequest()),
e);
request.onRetry();
replicaRequest.getRequest().onRetry();
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
// Forking a thread on local node via transport service so that custom transport service have an
// opportunity to execute custom logic before the replica operation begins
String extraMessage = "action [" + transportReplicaAction + "], request[" + request + "]";
TransportChannelResponseHandler<TransportResponse.Empty> handler =
new TransportChannelResponseHandler<>(logger, channel, extraMessage,
(in) -> TransportResponse.Empty.INSTANCE);
transportService.sendRequest(clusterService.localNode(), transportReplicaAction,
new ConcreteReplicaRequest<>(request, targetAllocationID, primaryTerm,
globalCheckpoint, maxSeqNoOfUpdatesOrDeletes),
handler);
replicaRequest,
new ActionListenerResponseHandler<>(onCompletionListener, in -> new ReplicaResponse()));
}
@Override
@ -669,25 +577,20 @@ public abstract class TransportReplicationAction<
}
protected void responseWithFailure(Exception e) {
try {
setPhase(task, "finished");
channel.sendResponse(e);
} catch (IOException responseException) {
responseException.addSuppressed(e);
logger.warn(() -> new ParameterizedMessage(
"failed to send error message back to client for action [{}]", transportReplicaAction), responseException);
}
setPhase(task, "finished");
onCompletionListener.onFailure(e);
}
@Override
protected void doRun() throws Exception {
setPhase(task, "replica");
final String actualAllocationId = this.replica.routingEntry().allocationId().getId();
if (actualAllocationId.equals(targetAllocationID) == false) {
throw new ShardNotFoundException(this.replica.shardId(), "expected allocation id [{}] but found [{}]", targetAllocationID,
actualAllocationId);
if (actualAllocationId.equals(replicaRequest.getTargetAllocationID()) == false) {
throw new ShardNotFoundException(this.replica.shardId(), "expected allocation id [{}] but found [{}]",
replicaRequest.getTargetAllocationID(), actualAllocationId);
}
acquireReplicaOperationPermit(replica, request, this, primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes);
acquireReplicaOperationPermit(replica, replicaRequest.getRequest(), this, replicaRequest.getPrimaryTerm(),
replicaRequest.getGlobalCheckpoint(), replicaRequest.getMaxSeqNoOfUpdatesOrDeletes());
}
/**
@ -703,15 +606,12 @@ public abstract class TransportReplicationAction<
@Override
public void onResponse(Empty response) {
if (logger.isTraceEnabled()) {
logger.trace("action [{}] completed on shard [{}] for request [{}]", transportReplicaAction, request.shardId(),
request);
logger.trace("action [{}] completed on shard [{}] for request [{}]", transportReplicaAction,
replicaRequest.getRequest().shardId(),
replicaRequest.getRequest());
}
setPhase(task, "finished");
try {
channel.sendResponse(replicaResponse);
} catch (Exception e) {
onFailure(e);
}
onCompletionListener.onResponse(replicaResponse);
}
@Override
@ -983,12 +883,13 @@ public abstract class TransportReplicationAction<
replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onAcquired, executor, request);
}
class ShardReference implements Releasable {
class PrimaryShardReference implements Releasable,
ReplicationOperation.Primary<Request, ReplicaRequest, PrimaryResult<ReplicaRequest, Response>> {
protected final IndexShard indexShard;
private final Releasable operationLock;
ShardReference(IndexShard indexShard, Releasable operationLock) {
PrimaryShardReference(IndexShard indexShard, Releasable operationLock) {
this.indexShard = indexShard;
this.operationLock = operationLock;
}
@ -1006,15 +907,6 @@ public abstract class TransportReplicationAction<
return indexShard.routingEntry();
}
}
class PrimaryShardReference extends ShardReference
implements ReplicationOperation.Primary<Request, ReplicaRequest, PrimaryResult<ReplicaRequest, Response>> {
PrimaryShardReference(IndexShard indexShard, Releasable operationLock) {
super(indexShard, operationLock);
}
public boolean isRelocated() {
return indexShard.isRelocatedPrimary();
}
@ -1029,8 +921,8 @@ public abstract class TransportReplicationAction<
}
@Override
public PrimaryResult perform(Request request) throws Exception {
PrimaryResult result = shardOperationOnPrimary(request, indexShard);
public PrimaryResult<ReplicaRequest, Response> perform(Request request) throws Exception {
PrimaryResult<ReplicaRequest, Response> result = shardOperationOnPrimary(request, indexShard);
assert result.replicaRequest() == null || result.finalFailure == null : "a replica request [" + result.replicaRequest()
+ "] with a primary failure [" + result.finalFailure + "]";
return result;

View File

@ -331,8 +331,10 @@ public class TransportReplicationActionTests extends ESTestCase {
final ReplicationTask task = maybeTask();
final PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
final TransportReplicationAction.ConcreteShardRequest<Request> primaryRequest
= new TransportReplicationAction.ConcreteShardRequest<>(request, targetAllocationID, primaryTerm);
final TransportReplicationAction.AsyncPrimaryAction asyncPrimaryActionWithBlocks =
actionWithBlocks.new AsyncPrimaryAction(request, targetAllocationID, primaryTerm, createTransportChannel(listener), task);
actionWithBlocks.new AsyncPrimaryAction(primaryRequest, listener, task);
asyncPrimaryActionWithBlocks.run();
final ExecutionException exception = expectThrows(ExecutionException.class, listener::get);
@ -589,7 +591,9 @@ public class TransportReplicationActionTests extends ESTestCase {
isRelocated.set(true);
executeOnPrimary = false;
}
action.new AsyncPrimaryAction(request, primaryShard.allocationId().getId(), primaryTerm, createTransportChannel(listener), task) {
final TransportReplicationAction.ConcreteShardRequest<Request> primaryRequest
= new TransportReplicationAction.ConcreteShardRequest<>(request, primaryShard.allocationId().getId(), primaryTerm);
action.new AsyncPrimaryAction(primaryRequest, listener, task) {
@Override
protected ReplicationOperation<Request, Request, TransportReplicationAction.PrimaryResult<Request, TestResponse>>
createReplicatedOperation(
@ -645,8 +649,9 @@ public class TransportReplicationActionTests extends ESTestCase {
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
ReplicationTask task = maybeTask();
AtomicBoolean executed = new AtomicBoolean();
action.new AsyncPrimaryAction(request, primaryShard.allocationId().getRelocationId(), primaryTerm,
createTransportChannel(listener), task) {
final TransportReplicationAction.ConcreteShardRequest<Request> primaryRequest
= new TransportReplicationAction.ConcreteShardRequest<>(request, primaryShard.allocationId().getRelocationId(), primaryTerm);
action.new AsyncPrimaryAction(primaryRequest, listener, task) {
@Override
protected ReplicationOperation<Request, Request, TransportReplicationAction.PrimaryResult<Request, TestResponse>>
createReplicatedOperation(
@ -792,9 +797,7 @@ public class TransportReplicationActionTests extends ESTestCase {
}
};
TransportReplicationAction<Request, Request, TestResponse>.PrimaryOperationTransportHandler primaryPhase =
action.new PrimaryOperationTransportHandler();
primaryPhase.messageReceived(concreteShardRequest, createTransportChannel(listener), null);
action.handlePrimaryRequest(concreteShardRequest, createTransportChannel(listener), null);
CapturingTransport.CapturedRequest[] requestsToReplicas = transport.capturedRequests();
assertThat(requestsToReplicas, arrayWithSize(1));
assertThat(((TransportReplicationAction.ConcreteShardRequest<Request>) requestsToReplicas[0].request).getPrimaryTerm(),
@ -817,7 +820,9 @@ public class TransportReplicationActionTests extends ESTestCase {
final boolean throwExceptionOnCreation = i == 1;
final boolean throwExceptionOnRun = i == 2;
final boolean respondWithError = i == 3;
action.new AsyncPrimaryAction(request, primaryShard.allocationId().getId(), primaryTerm, createTransportChannel(listener), task) {
final TransportReplicationAction.ConcreteShardRequest<Request> primaryRequest
= new TransportReplicationAction.ConcreteShardRequest<>(request, primaryShard.allocationId().getId(), primaryTerm);
action.new AsyncPrimaryAction(primaryRequest, listener, task) {
@Override
protected ReplicationOperation<Request, Request, TransportReplicationAction.PrimaryResult<Request, TestResponse>>
createReplicatedOperation(
@ -880,9 +885,8 @@ public class TransportReplicationActionTests extends ESTestCase {
return new ReplicaResult();
}
};
final TestAction.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler();
try {
replicaOperationTransportHandler.messageReceived(
action.handleReplicaRequest(
new TransportReplicationAction.ConcreteReplicaRequest<>(
new Request().setShardId(shardId), replicaRouting.allocationId().getId(), randomNonNegativeLong(),
randomNonNegativeLong(), randomNonNegativeLong()),
@ -938,7 +942,7 @@ public class TransportReplicationActionTests extends ESTestCase {
final boolean wrongAllocationId = randomBoolean();
final long requestTerm = wrongAllocationId && randomBoolean() ? primaryTerm : primaryTerm + randomIntBetween(1, 10);
Request request = new Request(shardId).timeout("1ms");
action.new PrimaryOperationTransportHandler().messageReceived(
action.handlePrimaryRequest(
new TransportReplicationAction.ConcreteShardRequest<>(request,
wrongAllocationId ? "_not_a_valid_aid_" : primary.allocationId().getId(),
requestTerm),
@ -973,7 +977,7 @@ public class TransportReplicationActionTests extends ESTestCase {
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
Request request = new Request(shardId).timeout("1ms");
action.new ReplicaOperationTransportHandler().messageReceived(
action.handleReplicaRequest(
new TransportReplicationAction.ConcreteReplicaRequest<>(request, "_not_a_valid_aid_", randomNonNegativeLong(),
randomNonNegativeLong(), randomNonNegativeLong()),
createTransportChannel(listener), maybeTask()
@ -1015,12 +1019,11 @@ public class TransportReplicationActionTests extends ESTestCase {
return new ReplicaResult();
}
};
final TestAction.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler();
final PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
final Request request = new Request().setShardId(shardId);
final long checkpoint = randomNonNegativeLong();
final long maxSeqNoOfUpdatesOrDeletes = randomNonNegativeLong();
replicaOperationTransportHandler.messageReceived(
action.handleReplicaRequest(
new TransportReplicationAction.ConcreteReplicaRequest<>(request, replica.allocationId().getId(),
primaryTerm, checkpoint, maxSeqNoOfUpdatesOrDeletes),
createTransportChannel(listener), task);
@ -1084,12 +1087,11 @@ public class TransportReplicationActionTests extends ESTestCase {
return new ReplicaResult();
}
};
final TestAction.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler();
final PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
final Request request = new Request().setShardId(shardId);
final long checkpoint = randomNonNegativeLong();
final long maxSeqNoOfUpdates = randomNonNegativeLong();
replicaOperationTransportHandler.messageReceived(
action.handleReplicaRequest(
new TransportReplicationAction.ConcreteReplicaRequest<>(request, replica.allocationId().getId(),
primaryTerm, checkpoint, maxSeqNoOfUpdates),
createTransportChannel(listener), task);
@ -1221,10 +1223,10 @@ public class TransportReplicationActionTests extends ESTestCase {
}
@Override
protected PrimaryResult shardOperationOnPrimary(Request shardRequest, IndexShard primary) throws Exception {
protected PrimaryResult<Request, TestResponse> shardOperationOnPrimary(Request shardRequest, IndexShard primary) {
boolean executedBefore = shardRequest.processedOnPrimary.getAndSet(true);
assert executedBefore == false : "request has already been executed on the primary";
return new PrimaryResult(shardRequest, new TestResponse());
return new PrimaryResult<>(shardRequest, new TestResponse());
}
@Override

View File

@ -203,8 +203,10 @@ public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTe
actions[threadId] = singlePermitAction;
Thread thread = new Thread(() -> {
final TransportReplicationAction.ConcreteShardRequest<Request> primaryRequest
= new TransportReplicationAction.ConcreteShardRequest<>(request(), allocationId(), primaryTerm());
TransportReplicationAction.AsyncPrimaryAction asyncPrimaryAction =
singlePermitAction.new AsyncPrimaryAction(request(), allocationId(), primaryTerm(), transportChannel(listener), null) {
singlePermitAction.new AsyncPrimaryAction(primaryRequest, listener, null) {
@Override
protected void doRun() throws Exception {
if (delayed) {
@ -254,8 +256,10 @@ public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTe
final PlainActionFuture<Response> allPermitFuture = new PlainActionFuture<>();
Thread thread = new Thread(() -> {
final TransportReplicationAction.ConcreteShardRequest<Request> primaryRequest
= new TransportReplicationAction.ConcreteShardRequest<>(request(), allocationId(), primaryTerm());
TransportReplicationAction.AsyncPrimaryAction asyncPrimaryAction =
allPermitsAction.new AsyncPrimaryAction(request(), allocationId(), primaryTerm(), transportChannel(allPermitFuture), null) {
allPermitsAction.new AsyncPrimaryAction(primaryRequest, allPermitFuture, null) {
@Override
void runWithPrimaryShardReference(final TransportReplicationAction.PrimaryShardReference reference) {
assertEquals("All permits must be acquired", 0, reference.indexShard.getActiveOperationsCount());
@ -407,9 +411,8 @@ public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTe
final DiscoveryNode node,
final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
assertEquals("Replica is always assigned to node 2 in this test", clusterService.state().nodes().get("_node2"), node);
ReplicaOperationTransportHandler replicaOperationTransportHandler = new ReplicaOperationTransportHandler();
try {
replicaOperationTransportHandler.messageReceived(replicaRequest, new TransportChannel() {
handleReplicaRequest(replicaRequest, new TransportChannel() {
@Override
public String getProfileName() {
return null;
@ -530,32 +533,4 @@ public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTe
static class Response extends ReplicationResponse {
}
/**
* Transport channel that is needed for replica operation testing.
*/
public TransportChannel transportChannel(final PlainActionFuture<Response> listener) {
return new TransportChannel() {
@Override
public String getProfileName() {
return "";
}
@Override
public void sendResponse(TransportResponse response) throws IOException {
listener.onResponse(((Response) response));
}
@Override
public void sendResponse(Exception exception) throws IOException {
listener.onFailure(exception);
}
@Override
public String getChannelType() {
return "replica_test";
}
};
}
}