Move primary term from replicas proxy to repl op (#41119)

A small refactoring that removes the primaryTerm field from ReplicasProxy and
instead passes it directly in to the methods that need it. Relates #40706.
This commit is contained in:
David Turner 2019-04-11 20:26:18 +01:00
parent e120deb08f
commit b522de975d
10 changed files with 130 additions and 128 deletions

View File

@ -115,8 +115,8 @@ public class TransportVerifyShardBeforeCloseAction extends TransportReplicationA
}
@Override
protected ReplicationOperation.Replicas<ShardRequest> newReplicasProxy(final long primaryTerm) {
return new VerifyShardBeforeCloseActionReplicasProxy(primaryTerm);
protected ReplicationOperation.Replicas<ShardRequest> newReplicasProxy() {
return new VerifyShardBeforeCloseActionReplicasProxy();
}
/**
@ -125,13 +125,9 @@ public class TransportVerifyShardBeforeCloseAction extends TransportReplicationA
* or reopened in an unverified state with potential non flushed translog operations.
*/
class VerifyShardBeforeCloseActionReplicasProxy extends ReplicasProxy {
VerifyShardBeforeCloseActionReplicasProxy(final long primaryTerm) {
super(primaryTerm);
}
@Override
public void markShardCopyAsStaleIfNeeded(final ShardId shardId, final String allocationId, final ActionListener<Void> listener) {
public void markShardCopyAsStaleIfNeeded(final ShardId shardId, final String allocationId, final long primaryTerm,
final ActionListener<Void> listener) {
shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null, listener);
}
}

View File

@ -68,8 +68,8 @@ public class TransportResyncReplicationAction extends TransportWriteAction<Resyn
}
@Override
protected ReplicationOperation.Replicas newReplicasProxy(long primaryTerm) {
return new ResyncActionReplicasProxy(primaryTerm);
protected ReplicationOperation.Replicas newReplicasProxy() {
return new ResyncActionReplicasProxy();
}
@Override
@ -96,9 +96,10 @@ public class TransportResyncReplicationAction extends TransportWriteAction<Resyn
}
@Override
protected WriteReplicaResult shardOperationOnReplica(ResyncReplicationRequest request, IndexShard replica) throws Exception {
protected WriteReplicaResult<ResyncReplicationRequest> shardOperationOnReplica(ResyncReplicationRequest request,
IndexShard replica) throws Exception {
Translog.Location location = performOnReplica(request, replica);
return new WriteReplicaResult(request, location, null, replica, logger);
return new WriteReplicaResult<>(request, location, null, replica, logger);
}
public static Translog.Location performOnReplica(ResyncReplicationRequest request, IndexShard replica) throws Exception {
@ -174,12 +175,9 @@ public class TransportResyncReplicationAction extends TransportWriteAction<Resyn
*/
class ResyncActionReplicasProxy extends ReplicasProxy {
ResyncActionReplicasProxy(long primaryTerm) {
super(primaryTerm);
}
@Override
public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener<Void> listener) {
public void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception,
ActionListener<Void> listener) {
shardStateAction.remoteShardFailed(
replica.shardId(), replica.allocationId().getId(), primaryTerm, false, message, exception, listener);
}

View File

@ -71,7 +71,10 @@ public class ReplicationOperation<
private final Primary<Request, ReplicaRequest, PrimaryResultT> primary;
private final Replicas<ReplicaRequest> replicasProxy;
private final AtomicBoolean finished = new AtomicBoolean();
protected final ActionListener<PrimaryResultT> resultListener;
private final long primaryTerm;
// exposed for tests
final ActionListener<PrimaryResultT> resultListener;
private volatile PrimaryResultT primaryResult = null;
@ -80,13 +83,14 @@ public class ReplicationOperation<
public ReplicationOperation(Request request, Primary<Request, ReplicaRequest, PrimaryResultT> primary,
ActionListener<PrimaryResultT> listener,
Replicas<ReplicaRequest> replicas,
Logger logger, String opType) {
Logger logger, String opType, long primaryTerm) {
this.replicasProxy = replicas;
this.primary = primary;
this.resultListener = listener;
this.logger = logger;
this.request = request;
this.opType = opType;
this.primaryTerm = primaryTerm;
}
public void execute() throws Exception {
@ -137,7 +141,7 @@ public class ReplicationOperation<
// if inSyncAllocationIds contains allocation ids of shards that don't exist in RoutingTable, mark copies as stale
for (String allocationId : replicationGroup.getUnavailableInSyncShards()) {
pendingActions.incrementAndGet();
replicasProxy.markShardCopyAsStaleIfNeeded(replicaRequest.shardId(), allocationId,
replicasProxy.markShardCopyAsStaleIfNeeded(replicaRequest.shardId(), allocationId, primaryTerm,
ActionListener.wrap(r -> decPendingAndFinishIfNeeded(), ReplicationOperation.this::onNoLongerPrimary));
}
}
@ -165,44 +169,45 @@ public class ReplicationOperation<
totalShards.incrementAndGet();
pendingActions.incrementAndGet();
replicasProxy.performOn(shard, replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, new ActionListener<ReplicaResponse>() {
@Override
public void onResponse(ReplicaResponse response) {
successfulShards.incrementAndGet();
try {
primary.updateLocalCheckpointForShard(shard.allocationId().getId(), response.localCheckpoint());
primary.updateGlobalCheckpointForShard(shard.allocationId().getId(), response.globalCheckpoint());
} catch (final AlreadyClosedException e) {
// okay, the index was deleted or this shard was never activated after a relocation; fall through and finish normally
} catch (final Exception e) {
// fail the primary but fall through and let the rest of operation processing complete
final String message = String.format(Locale.ROOT, "primary failed updating local checkpoint for replica %s", shard);
primary.failShard(message, e);
replicasProxy.performOn(shard, replicaRequest, primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes,
new ActionListener<ReplicaResponse>() {
@Override
public void onResponse(ReplicaResponse response) {
successfulShards.incrementAndGet();
try {
primary.updateLocalCheckpointForShard(shard.allocationId().getId(), response.localCheckpoint());
primary.updateGlobalCheckpointForShard(shard.allocationId().getId(), response.globalCheckpoint());
} catch (final AlreadyClosedException e) {
// the index was deleted or this shard was never activated after a relocation; fall through and finish normally
} catch (final Exception e) {
// fail the primary but fall through and let the rest of operation processing complete
final String message = String.format(Locale.ROOT, "primary failed updating local checkpoint for replica %s", shard);
primary.failShard(message, e);
}
decPendingAndFinishIfNeeded();
}
decPendingAndFinishIfNeeded();
}
@Override
public void onFailure(Exception replicaException) {
logger.trace(() -> new ParameterizedMessage(
"[{}] failure while performing [{}] on replica {}, request [{}]",
shard.shardId(), opType, shard, replicaRequest), replicaException);
// Only report "critical" exceptions - TODO: Reach out to the master node to get the latest shard state then report.
if (TransportActions.isShardNotAvailableException(replicaException) == false) {
RestStatus restStatus = ExceptionsHelper.status(replicaException);
shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure(
shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false));
@Override
public void onFailure(Exception replicaException) {
logger.trace(() -> new ParameterizedMessage(
"[{}] failure while performing [{}] on replica {}, request [{}]",
shard.shardId(), opType, shard, replicaRequest), replicaException);
// Only report "critical" exceptions - TODO: Reach out to the master node to get the latest shard state then report.
if (TransportActions.isShardNotAvailableException(replicaException) == false) {
RestStatus restStatus = ExceptionsHelper.status(replicaException);
shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure(
shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false));
}
String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard);
replicasProxy.failShardIfNeeded(shard, primaryTerm, message, replicaException,
ActionListener.wrap(r -> decPendingAndFinishIfNeeded(), ReplicationOperation.this::onNoLongerPrimary));
}
String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard);
replicasProxy.failShardIfNeeded(shard, message, replicaException,
ActionListener.wrap(r -> decPendingAndFinishIfNeeded(), ReplicationOperation.this::onNoLongerPrimary));
}
@Override
public String toString() {
return "[" + replicaRequest + "][" + shard + "]";
}
});
@Override
public String toString() {
return "[" + replicaRequest + "][" + shard + "]";
}
});
}
private void onNoLongerPrimary(Exception failure) {
@ -373,25 +378,27 @@ public class ReplicationOperation<
*
* @param replica the shard this request should be executed on
* @param replicaRequest the operation to perform
* @param primaryTerm the primary term
* @param globalCheckpoint the global checkpoint on the primary
* @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates (index operations overwriting Lucene) or deletes on primary
* after this replication was executed on it.
* @param listener callback for handling the response or failure
*/
void performOn(ShardRouting replica, RequestT replicaRequest, long globalCheckpoint,
long maxSeqNoOfUpdatesOrDeletes, ActionListener<ReplicaResponse> listener);
void performOn(ShardRouting replica, RequestT replicaRequest,
long primaryTerm, long globalCheckpoint, long maxSeqNoOfUpdatesOrDeletes, ActionListener<ReplicaResponse> listener);
/**
* Fail the specified shard if needed, removing it from the current set
* of active shards. Whether a failure is needed is left up to the
* implementation.
*
* @param replica shard to fail
* @param message a (short) description of the reason
* @param exception the original exception which caused the ReplicationOperation to request the shard to be failed
* @param listener a listener that will be notified when the failing shard has been removed from the in-sync set
* @param replica shard to fail
* @param primaryTerm the primary term
* @param message a (short) description of the reason
* @param exception the original exception which caused the ReplicationOperation to request the shard to be failed
* @param listener a listener that will be notified when the failing shard has been removed from the in-sync set
*/
void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener<Void> listener);
void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception, ActionListener<Void> listener);
/**
* Marks shard copy as stale if needed, removing its allocation id from
@ -400,9 +407,10 @@ public class ReplicationOperation<
*
* @param shardId shard id
* @param allocationId allocation id to remove from the set of in-sync allocation ids
* @param primaryTerm the primary term
* @param listener a listener that will be notified when the failing shard has been removed from the in-sync set
*/
void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener<Void> listener);
void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, long primaryTerm, ActionListener<Void> listener);
}
/**
@ -427,11 +435,11 @@ public class ReplicationOperation<
}
public static class RetryOnPrimaryException extends ElasticsearchException {
public RetryOnPrimaryException(ShardId shardId, String msg) {
RetryOnPrimaryException(ShardId shardId, String msg) {
this(shardId, msg, null);
}
public RetryOnPrimaryException(ShardId shardId, String msg, Throwable cause) {
RetryOnPrimaryException(ShardId shardId, String msg, Throwable cause) {
super(msg, cause);
setShard(shardId);
}

View File

@ -164,8 +164,8 @@ public abstract class TransportReplicationAction<
new ReroutePhase((ReplicationTask) task, request, listener).run();
}
protected ReplicationOperation.Replicas<ReplicaRequest> newReplicasProxy(long primaryTerm) {
return new ReplicasProxy(primaryTerm);
protected ReplicationOperation.Replicas<ReplicaRequest> newReplicasProxy() {
return new ReplicasProxy();
}
protected abstract Response newResponseInstance();
@ -410,7 +410,7 @@ public abstract class TransportReplicationAction<
Request request, ActionListener<PrimaryResult<ReplicaRequest, Response>> listener,
PrimaryShardReference primaryShardReference) {
return new ReplicationOperation<>(request, primaryShardReference, listener,
newReplicasProxy(primaryRequest.getPrimaryTerm()), logger, actionName);
newReplicasProxy(), logger, actionName, primaryRequest.getPrimaryTerm());
}
}
@ -1035,16 +1035,11 @@ public abstract class TransportReplicationAction<
*/
protected class ReplicasProxy implements ReplicationOperation.Replicas<ReplicaRequest> {
protected final long primaryTerm;
public ReplicasProxy(long primaryTerm) {
this.primaryTerm = primaryTerm;
}
@Override
public void performOn(
final ShardRouting replica,
final ReplicaRequest request,
final long primaryTerm,
final long globalCheckpoint,
final long maxSeqNoOfUpdatesOrDeletes,
final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
@ -1065,7 +1060,8 @@ public abstract class TransportReplicationAction<
}
@Override
public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener<Void> listener) {
public void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception,
ActionListener<Void> listener) {
// This does not need to fail the shard. The idea is that this
// is a non-write operation (something like a refresh or a global
// checkpoint sync) and therefore the replica should still be
@ -1074,7 +1070,7 @@ public abstract class TransportReplicationAction<
}
@Override
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener<Void> listener) {
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, long primaryTerm, ActionListener<Void> listener) {
// This does not need to make the shard stale. The idea is that this
// is a non-write operation (something like a refresh or a global
// checkpoint sync) and therefore the replica should still be

View File

@ -96,8 +96,8 @@ public abstract class TransportWriteAction<
}
@Override
protected ReplicationOperation.Replicas newReplicasProxy(long primaryTerm) {
return new WriteActionReplicasProxy(primaryTerm);
protected ReplicationOperation.Replicas<ReplicaRequest> newReplicasProxy() {
return new WriteActionReplicasProxy();
}
/**
@ -371,12 +371,9 @@ public abstract class TransportWriteAction<
*/
class WriteActionReplicasProxy extends ReplicasProxy {
WriteActionReplicasProxy(long primaryTerm) {
super(primaryTerm);
}
@Override
public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener<Void> listener) {
public void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception,
ActionListener<Void> listener) {
if (TransportActions.isShardNotAvailableException(exception) == false) {
logger.warn(new ParameterizedMessage("[{}] {}", replica.shardId(), message), exception);
}
@ -385,7 +382,7 @@ public abstract class TransportWriteAction<
}
@Override
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener<Void> listener) {
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, long primaryTerm, ActionListener<Void> listener) {
shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null, listener);
}
}

