Prepare to make sending the recovery translog non-blocking (#37458)

This commit prepares the infrastructure required to make sending a translog
snapshot from the recovery source non-blocking. A follow-up change will make
the send-snapshot method itself non-blocking.

Relates #37291
This commit is contained in:
Nhat Nguyen 2019-01-15 13:17:25 -05:00 committed by GitHub
parent 02d4d8b409
commit 6647122f1c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 162 additions and 158 deletions

View File

@ -2998,7 +2998,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* which is at least the value of the max_seq_no_of_updates marker on the primary after that operation was executed on the primary.
*
* @see #acquireReplicaOperationPermit(long, long, long, ActionListener, String, Object)
* @see org.elasticsearch.indices.recovery.RecoveryTarget#indexTranslogOperations(List, int, long, long)
* @see org.elasticsearch.indices.recovery.RecoveryTarget#indexTranslogOperations(List, int, long, long, ActionListener)
*/
public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) {
assert seqNo != UNASSIGNED_SEQ_NO

View File

@ -485,14 +485,12 @@ public class PeerRecoveryTargetService implements IndexEventListener {
public void messageReceived(final RecoveryTranslogOperationsRequest request, final TransportChannel channel,
Task task) throws IOException {
try (RecoveryRef recoveryRef =
onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext());
final RecoveryTarget recoveryTarget = recoveryRef.target();
try {
recoveryTarget.indexTranslogOperations(request.operations(), request.totalTranslogOps(),
request.maxSeenAutoIdTimestampOnPrimary(), request.maxSeqNoOfUpdatesOrDeletesOnPrimary());
channel.sendResponse(new RecoveryTranslogOperationsResponse(recoveryTarget.indexShard().getLocalCheckpoint()));
} catch (MapperException exception) {
final ActionListener<RecoveryTranslogOperationsResponse> listener =
new HandledTransportAction.ChannelActionListener<>(channel, Actions.TRANSLOG_OPS, request);
final Consumer<Exception> retryOnMappingException = exception -> {
// in very rare cases a translog replay from primary is processed before a mapping update on this node
// which causes local mapping changes since the mapping (clusterstate) might not have arrived on this node.
logger.debug("delaying recovery due to missing mapping changes", exception);
@ -504,31 +502,36 @@ public class PeerRecoveryTargetService implements IndexEventListener {
try {
messageReceived(request, channel, task);
} catch (Exception e) {
onFailure(e);
}
}
protected void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (IOException e1) {
logger.warn("failed to send error back to recovery source", e1);
listener.onFailure(e);
}
}
@Override
public void onClusterServiceClose() {
onFailure(new ElasticsearchException("cluster service was closed while waiting for mapping updates"));
listener.onFailure(new ElasticsearchException(
"cluster service was closed while waiting for mapping updates"));
}
@Override
public void onTimeout(TimeValue timeout) {
// note that we do not use a timeout (see comment above)
onFailure(new ElasticsearchTimeoutException("timed out waiting for mapping updates (timeout [" + timeout +
"])"));
listener.onFailure(new ElasticsearchTimeoutException("timed out waiting for mapping updates " +
"(timeout [" + timeout + "])"));
}
});
}
};
recoveryTarget.indexTranslogOperations(request.operations(), request.totalTranslogOps(),
request.maxSeenAutoIdTimestampOnPrimary(), request.maxSeqNoOfUpdatesOrDeletesOnPrimary(),
ActionListener.wrap(
checkpoint -> listener.onResponse(new RecoveryTranslogOperationsResponse(checkpoint)),
e -> {
if (e instanceof MapperException) {
retryOnMappingException.accept(e);
} else {
listener.onFailure(e);
}
})
);
}
}
}

View File

