Make peer recovery clean files step async (#43787)

Relates #36195
Nhat Nguyen 2019-06-29 18:26:08 -04:00
parent 5e17bc5dcc
commit 55b3ec8d7b
11 changed files with 184 additions and 175 deletions
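The gist of the change: the clean-files step of peer recovery no longer blocks the calling thread until the target has finished. RecoveryTargetHandler.cleanFiles now takes an ActionListener<Void>, the source handler wires that listener into its StepListener chain, and tests that still need to block wait on a PlainActionFuture. As rough orientation, below is a minimal, self-contained Java sketch of that sync-to-async conversion; the Listener interface and the doCleanFiles/cleanFilesAsync names are hypothetical stand-ins for illustration, not the actual Elasticsearch classes.

import java.util.concurrent.CompletableFuture;

// Toy illustration of the callback conversion applied to the clean-files step.
// Listener stands in for org.elasticsearch.action.ActionListener<Void>; the method
// names and the body of doCleanFiles() are made up for this sketch.
class CleanFilesSketch {

    interface Listener {
        void onResponse(Void ignored);
        void onFailure(Exception e);
    }

    // Old shape: the caller blocks until the work is done and failures are thrown.
    void cleanFilesBlocking() throws Exception {
        doCleanFiles();
    }

    // New shape: the same work completes a listener, so the calling thread is never parked.
    void cleanFilesAsync(Listener listener) {
        try {
            doCleanFiles();
            listener.onResponse(null);
        } catch (Exception e) {
            listener.onFailure(e);
        }
    }

    private void doCleanFiles() throws Exception {
        // placeholder for the actual work: rename temp files, verify checksums, create a new translog, ...
    }

    public static void main(String[] args) {
        // A caller that still needs to block can adapt the listener to a future,
        // much like the tests below do with PlainActionFuture.
        CompletableFuture<Void> done = new CompletableFuture<>();
        new CleanFilesSketch().cleanFilesAsync(new Listener() {
            @Override public void onResponse(Void ignored) { done.complete(null); }
            @Override public void onFailure(Exception e) { done.completeExceptionally(e); }
        });
        done.join();
    }
}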


@@ -544,10 +544,10 @@ public class PeerRecoveryTargetService implements IndexEventListener {
         @Override
         public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel channel, Task task) throws Exception {
-            try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
-            )) {
-                recoveryRef.target().cleanFiles(request.totalTranslogOps(), request.getGlobalCheckpoint(), request.sourceMetaSnapshot());
-                channel.sendResponse(TransportResponse.Empty.INSTANCE);
+            try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
+                final ActionListener<TransportResponse> listener = new ChannelActionListener<>(channel, Actions.CLEAN_FILES, request);
+                recoveryRef.target().cleanFiles(request.totalTranslogOps(), request.getGlobalCheckpoint(), request.sourceMetaSnapshot(),
+                    ActionListener.map(listener, nullVal -> TransportResponse.Empty.INSTANCE));
             }
         }
     }


@@ -40,6 +40,7 @@ import org.elasticsearch.common.StopWatch;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
 import org.elasticsearch.common.unit.ByteSizeValue;
@@ -75,7 +76,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
-import java.util.function.Supplier;
+import java.util.function.IntSupplier;
 import java.util.stream.StreamSupport;
 import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
@@ -160,15 +161,21 @@ public class RecoverySourceHandler {
             final long startingSeqNo;
             final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO &&
                 isTargetSameHistory() && shard.hasCompleteHistoryOperations("peer-recovery", request.startingSeqNo());
-            final SendFileResult sendFileResult;
+            final StepListener<SendFileResult> sendFileStep = new StepListener<>();
+            final StepListener<TimeValue> prepareEngineStep = new StepListener<>();
+            final StepListener<SendSnapshotResult> sendSnapshotStep = new StepListener<>();
+            final StepListener<Void> finalizeStep = new StepListener<>();
             if (isSequenceNumberBasedRecovery) {
                 logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());
                 startingSeqNo = request.startingSeqNo();
-                sendFileResult = SendFileResult.EMPTY;
+                sendFileStep.onResponse(SendFileResult.EMPTY);
             } else {
-                final Engine.IndexCommitRef phase1Snapshot;
+                final Engine.IndexCommitRef safeCommitRef;
                 try {
-                    phase1Snapshot = shard.acquireSafeIndexCommit();
+                    safeCommitRef = shard.acquireSafeIndexCommit();
+                    resources.add(safeCommitRef);
                 } catch (final Exception e) {
                     throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
                 }
@@ -177,24 +184,29 @@
                 startingSeqNo = 0;
                 try {
                     final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo);
-                    sendFileResult = phase1(phase1Snapshot.getIndexCommit(), shard.getLastKnownGlobalCheckpoint(), () -> estimateNumOps);
+                    shard.store().incRef();
+                    final Releasable releaseStore = Releasables.releaseOnce(shard.store()::decRef);
+                    resources.add(releaseStore);
+                    sendFileStep.whenComplete(r -> IOUtils.close(safeCommitRef, releaseStore), e -> {
+                        try {
+                            IOUtils.close(safeCommitRef, releaseStore);
+                        } catch (final IOException ex) {
+                            logger.warn("releasing snapshot caused exception", ex);
+                        }
+                    });
+                    phase1(safeCommitRef.getIndexCommit(), shard.getLastKnownGlobalCheckpoint(), () -> estimateNumOps, sendFileStep);
                 } catch (final Exception e) {
-                    throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e);
-                } finally {
-                    try {
-                        IOUtils.close(phase1Snapshot);
-                    } catch (final IOException ex) {
-                        logger.warn("releasing snapshot caused exception", ex);
-                    }
+                    throw new RecoveryEngineException(shard.shardId(), 1, "sendFileStep failed", e);
                 }
             }
             assert startingSeqNo >= 0 : "startingSeqNo must be non negative. got: " + startingSeqNo;
-            final StepListener<TimeValue> prepareEngineStep = new StepListener<>();
-            // For a sequence based recovery, the target can keep its local translog
-            prepareTargetForTranslog(isSequenceNumberBasedRecovery == false,
-                shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo), prepareEngineStep);
-            final StepListener<SendSnapshotResult> sendSnapshotStep = new StepListener<>();
+            sendFileStep.whenComplete(r -> {
+                // For a sequence based recovery, the target can keep its local translog
+                prepareTargetForTranslog(isSequenceNumberBasedRecovery == false,
+                    shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo), prepareEngineStep);
+            }, onFailure);
             prepareEngineStep.whenComplete(prepareEngineTime -> {
                 /*
                  * add shard to replication group (shard will receive replication requests from this point on) now that engine is open.
@@ -231,12 +243,12 @@
             }, onFailure);
-            final StepListener<Void> finalizeStep = new StepListener<>();
             sendSnapshotStep.whenComplete(r -> finalizeRecovery(r.targetLocalCheckpoint, finalizeStep), onFailure);
             finalizeStep.whenComplete(r -> {
                 final long phase1ThrottlingWaitTime = 0L; // TODO: return the actual throttle time
                 final SendSnapshotResult sendSnapshotResult = sendSnapshotStep.result();
+                final SendFileResult sendFileResult = sendFileStep.result();
                 final RecoveryResponse response = new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes,
                     sendFileResult.phase1ExistingFileNames, sendFileResult.phase1ExistingFileSizes, sendFileResult.totalSize,
                     sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime,
@@ -333,18 +345,17 @@
      * segments that are missing. Only segments that have the same size and
      * checksum can be reused
      */
-    public SendFileResult phase1(final IndexCommit snapshot, final long globalCheckpoint, final Supplier<Integer> translogOps) {
+    void phase1(IndexCommit snapshot, long globalCheckpoint, IntSupplier translogOps, ActionListener<SendFileResult> listener) {
         cancellableThreads.checkForCancel();
         // Total size of segment files that are recovered
-        long totalSize = 0;
+        long totalSizeInBytes = 0;
         // Total size of segment files that were able to be re-used
-        long existingTotalSize = 0;
+        long existingTotalSizeInBytes = 0;
         final List<String> phase1FileNames = new ArrayList<>();
         final List<Long> phase1FileSizes = new ArrayList<>();
         final List<String> phase1ExistingFileNames = new ArrayList<>();
         final List<Long> phase1ExistingFileSizes = new ArrayList<>();
         final Store store = shard.store();
-        store.incRef();
         try {
             StopWatch stopWatch = new StopWatch().start();
             final Store.MetadataSnapshot recoverySourceMetadata;
@@ -370,12 +381,12 @@
                 for (StoreFileMetaData md : diff.identical) {
                     phase1ExistingFileNames.add(md.name());
                     phase1ExistingFileSizes.add(md.length());
-                    existingTotalSize += md.length();
+                    existingTotalSizeInBytes += md.length();
                     if (logger.isTraceEnabled()) {
                         logger.trace("recovery [phase1]: not recovering [{}], exist in local store and has checksum [{}]," +
                             " size [{}]", md.name(), md.checksum(), md.length());
                     }
-                    totalSize += md.length();
+                    totalSizeInBytes += md.length();
                 }
                 List<StoreFileMetaData> phase1Files = new ArrayList<>(diff.different.size() + diff.missing.size());
                 phase1Files.addAll(diff.different);
@@ -389,75 +400,33 @@
                     }
                     phase1FileNames.add(md.name());
                     phase1FileSizes.add(md.length());
-                    totalSize += md.length();
+                    totalSizeInBytes += md.length();
                 }
                 logger.trace("recovery [phase1]: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]",
-                    phase1FileNames.size(), new ByteSizeValue(totalSize),
-                    phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize));
+                    phase1FileNames.size(), new ByteSizeValue(totalSizeInBytes),
+                    phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSizeInBytes));
                 cancellableThreads.execute(() -> recoveryTarget.receiveFileInfo(
-                    phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, translogOps.get()));
+                    phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, translogOps.getAsInt()));
                 sendFiles(store, phase1Files.toArray(new StoreFileMetaData[0]), translogOps);
-                // Send the CLEAN_FILES request, which takes all of the files that
-                // were transferred and renames them from their temporary file
-                // names to the actual file names. It also writes checksums for
-                // the files after they have been renamed.
-                //
-                // Once the files have been renamed, any other files that are not
-                // related to this recovery (out of date segments, for example)
-                // are deleted
-                try {
-                    cancellableThreads.executeIO(() ->
-                        recoveryTarget.cleanFiles(translogOps.get(), globalCheckpoint, recoverySourceMetadata));
-                } catch (RemoteTransportException | IOException targetException) {
-                    final IOException corruptIndexException;
-                    // we realized that after the index was copied and we wanted to finalize the recovery
-                    // the index was corrupted:
-                    // - maybe due to a broken segments file on an empty index (transferred with no checksum)
-                    // - maybe due to old segments without checksums or length only checks
-                    if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(targetException)) != null) {
-                        try {
-                            final Store.MetadataSnapshot recoverySourceMetadata1 = store.getMetadata(snapshot);
-                            StoreFileMetaData[] metadata =
-                                StreamSupport.stream(recoverySourceMetadata1.spliterator(), false).toArray(StoreFileMetaData[]::new);
-                            ArrayUtil.timSort(metadata, Comparator.comparingLong(StoreFileMetaData::length)); // check small files first
-                            for (StoreFileMetaData md : metadata) {
-                                cancellableThreads.checkForCancel();
-                                logger.debug("checking integrity for file {} after remove corruption exception", md);
-                                if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
-                                    shard.failShard("recovery", corruptIndexException);
-                                    logger.warn("Corrupted file detected {} checksum mismatch", md);
-                                    throw corruptIndexException;
-                                }
-                            }
-                        } catch (IOException ex) {
-                            targetException.addSuppressed(ex);
-                            throw targetException;
-                        }
-                        // corruption has happened on the way to replica
-                        RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but " +
-                            "checksums are ok", null);
-                        exception.addSuppressed(targetException);
-                        logger.warn(() -> new ParameterizedMessage(
-                            "{} Remote file corruption during finalization of recovery on node {}. local checksum OK",
-                            shard.shardId(), request.targetNode()), corruptIndexException);
-                        throw exception;
-                    } else {
-                        throw targetException;
-                    }
-                }
+                final long totalSize = totalSizeInBytes;
+                final long existingTotalSize = existingTotalSizeInBytes;
+                cleanFiles(store, recoverySourceMetadata, translogOps, globalCheckpoint, ActionListener.map(listener, aVoid -> {
+                    final TimeValue took = stopWatch.totalTime();
+                    logger.trace("recovery [phase1]: took [{}]", took);
+                    return new SendFileResult(phase1FileNames, phase1FileSizes, totalSize, phase1ExistingFileNames,
+                        phase1ExistingFileSizes, existingTotalSize, took);
+                }));
             } else {
                 logger.trace("skipping [phase1]- identical sync id [{}] found on both source and target",
                     recoverySourceMetadata.getSyncId());
+                final TimeValue took = stopWatch.totalTime();
+                logger.trace("recovery [phase1]: took [{}]", took);
+                listener.onResponse(new SendFileResult(phase1FileNames, phase1FileSizes, totalSizeInBytes, phase1ExistingFileNames,
+                    phase1ExistingFileSizes, existingTotalSizeInBytes, took));
             }
-            final TimeValue took = stopWatch.totalTime();
-            logger.trace("recovery [phase1]: took [{}]", took);
-            return new SendFileResult(phase1FileNames, phase1FileSizes, totalSize, phase1ExistingFileNames,
-                phase1ExistingFileSizes, existingTotalSize, took);
         } catch (Exception e) {
-            throw new RecoverFilesRecoveryException(request.shardId(), phase1FileNames.size(), new ByteSizeValue(totalSize), e);
-        } finally {
-            store.decRef();
+            throw new RecoverFilesRecoveryException(request.shardId(), phase1FileNames.size(), new ByteSizeValue(totalSizeInBytes), e);
         }
     }
@@ -695,7 +664,7 @@ public class RecoverySourceHandler {
                 '}';
     }
-    void sendFiles(Store store, StoreFileMetaData[] files, Supplier<Integer> translogOps) throws Exception {
+    void sendFiles(Store store, StoreFileMetaData[] files, IntSupplier translogOps) throws Exception {
         ArrayUtil.timSort(files, Comparator.comparingLong(StoreFileMetaData::length)); // send smallest first
         final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED);
         final AtomicReference<Tuple<StoreFileMetaData, Exception>> error = new AtomicReference<>();
@@ -720,7 +689,7 @@
             }
             final long requestFilePosition = position;
             cancellableThreads.executeIO(() ->
-                recoveryTarget.writeFileChunk(md, requestFilePosition, content, lastChunk, translogOps.get(),
+                recoveryTarget.writeFileChunk(md, requestFilePosition, content, lastChunk, translogOps.getAsInt(),
                     ActionListener.wrap(
                         r -> requestSeqIdTracker.markSeqNoAsProcessed(requestSeqId),
                         e -> {
@@ -741,24 +710,53 @@
             cancellableThreads.execute(() -> requestSeqIdTracker.waitForProcessedOpsToComplete(requestSeqIdTracker.getMaxSeqNo()));
         }
         if (error.get() != null) {
-            handleErrorOnSendFiles(store, error.get().v1(), error.get().v2());
+            handleErrorOnSendFiles(store, error.get().v2(), new StoreFileMetaData[]{error.get().v1()});
         }
     }
-    private void handleErrorOnSendFiles(Store store, StoreFileMetaData md, Exception e) throws Exception {
-        final IOException corruptIndexException;
-        if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) {
-            if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
-                logger.warn("{} Corrupted file detected {} checksum mismatch", shardId, md);
-                failEngine(corruptIndexException);
-                throw corruptIndexException;
+    private void cleanFiles(Store store, Store.MetadataSnapshot sourceMetadata, IntSupplier translogOps,
+                            long globalCheckpoint, ActionListener<Void> listener) {
+        // Send the CLEAN_FILES request, which takes all of the files that
+        // were transferred and renames them from their temporary file
+        // names to the actual file names. It also writes checksums for
+        // the files after they have been renamed.
+        //
+        // Once the files have been renamed, any other files that are not
+        // related to this recovery (out of date segments, for example)
+        // are deleted
+        cancellableThreads.execute(() -> recoveryTarget.cleanFiles(translogOps.getAsInt(), globalCheckpoint, sourceMetadata,
+            ActionListener.delegateResponse(listener, (l, e) -> ActionListener.completeWith(l, () -> {
+                StoreFileMetaData[] mds = StreamSupport.stream(sourceMetadata.spliterator(), false).toArray(StoreFileMetaData[]::new);
+                ArrayUtil.timSort(mds, Comparator.comparingLong(StoreFileMetaData::length)); // check small files first
+                handleErrorOnSendFiles(store, e, mds);
+                throw e;
+            }))));
+    }
+    private void handleErrorOnSendFiles(Store store, Exception e, StoreFileMetaData[] mds) throws Exception {
+        final IOException corruptIndexException = ExceptionsHelper.unwrapCorruption(e);
+        if (corruptIndexException != null) {
+            Exception localException = null;
+            for (StoreFileMetaData md : mds) {
+                cancellableThreads.checkForCancel();
+                logger.debug("checking integrity for file {} after remove corruption exception", md);
+                if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
+                    logger.warn("{} Corrupted file detected {} checksum mismatch", shardId, md);
+                    if (localException == null) {
+                        localException = corruptIndexException;
+                    }
+                    failEngine(corruptIndexException);
+                }
+            }
+            if (localException != null) {
+                throw localException;
             } else { // corruption has happened on the way to replica
-                RemoteTransportException exception = new RemoteTransportException(
+                RemoteTransportException remoteException = new RemoteTransportException(
                     "File corruption occurred on recovery but checksums are ok", null);
-                exception.addSuppressed(e);
+                remoteException.addSuppressed(e);
                 logger.warn(() -> new ParameterizedMessage("{} Remote file corruption on node {}, recovering {}. local checksum OK",
-                    shardId, request.targetNode(), md), corruptIndexException);
-                throw exception;
+                    shardId, request.targetNode(), mds), corruptIndexException);
+                throw remoteException;
             }
         } else {
             throw e;


@@ -392,57 +392,61 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTargetHandler {
     }
 
     @Override
-    public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException {
-        state().getTranslog().totalOperations(totalTranslogOps);
-        // first, we go and move files that were created with the recovery id suffix to
-        // the actual names, its ok if we have a corrupted index here, since we have replicas
-        // to recover from in case of a full cluster shutdown just when this code executes...
-        multiFileWriter.renameAllTempFiles();
-        final Store store = store();
-        store.incRef();
-        try {
-            store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetaData);
-            if (indexShard.indexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1)) {
-                store.ensureIndexHasHistoryUUID();
-            }
-            final String translogUUID = Translog.createEmptyTranslog(
-                indexShard.shardPath().resolveTranslog(), globalCheckpoint, shardId, indexShard.getPendingPrimaryTerm());
-            store.associateIndexWithNewTranslog(translogUUID);
-            if (indexShard.getRetentionLeases().leases().isEmpty()) {
-                // if empty, may be a fresh IndexShard, so write an empty leases file to disk
-                indexShard.persistRetentionLeases();
-                assert indexShard.loadRetentionLeases().leases().isEmpty();
-            } else {
-                assert indexShard.assertRetentionLeasesPersisted();
-            }
-        } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
-            // this is a fatal exception at this stage.
-            // this means we transferred files from the remote that have not be checksummed and they are
-            // broken. We have to clean up this shard entirely, remove all files and bubble it up to the
-            // source shard since this index might be broken there as well? The Source can handle this and checks
-            // its content on disk if possible.
-            try {
-                try {
-                    store.removeCorruptionMarker();
-                } finally {
-                    Lucene.cleanLuceneIndex(store.directory()); // clean up and delete all files
-                }
-            } catch (Exception e) {
-                logger.debug("Failed to clean lucene index", e);
-                ex.addSuppressed(e);
-            }
-            RecoveryFailedException rfe = new RecoveryFailedException(state(), "failed to clean after recovery", ex);
-            fail(rfe, true);
-            throw rfe;
-        } catch (Exception ex) {
-            RecoveryFailedException rfe = new RecoveryFailedException(state(), "failed to clean after recovery", ex);
-            fail(rfe, true);
-            throw rfe;
-        } finally {
-            store.decRef();
-        }
+    public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData,
+                           ActionListener<Void> listener) {
+        ActionListener.completeWith(listener, () -> {
+            state().getTranslog().totalOperations(totalTranslogOps);
+            // first, we go and move files that were created with the recovery id suffix to
+            // the actual names, its ok if we have a corrupted index here, since we have replicas
+            // to recover from in case of a full cluster shutdown just when this code executes...
+            multiFileWriter.renameAllTempFiles();
+            final Store store = store();
+            store.incRef();
+            try {
+                store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetaData);
+                if (indexShard.indexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1)) {
+                    store.ensureIndexHasHistoryUUID();
+                }
+                final String translogUUID = Translog.createEmptyTranslog(
+                    indexShard.shardPath().resolveTranslog(), globalCheckpoint, shardId, indexShard.getPendingPrimaryTerm());
+                store.associateIndexWithNewTranslog(translogUUID);
+                if (indexShard.getRetentionLeases().leases().isEmpty()) {
+                    // if empty, may be a fresh IndexShard, so write an empty leases file to disk
+                    indexShard.persistRetentionLeases();
+                    assert indexShard.loadRetentionLeases().leases().isEmpty();
+                } else {
+                    assert indexShard.assertRetentionLeasesPersisted();
+                }
+            } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
+                // this is a fatal exception at this stage.
+                // this means we transferred files from the remote that have not be checksummed and they are
+                // broken. We have to clean up this shard entirely, remove all files and bubble it up to the
+                // source shard since this index might be broken there as well? The Source can handle this and checks
+                // its content on disk if possible.
+                try {
+                    try {
+                        store.removeCorruptionMarker();
+                    } finally {
+                        Lucene.cleanLuceneIndex(store.directory()); // clean up and delete all files
+                    }
+                } catch (Exception e) {
+                    logger.debug("Failed to clean lucene index", e);
+                    ex.addSuppressed(e);
+                }
+                RecoveryFailedException rfe = new RecoveryFailedException(state(), "failed to clean after recovery", ex);
+                fail(rfe, true);
+                throw rfe;
+            } catch (Exception ex) {
+                RecoveryFailedException rfe = new RecoveryFailedException(state(), "failed to clean after recovery", ex);
+                fail(rfe, true);
+                throw rfe;
+            } finally {
+                store.decRef();
+            }
+            return null;
+        });
     }
 
     @Override


@@ -26,7 +26,6 @@ import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.store.StoreFileMetaData;
 import org.elasticsearch.index.translog.Translog;
-import java.io.IOException;
 import java.util.List;
 public interface RecoveryTargetHandler {
@@ -99,7 +98,7 @@
      * @param globalCheckpoint the global checkpoint on the primary
      * @param sourceMetaData meta data of the source store
      */
-    void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException;
+    void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData, ActionListener<Void> listener);
     /** writes a partial file chunk to the target store */
     void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content,


@@ -140,11 +140,13 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
     }
 
     @Override
-    public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException {
+    public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData,
+                           ActionListener<Void> listener) {
         transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.CLEAN_FILES,
             new RecoveryCleanFilesRequest(recoveryId, shardId, sourceMetaData, totalTranslogOps, globalCheckpoint),
             TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
-            EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
+            new ActionListenerResponseHandler<>(ActionListener.map(listener, r -> null),
+                in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC));
     }
 
     @Override


@@ -122,14 +122,15 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
             (indexShard, node) -> new RecoveryTarget(indexShard, node, recoveryListener) {
                 @Override
                 public void cleanFiles(int totalTranslogOps, long globalCheckpoint,
-                                       Store.MetadataSnapshot sourceMetaData) throws IOException {
-                    super.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData);
-                    latch.countDown();
-                    try {
-                        latch.await();
-                    } catch (InterruptedException e) {
-                        throw new AssertionError(e);
-                    }
+                                       Store.MetadataSnapshot sourceMetaData, ActionListener<Void> listener) {
+                    super.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData, ActionListener.runAfter(listener, () -> {
+                        latch.countDown();
+                        try {
+                            latch.await();
+                        } catch (InterruptedException e) {
+                            throw new AssertionError(e);
+                        }
+                    }));
                 }
             });
         future.get();


@@ -848,9 +848,10 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestCase
             }
 
            @Override
-            public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException {
+            public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData,
+                                   ActionListener<Void> listener) {
                 blockIfNeeded(RecoveryState.Stage.INDEX);
-                super.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData);
+                super.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData, listener);
             }
 
             @Override


@@ -28,6 +28,7 @@ import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.flush.FlushRequest;
+import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.UUIDs;
@@ -189,7 +190,10 @@
         for (Thread sender : senders) {
             sender.join();
         }
-        recoveryTarget.cleanFiles(0, Long.parseLong(sourceSnapshot.getCommitUserData().get(SequenceNumbers.MAX_SEQ_NO)), sourceSnapshot);
+        PlainActionFuture<Void> cleanFilesFuture = new PlainActionFuture<>();
+        recoveryTarget.cleanFiles(0, Long.parseLong(sourceSnapshot.getCommitUserData().get(SequenceNumbers.MAX_SEQ_NO)),
+            sourceSnapshot, cleanFilesFuture);
+        cleanFilesFuture.actionGet();
         recoveryTarget.decRef();
         Store.MetadataSnapshot targetSnapshot = targetShard.snapshotStoreMetadata();
         Store.RecoveryDiff diff = sourceSnapshot.recoveryDiff(targetSnapshot);


@@ -98,7 +98,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.IntSupplier;
-import java.util.function.Supplier;
 import java.util.zip.CRC32;
 import static java.util.Collections.emptyMap;
@@ -478,9 +477,9 @@
                 between(1, 8)) {
                 @Override
-                public SendFileResult phase1(final IndexCommit snapshot, final long globalCheckpoint, final Supplier<Integer> translogOps) {
+                void phase1(IndexCommit snapshot, long globalCheckpoint, IntSupplier translogOps, ActionListener<SendFileResult> listener) {
                     phase1Called.set(true);
-                    return super.phase1(snapshot, globalCheckpoint, translogOps);
+                    super.phase1(snapshot, globalCheckpoint, translogOps, listener);
                 }
                 @Override
@@ -758,7 +757,8 @@
         }
 
         @Override
-        public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) {
+        public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData,
+                               ActionListener<Void> listener) {
         }
 
        @Override


@@ -47,7 +47,6 @@ import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.translog.SnapshotMatchers;
 import org.elasticsearch.index.translog.Translog;
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -335,9 +334,10 @@
                 assertThat(replicaShard.getLastKnownGlobalCheckpoint(), equalTo(primaryShard.getLastKnownGlobalCheckpoint()));
             }
 
             @Override
-            public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException {
+            public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData,
+                                   ActionListener<Void> listener) {
                 assertThat(globalCheckpoint, equalTo(primaryShard.getLastKnownGlobalCheckpoint()));
-                super.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData);
+                super.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData, listener);
             }
         }, true, true);
         List<IndexCommit> commits = DirectoryReader.listCommits(replicaShard.store().directory());


@@ -29,7 +29,6 @@ import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.store.StoreFileMetaData;
 import org.elasticsearch.index.translog.Translog;
-import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.Executor;
@@ -75,8 +74,9 @@
     }
 
     @Override
-    public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException {
-        target.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData);
+    public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData,
+                           ActionListener<Void> listener) {
+        executor.execute(() -> target.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData, listener));
     }
 
     @Override