View File

@ -139,7 +139,7 @@ public class TransportVerifyShardBeforeCloseActionTests extends ESTestCase {
final TaskId taskId = new TaskId("_node_id", randomNonNegativeLong());
final TransportVerifyShardBeforeCloseAction.ShardRequest request =
new TransportVerifyShardBeforeCloseAction.ShardRequest(indexShard.shardId(), clusterBlock, taskId);
final PlainActionFuture res = PlainActionFuture.newFuture();
final PlainActionFuture<Void> res = PlainActionFuture.newFuture();
action.shardOperationOnPrimary(request, indexShard, ActionListener.wrap(
r -> {
assertNotNull(r);
@ -228,10 +228,10 @@ public class TransportVerifyShardBeforeCloseActionTests extends ESTestCase {
TaskId taskId = new TaskId(clusterService.localNode().getId(), 0L);
TransportVerifyShardBeforeCloseAction.ShardRequest request =
new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId, clusterBlock, taskId);
ReplicationOperation.Replicas<TransportVerifyShardBeforeCloseAction.ShardRequest> proxy = action.newReplicasProxy(primaryTerm);
ReplicationOperation.Replicas<TransportVerifyShardBeforeCloseAction.ShardRequest> proxy = action.newReplicasProxy();
ReplicationOperation<TransportVerifyShardBeforeCloseAction.ShardRequest,
TransportVerifyShardBeforeCloseAction.ShardRequest, PrimaryResult> operation =
new ReplicationOperation<>(request, createPrimary(primaryRouting, replicationGroup), listener, proxy, logger, "test");
TransportVerifyShardBeforeCloseAction.ShardRequest, PrimaryResult> operation = new ReplicationOperation<>(
request, createPrimary(primaryRouting, replicationGroup), listener, proxy, logger, "test", primaryTerm);
operation.execute();
final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear();
@ -284,7 +284,7 @@ public class TransportVerifyShardBeforeCloseActionTests extends ESTestCase {
@Override
public void perform(
TransportVerifyShardBeforeCloseAction.ShardRequest request, ActionListener<PrimaryResult> listener) {
TransportVerifyShardBeforeCloseAction.ShardRequest request, ActionListener<PrimaryResult> listener) {
listener.onResponse(new PrimaryResult(request));
}

View File

@ -116,10 +116,10 @@ public class ReplicationOperationTests extends ESTestCase {
Request request = new Request(shardId);
PlainActionFuture<TestPrimary.Result> listener = new PlainActionFuture<>();
final TestReplicaProxy replicasProxy = new TestReplicaProxy(primaryTerm, simulatedFailures);
final TestReplicaProxy replicasProxy = new TestReplicaProxy(simulatedFailures);
final TestPrimary primary = new TestPrimary(primaryShard, () -> replicationGroup);
final TestReplicationOperation op = new TestReplicationOperation(request, primary, listener, replicasProxy);
final TestReplicationOperation op = new TestReplicationOperation(request, primary, listener, replicasProxy, primaryTerm);
op.execute();
assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true));
assertThat(request.processedOnReplicas, equalTo(expectedReplicas));
@ -213,12 +213,12 @@ public class ReplicationOperationTests extends ESTestCase {
} else {
shardActionFailure = new ShardStateAction.NoLongerPrimaryShardException(failedReplica.shardId(), "the king is dead");
}
final TestReplicaProxy replicasProxy = new TestReplicaProxy(primaryTerm, expectedFailures) {
final TestReplicaProxy replicasProxy = new TestReplicaProxy(expectedFailures) {
@Override
public void failShardIfNeeded(ShardRouting replica, String message, Exception exception,
public void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception,
ActionListener<Void> shardActionListener) {
if (testPrimaryDemotedOnStaleShardCopies) {
super.failShardIfNeeded(replica, message, exception, shardActionListener);
super.failShardIfNeeded(replica, primaryTerm, message, exception, shardActionListener);
} else {
assertThat(replica, equalTo(failedReplica));
shardActionListener.onFailure(shardActionFailure);
@ -226,11 +226,12 @@ public class ReplicationOperationTests extends ESTestCase {
}
@Override
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener<Void> shardActionListener) {
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, long primaryTerm,
ActionListener<Void> shardActionListener) {
if (testPrimaryDemotedOnStaleShardCopies) {
shardActionListener.onFailure(shardActionFailure);
} else {
super.markShardCopyAsStaleIfNeeded(shardId, allocationId, shardActionListener);
super.markShardCopyAsStaleIfNeeded(shardId, allocationId, primaryTerm, shardActionListener);
}
}
};
@ -242,7 +243,7 @@ public class ReplicationOperationTests extends ESTestCase {
assertTrue(primaryFailed.compareAndSet(false, true));
}
};
final TestReplicationOperation op = new TestReplicationOperation(request, primary, listener, replicasProxy);
final TestReplicationOperation op = new TestReplicationOperation(request, primary, listener, replicasProxy, primaryTerm);
op.execute();
assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true));
@ -299,7 +300,7 @@ public class ReplicationOperationTests extends ESTestCase {
Request request = new Request(shardId);
PlainActionFuture<TestPrimary.Result> listener = new PlainActionFuture<>();
final TestReplicationOperation op = new TestReplicationOperation(request, primary, listener,
new TestReplicaProxy(primaryTerm));
new TestReplicaProxy(), primaryTerm);
op.execute();
assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true));
@ -343,7 +344,7 @@ public class ReplicationOperationTests extends ESTestCase {
final ShardRouting primaryShard = shardRoutingTable.primaryShard();
final TestReplicationOperation op = new TestReplicationOperation(request,
new TestPrimary(primaryShard, () -> initialReplicationGroup),
listener, new TestReplicaProxy(primaryTerm), logger, "test");
listener, new TestReplicaProxy(), logger, "test", primaryTerm);
if (passesActiveShardCheck) {
assertThat(op.checkActiveShardCount(), nullValue());
@ -401,8 +402,8 @@ public class ReplicationOperationTests extends ESTestCase {
};
final PlainActionFuture<TestPrimary.Result> listener = new PlainActionFuture<>();
final ReplicationOperation.Replicas<Request> replicas = new TestReplicaProxy(primaryTerm, Collections.emptyMap());
TestReplicationOperation operation = new TestReplicationOperation(request, primary, listener, replicas);
final ReplicationOperation.Replicas<Request> replicas = new TestReplicaProxy(Collections.emptyMap());
TestReplicationOperation operation = new TestReplicationOperation(request, primary, listener, replicas, primaryTerm);
operation.execute();
assertThat(primaryFailed.get(), equalTo(fatal));
@ -577,14 +578,11 @@ public class ReplicationOperationTests extends ESTestCase {
final Set<String> markedAsStaleCopies = ConcurrentCollections.newConcurrentSet();
final long primaryTerm;
TestReplicaProxy(long primaryTerm) {
this(primaryTerm, Collections.emptyMap());
TestReplicaProxy() {
this(Collections.emptyMap());
}
TestReplicaProxy(long primaryTerm, Map<ShardRouting, Exception> opFailures) {
this.primaryTerm = primaryTerm;
TestReplicaProxy(Map<ShardRouting, Exception> opFailures) {
this.opFailures = opFailures;
}
@ -592,6 +590,7 @@ public class ReplicationOperationTests extends ESTestCase {
public void performOn(
final ShardRouting replica,
final Request request,
final long primaryTerm,
final long globalCheckpoint,
final long maxSeqNoOfUpdatesOrDeletes,
final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
@ -609,7 +608,8 @@ public class ReplicationOperationTests extends ESTestCase {
}
@Override
public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener<Void> listener) {
public void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception,
ActionListener<Void> listener) {
if (failedReplicas.add(replica) == false) {
fail("replica [" + replica + "] was failed twice");
}
@ -621,7 +621,7 @@ public class ReplicationOperationTests extends ESTestCase {
}
@Override
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener<Void> listener) {
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, long primaryTerm, ActionListener<Void> listener) {
if (markedAsStaleCopies.add(allocationId) == false) {
fail("replica [" + allocationId + "] was marked as stale twice");
}
@ -631,14 +631,14 @@ public class ReplicationOperationTests extends ESTestCase {
class TestReplicationOperation extends ReplicationOperation<Request, Request, TestPrimary.Result> {
TestReplicationOperation(Request request, Primary<Request, Request, TestPrimary.Result> primary,
ActionListener<TestPrimary.Result> listener, Replicas<Request> replicas) {
this(request, primary, listener, replicas, ReplicationOperationTests.this.logger, "test");
ActionListener<TestPrimary.Result> listener, Replicas<Request> replicas, long primaryTerm) {
this(request, primary, listener, replicas, ReplicationOperationTests.this.logger, "test", primaryTerm);
}
TestReplicationOperation(Request request, Primary<Request, Request, TestPrimary.Result> primary,
ActionListener<TestPrimary.Result> listener,
Replicas<Request> replicas, Logger logger, String opType) {
super(request, primary, listener, replicas, logger, opType);
Replicas<Request> replicas, Logger logger, String opType, long primaryTerm) {
super(request, primary, listener, replicas, logger, opType, primaryTerm);
}
}

View File

@ -603,7 +603,7 @@ public class TransportReplicationActionTests extends ESTestCase {
Request request,
ActionListener<TransportReplicationAction.PrimaryResult<Request, TestResponse>> actionListener,
TransportReplicationAction<Request, Request, TestResponse>.PrimaryShardReference primaryShardReference) {
return new NoopReplicationOperation(request, actionListener) {
return new NoopReplicationOperation(request, actionListener, primaryTerm) {
@Override
public void execute() throws Exception {
assertPhase(task, "primary");
@ -661,7 +661,7 @@ public class TransportReplicationActionTests extends ESTestCase {
Request request,
ActionListener<TransportReplicationAction.PrimaryResult<Request, TestResponse>> actionListener,
TransportReplicationAction<Request, Request, TestResponse>.PrimaryShardReference primaryShardReference) {
return new NoopReplicationOperation(request, actionListener) {
return new NoopReplicationOperation(request, actionListener, primaryTerm) {
@Override
public void execute() throws Exception {
assertPhase(task, "primary");
@ -710,7 +710,8 @@ public class TransportReplicationActionTests extends ESTestCase {
ClusterState state = stateWithActivePrimary(index, true, 1 + randomInt(3), randomInt(2));
logger.info("using state: {}", state);
setState(clusterService, state);
ReplicationOperation.Replicas proxy = action.newReplicasProxy(state.metaData().index(index).primaryTerm(0));
final long primaryTerm = state.metaData().index(index).primaryTerm(0);
ReplicationOperation.Replicas<Request> proxy = action.newReplicasProxy();
// check that at unknown node fails
PlainActionFuture<ReplicaResponse> listener = new PlainActionFuture<>();
@ -720,6 +721,7 @@ public class TransportReplicationActionTests extends ESTestCase {
TestShardRouting.newShardRouting(shardId, "NOT THERE",
routingState == ShardRoutingState.RELOCATING ? state.nodes().iterator().next().getId() : null, false, routingState),
new Request(NO_SHARD_ID),
primaryTerm,
randomNonNegativeLong(),
randomNonNegativeLong(),
listener);
@ -730,7 +732,7 @@ public class TransportReplicationActionTests extends ESTestCase {
final ShardRouting replica = randomFrom(shardRoutings.replicaShards().stream()
.filter(ShardRouting::assignedToNode).collect(Collectors.toList()));
listener = new PlainActionFuture<>();
proxy.performOn(replica, new Request(NO_SHARD_ID), randomNonNegativeLong(), randomNonNegativeLong(), listener);
proxy.performOn(replica, new Request(NO_SHARD_ID), primaryTerm, randomNonNegativeLong(), randomNonNegativeLong(), listener);
assertFalse(listener.isDone());
CapturingTransport.CapturedRequest[] captures = transport.getCapturedRequestsAndClear();
@ -753,7 +755,7 @@ public class TransportReplicationActionTests extends ESTestCase {
AtomicReference<Object> failure = new AtomicReference<>();
AtomicBoolean success = new AtomicBoolean();
proxy.failShardIfNeeded(replica, "test", new ElasticsearchException("simulated"),
proxy.failShardIfNeeded(replica, primaryTerm, "test", new ElasticsearchException("simulated"),
ActionListener.wrap(r -> success.set(true), failure::set));
CapturingTransport.CapturedRequest[] shardFailedRequests = transport.getCapturedRequestsAndClear();
// A replication action doesn't not fail the request
@ -836,7 +838,7 @@ public class TransportReplicationActionTests extends ESTestCase {
if (throwExceptionOnCreation) {
throw new ElasticsearchException("simulated exception, during createReplicatedOperation");
}
return new NoopReplicationOperation(request, actionListener) {
return new NoopReplicationOperation(request, actionListener, primaryTerm) {
@Override
public void execute() throws Exception {
assertIndexShardCounter(1);
@ -1322,8 +1324,9 @@ public class TransportReplicationActionTests extends ESTestCase {
class NoopReplicationOperation extends ReplicationOperation<Request, Request, TestAction.PrimaryResult<Request, TestResponse>> {
NoopReplicationOperation(Request request, ActionListener<TestAction.PrimaryResult<Request, TestResponse>> listener) {
super(request, null, listener, null, TransportReplicationActionTests.this.logger, "noop");
NoopReplicationOperation(Request request, ActionListener<TestAction.PrimaryResult<Request, TestResponse>> listener,
long primaryTerm) {
super(request, null, listener, null, TransportReplicationActionTests.this.logger, "noop", primaryTerm);
}
@Override

View File

@ -278,7 +278,8 @@ public class TransportWriteActionTests extends ESTestCase {
ClusterState state = ClusterStateCreationUtils.stateWithActivePrimary(index, true, 1 + randomInt(3), randomInt(2));
logger.info("using state: {}", state);
ClusterServiceUtils.setState(clusterService, state);
ReplicationOperation.Replicas proxy = action.newReplicasProxy(state.metaData().index(index).primaryTerm(0));
final long primaryTerm = state.metaData().index(index).primaryTerm(0);
ReplicationOperation.Replicas<TestRequest> proxy = action.newReplicasProxy();
// check that at unknown node fails
PlainActionFuture<ReplicaResponse> listener = new PlainActionFuture<>();
@ -288,7 +289,7 @@ public class TransportWriteActionTests extends ESTestCase {
TestShardRouting.newShardRouting(shardId, "NOT THERE",
routingState == ShardRoutingState.RELOCATING ? state.nodes().iterator().next().getId() : null, false, routingState),
new TestRequest(),
randomNonNegativeLong(), randomNonNegativeLong(), listener);
primaryTerm, randomNonNegativeLong(), randomNonNegativeLong(), listener);
assertTrue(listener.isDone());
assertListenerThrows("non existent node should throw a NoNodeAvailableException", listener, NoNodeAvailableException.class);
@ -296,7 +297,7 @@ public class TransportWriteActionTests extends ESTestCase {
final ShardRouting replica = randomFrom(shardRoutings.replicaShards().stream()
.filter(ShardRouting::assignedToNode).collect(Collectors.toList()));
listener = new PlainActionFuture<>();
proxy.performOn(replica, new TestRequest(), randomNonNegativeLong(), randomNonNegativeLong(), listener);
proxy.performOn(replica, new TestRequest(), primaryTerm, randomNonNegativeLong(), randomNonNegativeLong(), listener);
assertFalse(listener.isDone());
CapturingTransport.CapturedRequest[] captures = transport.getCapturedRequestsAndClear();
@ -319,7 +320,7 @@ public class TransportWriteActionTests extends ESTestCase {
AtomicReference<Object> failure = new AtomicReference<>();
AtomicBoolean success = new AtomicBoolean();
proxy.failShardIfNeeded(replica, "test", new ElasticsearchException("simulated"),
proxy.failShardIfNeeded(replica, primaryTerm, "test", new ElasticsearchException("simulated"),
ActionListener.wrap(r -> success.set(true), failure::set));
CapturingTransport.CapturedRequest[] shardFailedRequests = transport.getCapturedRequestsAndClear();
// A write replication action proxy should fail the shard

View File

@ -608,8 +608,8 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
public void execute() {
try {
new ReplicationOperation<>(request, new PrimaryRef(),
ActionListener.wrap(result -> result.respond(listener), listener::onFailure), new ReplicasRef(), logger, opType
).execute();
ActionListener.wrap(result -> result.respond(listener), listener::onFailure), new ReplicasRef(), logger, opType,
primaryTerm).execute();
} catch (Exception e) {
listener.onFailure(e);
}
@ -678,6 +678,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
public void performOn(
final ShardRouting replicaRouting,
final ReplicaRequest request,
final long primaryTerm,
final long globalCheckpoint,
final long maxSeqNoOfUpdatesOrDeletes,
final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
@ -708,12 +709,14 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
}
@Override
public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener<Void> listener) {
public void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception,
ActionListener<Void> listener) {
throw new UnsupportedOperationException("failing shard " + replica + " isn't supported. failure: " + message, exception);
}
@Override
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener<Void> listener) {
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, long primaryTerm,
ActionListener<Void> listener) {
throw new UnsupportedOperationException("can't mark " + shardId + ", aid [" + allocationId + "] as stale");
}
}