@ -33,6 +33,7 @@ import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.StopWatch;
@ -226,25 +227,27 @@ public class RecoverySourceHandler {
logger.trace("snapshot translog for recovery; current size is [{}]",
shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo));
}
final SendSnapshotResult sendSnapshotResult;
try (Translog.Snapshot snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo)) {
// we can release the retention lock here because the snapshot itself will retain the required operations.
IOUtils.close(retentionLock, () -> resources.remove(retentionLock));
// we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values
// are at least as high as the corresponding values on the primary when any of these operations were executed on it.
final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes();
sendSnapshotResult = phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot,
maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes);
} catch (Exception e) {
throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e);
}
final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo);
resources.add(phase2Snapshot);
// we can release the retention lock here because the snapshot itself will retain the required operations.
IOUtils.close(retentionLock);
// we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values
// are at least as high as the corresponding values on the primary when any of these operations were executed on it.
final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes();
final StepListener<SendSnapshotResult> sendSnapshotStep = new StepListener<>();
phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes, sendSnapshotStep);
sendSnapshotStep.whenComplete(
r -> IOUtils.close(phase2Snapshot),
e -> onFailure.accept(new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e)));
final StepListener<Void> finalizeStep = new StepListener<>();
finalizeRecovery(sendSnapshotResult.targetLocalCheckpoint, finalizeStep);
sendSnapshotStep.whenComplete(r -> finalizeRecovery(r.targetLocalCheckpoint, finalizeStep), onFailure);
finalizeStep.whenComplete(r -> {
assert resources.isEmpty() : "not every resource is released [" + resources + "]";
final long phase1ThrottlingWaitTime = 0L; // TODO: return the actual throttle time
final SendSnapshotResult sendSnapshotResult = sendSnapshotStep.result();
final RecoveryResponse response = new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes,
sendFileResult.phase1ExistingFileNames, sendFileResult.phase1ExistingFileSizes, sendFileResult.totalSize,
sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime,
@ -507,10 +510,17 @@ public class RecoverySourceHandler {
* @param snapshot a snapshot of the translog
* @param maxSeenAutoIdTimestamp the max auto_id_timestamp of append-only requests on the primary
* @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates or deletes on the primary after these operations were executed on it.
* @return the send snapshot result
* @param listener a listener which will be notified with the local checkpoint on the target.
*/
SendSnapshotResult phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot,
long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes) throws IOException {
void phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot, long maxSeenAutoIdTimestamp,
long maxSeqNoOfUpdatesOrDeletes, ActionListener<SendSnapshotResult> listener) throws IOException {
// Listener-based entry point for phase 2. The snapshot is still sent by the blocking helper
// sendSnapshotBlockingly; its outcome is handed to the listener through ActionListener.completeWith
// instead of being returned to the caller.
ActionListener.completeWith(listener, () -> sendSnapshotBlockingly(
startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes));
}
private SendSnapshotResult sendSnapshotBlockingly(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo,
Translog.Snapshot snapshot, long maxSeenAutoIdTimestamp,
long maxSeqNoOfUpdatesOrDeletes) throws IOException {
assert requiredSeqNoRangeStart <= endingSeqNo + 1:
"requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo;
assert startingSeqNo <= requiredSeqNoRangeStart :
@ -538,9 +548,11 @@ public class RecoverySourceHandler {
}
final CancellableThreads.IOInterruptible sendBatch = () -> {
final long targetCheckpoint = recoveryTarget.indexTranslogOperations(
operations, expectedTotalOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes);
targetLocalCheckpoint.set(targetCheckpoint);
// TODO: Make this non-blocking
final PlainActionFuture<Long> future = new PlainActionFuture<>();
recoveryTarget.indexTranslogOperations(
operations, expectedTotalOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, future);
targetLocalCheckpoint.set(future.actionGet());
};
// send operations in batches

View File

@ -394,40 +394,42 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
}
@Override
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary,
long maxSeqNoOfDeletesOrUpdatesOnPrimary) throws IOException {
final RecoveryState.Translog translog = state().getTranslog();
translog.totalOperations(totalTranslogOps);
assert indexShard().recoveryState() == state();
if (indexShard().state() != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, indexShard().state());
}
/*
* The maxSeenAutoIdTimestampOnPrimary received from the primary is at least the highest auto_id_timestamp from any operation
* will be replayed. Bootstrapping this timestamp here will disable the optimization for original append-only requests
* (source of these operations) replicated via replication. Without this step, we may have duplicate documents if we
* replay these operations first (without timestamp), then optimize append-only requests (with timestamp).
*/
indexShard().updateMaxUnsafeAutoIdTimestamp(maxSeenAutoIdTimestampOnPrimary);
/*
* Bootstrap the max_seq_no_of_updates from the primary to make sure that the max_seq_no_of_updates on this replica when
* replaying any of these operations will be at least the max_seq_no_of_updates on the primary when that operation was executed on.
*/
indexShard().advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfDeletesOrUpdatesOnPrimary);
for (Translog.Operation operation : operations) {
Engine.Result result = indexShard().applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY);
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
throw new MapperException("mapping updates are not allowed [" + operation + "]");
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary,
long maxSeqNoOfDeletesOrUpdatesOnPrimary, ActionListener<Long> listener) {
ActionListener.completeWith(listener, () -> {
final RecoveryState.Translog translog = state().getTranslog();
translog.totalOperations(totalTranslogOps);
assert indexShard().recoveryState() == state();
if (indexShard().state() != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, indexShard().state());
}
assert result.getFailure() == null: "unexpected failure while replicating translog entry: " + result.getFailure();
ExceptionsHelper.reThrowIfNotNull(result.getFailure());
}
// update stats only after all operations completed (to ensure that mapping updates don't mess with stats)
translog.incrementRecoveredOperations(operations.size());
indexShard().sync();
// roll over / flush / trim if needed
indexShard().afterWriteOperation();
return indexShard().getLocalCheckpoint();
/*
* The maxSeenAutoIdTimestampOnPrimary received from the primary is at least the highest auto_id_timestamp from any operation
* will be replayed. Bootstrapping this timestamp here will disable the optimization for original append-only requests
* (source of these operations) replicated via replication. Without this step, we may have duplicate documents if we
* replay these operations first (without timestamp), then optimize append-only requests (with timestamp).
*/
indexShard().updateMaxUnsafeAutoIdTimestamp(maxSeenAutoIdTimestampOnPrimary);
/*
* Bootstrap the max_seq_no_of_updates from the primary to make sure that the max_seq_no_of_updates on this replica when
* replaying any of these operations will be at least the max_seq_no_of_updates on the primary when that op was executed on.
*/
indexShard().advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfDeletesOrUpdatesOnPrimary);
for (Translog.Operation operation : operations) {
Engine.Result result = indexShard().applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY);
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
throw new MapperException("mapping updates are not allowed [" + operation + "]");
}
assert result.getFailure() == null : "unexpected failure while replicating translog entry: " + result.getFailure();
ExceptionsHelper.reThrowIfNotNull(result.getFailure());
}
// update stats only after all operations completed (to ensure that mapping updates don't mess with stats)
translog.incrementRecoveredOperations(operations.size());
indexShard().sync();
// roll over / flush / trim if needed
indexShard().afterWriteOperation();
return indexShard().getLocalCheckpoint();
});
}
@Override

