skip everything between file checks and translog prepare if sync ids coincide
also throw an exception instead of an assert if num docs don't coincide
parent 8ef734908c
commit 8a3d504efb
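The change, in outline: when the source and target commits carry the same sync id, phase 1 of recovery now only verifies that both sides report the same document count and skips the whole file-info / file-chunk / clean-files exchange, which moves into the else branch; the handler still falls through so the target engine gets started, and the doc-count mismatch is reported with an exception rather than an assert, so the check also runs on nodes with assertions disabled. A minimal sketch of that decision as a standalone helper (the class, method, and message text here are invented for illustration; the real logic sits inline in RecoverySourceHandler as shown in the diff below):

    // Illustrative sketch only - not the committed code.
    final class SyncIdRecoveryShortcut {
        /** Returns true when the phase-1 file copy can be skipped entirely. */
        static boolean canSkipPhase1(String sourceSyncId, String targetSyncId,
                                     long numDocsSource, long numDocsTarget) {
            boolean recoverWithSyncId = sourceSyncId != null && sourceSyncId.equals(targetSyncId);
            if (recoverWithSyncId == false) {
                return false; // different sync ids: run the normal file-based recovery
            }
            // same sync id on both sides: the commits must describe the same documents;
            // throw instead of assert so the check is enforced in production as well
            if (numDocsTarget != numDocsSource) {
                throw new IllegalStateException("sync ids match but number of docs differ: "
                        + numDocsSource + " (primary) vs " + numDocsTarget + " (target)");
            }
            // nothing to copy, but the caller must still prepare the target translog/engine
            return true;
        }
    }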
@@ -50,6 +50,7 @@ import org.elasticsearch.common.util.CancellableThreads;
 import org.elasticsearch.common.util.CancellableThreads.Interruptable;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.gateway.CorruptStateException;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
 import org.elasticsearch.index.engine.Engine;
@@ -131,7 +132,9 @@ public class RecoverySourceHandler {
         this.response = new RecoveryResponse();
     }

-    /** performs the recovery from the local engine to the target */
+    /**
+     * performs the recovery from the local engine to the target
+     */
     public RecoveryResponse recoverToTarget() {
         final Engine engine = shard.engine();
         assert engine.getTranslog() != null : "translog must not be null";
@@ -207,17 +210,13 @@ public class RecoverySourceHandler {
             final boolean recoverWithSyncId = recoverySourceSyncId != null &&
                     recoverySourceSyncId.equals(recoveryTargetSyncId);
             if (recoverWithSyncId) {
-                assert request.metadataSnapshot().getNumDocs() == recoverySourceMetadata.getNumDocs();
-                for (StoreFileMetaData md : request.metadataSnapshot()) {
-                    response.phase1ExistingFileNames.add(md.name());
-                    response.phase1ExistingFileSizes.add(md.length());
-                    existingTotalSize += md.length();
-                    if (logger.isTraceEnabled()) {
-                        logger.trace("[{}][{}] recovery [phase1] to {}: not recovering [{}], checksum [{}], size [{}], sync ids {} coincide, will skip file copy",
-                                indexName, shardId, request.targetNode(), md.name(), md.checksum(), md.length(), recoverySourceMetadata.getCommitUserData().get(Engine.SYNC_COMMIT_ID));
-                    }
-                    totalSize += md.length();
+                long numDocsTarget = request.metadataSnapshot().getNumDocs();
+                long numDocsSource = recoverySourceMetadata.getNumDocs();
+                if (numDocsTarget != numDocsSource) {
+                    throw new IllegalStateException("try to recover " + request.shardId() + " from primary shard with sync id but number of docs differ: " + numDocsTarget + " (" + request.sourceNode().getName() + ", primary) vs " + numDocsSource + "(" + request.targetNode().getName() + ")");
                 }
+                // we shortcut recovery here because we have nothing to copy. but we must still start the engine on the target.
+                // so we don't return here
             } else {
                 final Store.RecoveryDiff diff = recoverySourceMetadata.recoveryDiff(request.metadataSnapshot());
                 for (StoreFileMetaData md : diff.identical) {
@@ -242,224 +241,224 @@
                     response.phase1FileSizes.add(md.length());
                     totalSize += md.length();
                 }
-            }
                 response.phase1TotalSize = totalSize;
                 response.phase1ExistingTotalSize = existingTotalSize;

                 logger.trace("[{}][{}] recovery [phase1] to {}: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]",
                         indexName, shardId, request.targetNode(), response.phase1FileNames.size(),
                         new ByteSizeValue(totalSize), response.phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize));
                 cancellableThreads.execute(new Interruptable() {
                     @Override
                     public void run() throws InterruptedException {
                         RecoveryFilesInfoRequest recoveryInfoFilesRequest = new RecoveryFilesInfoRequest(request.recoveryId(), request.shardId(),
                                 response.phase1FileNames, response.phase1FileSizes, response.phase1ExistingFileNames, response.phase1ExistingFileSizes,
                                 translogView.totalOperations());
                         transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILES_INFO, recoveryInfoFilesRequest,
                                 TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()),
                                 EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
                     }
                 });

                 // This latch will be used to wait until all files have been transferred to the target node
                 final CountDownLatch latch = new CountDownLatch(response.phase1FileNames.size());
                 final CopyOnWriteArrayList<Throwable> exceptions = new CopyOnWriteArrayList<>();
                 final AtomicReference<Throwable> corruptedEngine = new AtomicReference<>();
                 int fileIndex = 0;
                 ThreadPoolExecutor pool;

                 // How many bytes we've copied since we last called RateLimiter.pause
                 final AtomicLong bytesSinceLastPause = new AtomicLong();

                 for (final String name : response.phase1FileNames) {
                     long fileSize = response.phase1FileSizes.get(fileIndex);

                     // Files are split into two categories, files that are "small"
                     // (under 5mb) and other files. Small files are transferred
                     // using a separate thread pool dedicated to small files.
                     //
                     // The idea behind this is that while we are transferring an
                     // older, large index, a user may create a new index, but that
                     // index will not be able to recover until the large index
                     // finishes, by using two different thread pools we can allow
                     // tiny files (like segments for a brand new index) to be
                     // recovered while ongoing large segment recoveries are
                     // happening. It also allows these pools to be configured
                     // separately.
                     if (fileSize > RecoverySettings.SMALL_FILE_CUTOFF_BYTES) {
                         pool = recoverySettings.concurrentStreamPool();
                     } else {
                         pool = recoverySettings.concurrentSmallFileStreamPool();
                     }

                     pool.execute(new AbstractRunnable() {
                         @Override
                         public void onFailure(Throwable t) {
                             // we either got rejected or the store can't be incremented / we are canceled
                             logger.debug("Failed to transfer file [" + name + "] on recovery");
                         }

                         @Override
                         public void onAfter() {
                             // Signify this file has completed by decrementing the latch
                             latch.countDown();
                         }

                         @Override
                         protected void doRun() {
                             cancellableThreads.checkForCancel();
                             store.incRef();
                             final StoreFileMetaData md = recoverySourceMetadata.get(name);
                             try (final IndexInput indexInput = store.directory().openInput(name, IOContext.READONCE)) {
                                 final int BUFFER_SIZE = (int) recoverySettings.fileChunkSize().bytes();
                                 final byte[] buf = new byte[BUFFER_SIZE];
                                 boolean shouldCompressRequest = recoverySettings.compress();
                                 if (CompressorFactory.isCompressed(indexInput)) {
                                     shouldCompressRequest = false;
                                 }

                                 final long len = indexInput.length();
                                 long readCount = 0;
                                 final TransportRequestOptions requestOptions = TransportRequestOptions.options()
                                         .withCompress(shouldCompressRequest)
                                         .withType(TransportRequestOptions.Type.RECOVERY)
                                         .withTimeout(recoverySettings.internalActionTimeout());

                                 while (readCount < len) {
                                     if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us
                                         throw new IndexShardClosedException(shard.shardId());
                                     }
                                     int toRead = readCount + BUFFER_SIZE > len ? (int) (len - readCount) : BUFFER_SIZE;
                                     final long position = indexInput.getFilePointer();

                                     // Pause using the rate limiter, if desired, to throttle the recovery
                                     RateLimiter rl = recoverySettings.rateLimiter();
                                     long throttleTimeInNanos = 0;
                                     if (rl != null) {
                                         long bytes = bytesSinceLastPause.addAndGet(toRead);
                                         if (bytes > rl.getMinPauseCheckBytes()) {
                                             // Time to pause
                                             bytesSinceLastPause.addAndGet(-bytes);
                                             throttleTimeInNanos = rl.pause(bytes);
                                             shard.recoveryStats().addThrottleTime(throttleTimeInNanos);
                                         }
                                     }
                                     indexInput.readBytes(buf, 0, toRead, false);
                                     final BytesArray content = new BytesArray(buf, 0, toRead);
                                     readCount += toRead;
                                     final boolean lastChunk = readCount == len;
                                     final RecoveryFileChunkRequest fileChunkRequest = new RecoveryFileChunkRequest(request.recoveryId(), request.shardId(), md, position,
                                             content, lastChunk, translogView.totalOperations(), throttleTimeInNanos);
                                     cancellableThreads.execute(new Interruptable() {
                                         @Override
                                         public void run() throws InterruptedException {
                                             // Actually send the file chunk to the target node, waiting for it to complete
                                             transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK,
                                                     fileChunkRequest, requestOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
                                         }
                                     });

                                 }
                             } catch (Throwable e) {
                                 final Throwable corruptIndexException;
                                 if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) {
                                     if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
                                         logger.warn("{} Corrupted file detected {} checksum mismatch", shard.shardId(), md);
                                         if (corruptedEngine.compareAndSet(null, corruptIndexException) == false) {
                                             // if we are not the first exception, add ourselves as suppressed to the main one:
                                             corruptedEngine.get().addSuppressed(e);
                                         }
                                     } else { // corruption has happened on the way to replica
                                         RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but checksums are ok", null);
                                         exception.addSuppressed(e);
                                         exceptions.add(0, exception); // last exception first
                                         logger.warn("{} Remote file corruption on node {}, recovering {}. local checksum OK",
                                                 corruptIndexException, shard.shardId(), request.targetNode(), md);

                                     }
                                 } else {
                                     exceptions.add(0, e); // last exceptions first
                                 }
                             } finally {
                                 store.decRef();

                             }
                         }
                     });
                     fileIndex++;
                 }

                 cancellableThreads.execute(new Interruptable() {
                     @Override
                     public void run() throws InterruptedException {
                         // Wait for all files that need to be transferred to finish transferring
                         latch.await();
                     }
                 });

                 if (corruptedEngine.get() != null) {
                     shard.engine().failEngine("recovery", corruptedEngine.get());
                     throw corruptedEngine.get();
                 } else {
                     ExceptionsHelper.rethrowAndSuppress(exceptions);
                 }

                 cancellableThreads.execute(new Interruptable() {
                     @Override
                     public void run() throws InterruptedException {
                         // Send the CLEAN_FILES request, which takes all of the files that
                         // were transferred and renames them from their temporary file
                         // names to the actual file names. It also writes checksums for
                         // the files after they have been renamed.
                         //
                         // Once the files have been renamed, any other files that are not
                         // related to this recovery (out of date segments, for example)
                         // are deleted
                         try {
-                            final Store.MetadataSnapshot remainingFilesAfterCleanup = recoverWithSyncId ? request.metadataSnapshot() : recoverySourceMetadata;
                             transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES,
-                                    new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), remainingFilesAfterCleanup, translogView.totalOperations()),
+                                    new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), recoverySourceMetadata, translogView.totalOperations()),
                                     TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()),
                                     EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
                         } catch (RemoteTransportException remoteException) {
                             final IOException corruptIndexException;
                             // we realized that after the index was copied and we wanted to finalize the recovery
                             // the index was corrupted:
                             // - maybe due to a broken segments file on an empty index (transferred with no checksum)
                             // - maybe due to old segments without checksums or length only checks
                             if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(remoteException)) != null) {
                                 try {
                                     final Store.MetadataSnapshot recoverySourceMetadata = store.getMetadata(snapshot);
                                     StoreFileMetaData[] metadata = Iterables.toArray(recoverySourceMetadata, StoreFileMetaData.class);
                                     ArrayUtil.timSort(metadata, new Comparator<StoreFileMetaData>() {
                                         @Override
                                         public int compare(StoreFileMetaData o1, StoreFileMetaData o2) {
                                             return Long.compare(o1.length(), o2.length()); // check small files first
                                         }
                                     });
                                     for (StoreFileMetaData md : metadata) {
                                         logger.debug("{} checking integrity for file {} after remove corruption exception", shard.shardId(), md);
                                         if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
                                             logger.warn("{} Corrupted file detected {} checksum mismatch", shard.shardId(), md);
                                             throw corruptIndexException;
                                         }
                                     }
                                 } catch (IOException ex) {
                                     remoteException.addSuppressed(ex);
                                     throw remoteException;
                                 }
                                 // corruption has happened on the way to replica
                                 RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but checksums are ok", null);
                                 exception.addSuppressed(remoteException);
                                 logger.warn("{} Remote file corruption during finalization on node {}, recovering {}. local checksum OK",
                                         corruptIndexException, shard.shardId(), request.targetNode());
                             } else {
                                 throw remoteException;
                             }
                         }
                     }
                 });
+            }

             prepareTargetForTranslog(translogView);

             logger.trace("[{}][{}] recovery [phase1] to {}: took [{}]", indexName, shardId, request.targetNode(), stopWatch.totalTime());