View File

@ -67,10 +67,11 @@ public interface RecoveryTargetHandler {
* @param maxSeqNoOfUpdatesOrDeletesOnPrimary the max seq_no of update operations (index operations overwrite Lucene) or delete ops on
* the primary shard when capturing these operations. This value is at least as high as the
* max_seq_no_of_updates on the primary was when any of these ops were processed on it.
* @return the local checkpoint on the target shard
* @param listener a listener which will be notified with the local checkpoint on the target
* after these operations are successfully indexed on the target.
*/
long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps,
long maxSeenAutoIdTimestampOnPrimary, long maxSeqNoOfUpdatesOrDeletesOnPrimary) throws IOException;
void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary,
long maxSeqNoOfUpdatesOrDeletesOnPrimary, ActionListener<Long> listener);
/**
* Notifies the target of the files it is going to receive

View File

@ -23,34 +23,19 @@ import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.transport.FutureTransportResponseHandler;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import java.io.IOException;
public class RecoveryTranslogOperationsResponse extends TransportResponse {
long localCheckpoint;
RecoveryTranslogOperationsResponse() {
}
final class RecoveryTranslogOperationsResponse extends TransportResponse {
final long localCheckpoint;
RecoveryTranslogOperationsResponse(final long localCheckpoint) {
this.localCheckpoint = localCheckpoint;
}
@Override
public void writeTo(final StreamOutput out) throws IOException {
// before 6.0.0 we responded with an empty response so we have to maintain that
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
out.writeZLong(localCheckpoint);
}
}
@Override
public void readFrom(final StreamInput in) throws IOException {
RecoveryTranslogOperationsResponse(final StreamInput in) throws IOException {
super(in);
// before 6.0.0 we received an empty response so we have to maintain that
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
localCheckpoint = in.readZLong();
@ -60,14 +45,12 @@ public class RecoveryTranslogOperationsResponse extends TransportResponse {
}
}
static TransportResponseHandler<RecoveryTranslogOperationsResponse> HANDLER =
new FutureTransportResponseHandler<RecoveryTranslogOperationsResponse>() {
@Override
public RecoveryTranslogOperationsResponse read(StreamInput in) throws IOException {
RecoveryTranslogOperationsResponse response = new RecoveryTranslogOperationsResponse();
response.readFrom(in);
return response;
}
};
@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
// before 6.0.0 we responded with an empty response so we have to maintain that
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
out.writeZLong(localCheckpoint);
}
}
}

View File

@ -32,7 +32,6 @@ import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.TransportFuture;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
@ -113,17 +112,13 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
}
@Override
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps,
long maxSeenAutoIdTimestampOnPrimary, long maxSeqNoOfDeletesOrUpdatesOnPrimary) {
final RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary,
long maxSeqNoOfDeletesOrUpdatesOnPrimary, ActionListener<Long> listener) {
final RecoveryTranslogOperationsRequest request = new RecoveryTranslogOperationsRequest(
recoveryId, shardId, operations, totalTranslogOps, maxSeenAutoIdTimestampOnPrimary, maxSeqNoOfDeletesOrUpdatesOnPrimary);
final TransportFuture<RecoveryTranslogOperationsResponse> future = transportService.submitRequest(
targetNode,
PeerRecoveryTargetService.Actions.TRANSLOG_OPS,
translogOperationsRequest,
translogOpsRequestOptions,
RecoveryTranslogOperationsResponse.HANDLER);
return future.txGet().localCheckpoint;
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.TRANSLOG_OPS, request, translogOpsRequestOptions,
new ActionListenerResponseHandler<>(ActionListener.wrap(r -> listener.onResponse(r.localCheckpoint), listener::onFailure),
RecoveryTranslogOperationsResponse::new, ThreadPool.Names.GENERIC));
}
@Override

View File

@ -503,10 +503,10 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
return new RecoveryTarget(indexShard, node, recoveryListener, l -> {
}) {
@Override
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps,
long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdates) throws IOException {
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps,
long maxSeenAutoIdTimestamp, long msu, ActionListener<Long> listener) {
opsSent.set(true);
return super.indexTranslogOperations(operations, totalTranslogOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdates);
super.indexTranslogOperations(operations, totalTranslogOps, maxSeenAutoIdTimestamp, msu, listener);
}
};
});
@ -573,9 +573,9 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
replica,
(indexShard, node) -> new RecoveryTarget(indexShard, node, recoveryListener, l -> {}) {
@Override
public long indexTranslogOperations(final List<Translog.Operation> operations, final int totalTranslogOps,
final long maxAutoIdTimestamp, long maxSeqNoOfUpdates)
throws IOException {
public void indexTranslogOperations(final List<Translog.Operation> operations, final int totalTranslogOps,
final long maxAutoIdTimestamp, long maxSeqNoOfUpdates,
ActionListener<Long> listener) {
// index a doc which is not part of the snapshot, but also does not complete on replica
replicaEngineFactory.latchIndexers(1);
threadPool.generic().submit(() -> {
@ -602,7 +602,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
} catch (InterruptedException e) {
throw new AssertionError(e);
}
return super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdates);
super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdates, listener);
}
});
pendingDocActiveWithExtraDocIndexed.await();
@ -833,12 +833,12 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
}
@Override
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps,
long maxAutoIdTimestamp, long maxSeqNoOfUpdates) throws IOException {
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps,
long maxAutoIdTimestamp, long maxSeqNoOfUpdates, ActionListener<Long> listener) {
if (hasBlocked() == false) {
blockIfNeeded(RecoveryState.Stage.TRANSLOG);
}
return super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdates);
super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdates, listener);
}
@Override

View File

@ -2422,12 +2422,10 @@ public class IndexShardTests extends IndexShardTestCase {
new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> {
}) {
@Override
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long maxSeenAutoIdTimestamp,
long maxSeqNoOfUpdatesOrDeletes) throws IOException {
final long localCheckpoint = super.indexTranslogOperations(
operations, totalTranslogOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes);
assertFalse(replica.isSyncNeeded());
return localCheckpoint;
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long maxSeenAutoIdTimestamp,
long maxSeqNoOfUpdatesOrDeletes, ActionListener<Long> listener){
// Delegate to the real implementation, wrapping the caller's listener with ActionListener.runAfter
// so the test can assert that the replica reports no pending sync.
// NOTE(review): the check presumably runs after the listener is notified - confirm against ActionListener.runAfter.
super.indexTranslogOperations(operations, totalTranslogOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes,
ActionListener.runAfter(listener, () -> assertFalse(replica.isSyncNeeded())));
}
}, true, true);
@ -2531,13 +2529,14 @@ public class IndexShardTests extends IndexShardTestCase {
new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> {
}) {
@Override
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps,
long maxAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes) throws IOException {
final long localCheckpoint = super.indexTranslogOperations(
operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes);
// Shard should now be active since we did recover:
assertTrue(replica.isActive());
return localCheckpoint;
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long maxAutoIdTimestamp,
long maxSeqNoOfUpdatesOrDeletes, ActionListener<Long> listener){
// Delegate to the real implementation. On success the checkpoint is forwarded to the caller's
// listener first and the shard-active assertion runs afterwards; failures are passed straight through.
super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes,
ActionListener.wrap(checkpoint -> {
listener.onResponse(checkpoint);
// Shard should now be active since we did recover:
assertTrue(replica.isActive());
}, listener::onFailure));
}
}, false, true);
@ -2580,12 +2579,13 @@ public class IndexShardTests extends IndexShardTestCase {
}
@Override
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps,
long maxAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes) throws IOException {
final long localCheckpoint = super.indexTranslogOperations(
operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes);
assertListenerCalled.accept(replica);
return localCheckpoint;
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long maxAutoIdTimestamp,
long maxSeqNoOfUpdatesOrDeletes, ActionListener<Long> listener) {
// Delegate to the real implementation. On success the test's assertListenerCalled check is applied
// to the replica before the checkpoint is forwarded to the caller's listener; failures are passed
// straight through.
super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes,
ActionListener.wrap(checkpoint -> {
assertListenerCalled.accept(replica);
listener.onResponse(checkpoint);
}, listener::onFailure));
}
@Override

View File

@ -221,15 +221,18 @@ public class RecoverySourceHandlerTests extends ESTestCase {
final List<Translog.Operation> shippedOps = new ArrayList<>();
RecoveryTargetHandler recoveryTarget = new TestRecoveryTargetHandler() {
@Override
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long timestamp, long msu) {
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long timestamp, long msu,
ActionListener<Long> listener) {
shippedOps.addAll(operations);
return SequenceNumbers.NO_OPS_PERFORMED;
listener.onResponse(SequenceNumbers.NO_OPS_PERFORMED);
}
};
RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, request, fileChunkSizeInBytes, between(1, 10));
RecoverySourceHandler.SendSnapshotResult result = handler.phase2(startingSeqNo, requiredStartingSeqNo,
endingSeqNo, newTranslogSnapshot(operations, Collections.emptyList()), randomNonNegativeLong(), randomNonNegativeLong());
PlainActionFuture<RecoverySourceHandler.SendSnapshotResult> future = new PlainActionFuture<>();
handler.phase2(startingSeqNo, requiredStartingSeqNo, endingSeqNo, newTranslogSnapshot(operations, Collections.emptyList()),
randomNonNegativeLong(), randomNonNegativeLong(), future);
final int expectedOps = (int) (endingSeqNo - startingSeqNo + 1);
RecoverySourceHandler.SendSnapshotResult result = future.actionGet();
assertThat(result.totalOperations, equalTo(expectedOps));
shippedOps.sort(Comparator.comparing(Translog.Operation::seqNo));
assertThat(shippedOps.size(), equalTo(expectedOps));
@ -241,8 +244,12 @@ public class RecoverySourceHandlerTests extends ESTestCase {
List<Translog.Operation> requiredOps = operations.subList(0, operations.size() - 1).stream() // remove last null marker
.filter(o -> o.seqNo() >= requiredStartingSeqNo && o.seqNo() <= endingSeqNo).collect(Collectors.toList());
List<Translog.Operation> opsToSkip = randomSubsetOf(randomIntBetween(1, requiredOps.size()), requiredOps);
expectThrows(IllegalStateException.class, () -> handler.phase2(startingSeqNo, requiredStartingSeqNo,
endingSeqNo, newTranslogSnapshot(operations, opsToSkip), randomNonNegativeLong(), randomNonNegativeLong()));
PlainActionFuture<RecoverySourceHandler.SendSnapshotResult> failedFuture = new PlainActionFuture<>();
expectThrows(IllegalStateException.class, () -> {
handler.phase2(startingSeqNo, requiredStartingSeqNo, endingSeqNo, newTranslogSnapshot(operations, opsToSkip),
randomNonNegativeLong(), randomNonNegativeLong(), failedFuture);
failedFuture.actionGet();
});
}
}
@ -438,11 +445,12 @@ public class RecoverySourceHandlerTests extends ESTestCase {
}
@Override
SendSnapshotResult phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot,
long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes) throws IOException {
void phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot,
long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes,
ActionListener<SendSnapshotResult> listener) throws IOException {
phase2Called.set(true);
return super.phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot,
maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes);
super.phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot,
maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, listener);
}
};
@ -656,8 +664,8 @@ public class RecoverySourceHandlerTests extends ESTestCase {
}
@Override
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long timestamp, long msu) {
return 0;
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long timestamp, long msu,
ActionListener<Long> listener) {
// Intentionally empty stub: performs no work and never notifies the listener, so a caller that
// waits on the listener will not be completed by this implementation.
}
@Override