Merge pull request #16308 from s1monw/never_interrupt_store_checks

Prevent interruption while store checks lucene files for consistency
This commit is contained in:
Simon Willnauer 2016-01-29 13:04:55 +01:00
commit 65c0f80b13

View File

@ -252,60 +252,58 @@ public class RecoverySourceHandler {
final AtomicLong bytesSinceLastPause = new AtomicLong(); final AtomicLong bytesSinceLastPause = new AtomicLong();
final Function<StoreFileMetaData, OutputStream> outputStreamFactories = (md) -> new BufferedOutputStream(new RecoveryOutputStream(md, bytesSinceLastPause, translogView), chunkSizeInBytes); final Function<StoreFileMetaData, OutputStream> outputStreamFactories = (md) -> new BufferedOutputStream(new RecoveryOutputStream(md, bytesSinceLastPause, translogView), chunkSizeInBytes);
sendFiles(store, phase1Files.toArray(new StoreFileMetaData[phase1Files.size()]), outputStreamFactories); sendFiles(store, phase1Files.toArray(new StoreFileMetaData[phase1Files.size()]), outputStreamFactories);
cancellableThreads.execute(() -> { // Send the CLEAN_FILES request, which takes all of the files that
// Send the CLEAN_FILES request, which takes all of the files that // were transferred and renames them from their temporary file
// were transferred and renames them from their temporary file // names to the actual file names. It also writes checksums for
// names to the actual file names. It also writes checksums for // the files after they have been renamed.
// the files after they have been renamed. //
// // Once the files have been renamed, any other files that are not
// Once the files have been renamed, any other files that are not // related to this recovery (out of date segments, for example)
// related to this recovery (out of date segments, for example) // are deleted
// are deleted try {
try { cancellableThreads.execute(() -> {
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES, transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES,
new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), recoverySourceMetadata, translogView.totalOperations()), new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), recoverySourceMetadata, translogView.totalOperations()),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(), TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
} catch (RemoteTransportException remoteException) { });
final IOException corruptIndexException; } catch (RemoteTransportException remoteException) {
// we realized that after the index was copied and we wanted to finalize the recovery final IOException corruptIndexException;
// the index was corrupted: // we realized that after the index was copied and we wanted to finalize the recovery
// - maybe due to a broken segments file on an empty index (transferred with no checksum) // the index was corrupted:
// - maybe due to old segments without checksums or length only checks // - maybe due to a broken segments file on an empty index (transferred with no checksum)
if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(remoteException)) != null) { // - maybe due to old segments without checksums or length only checks
try { if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(remoteException)) != null) {
final Store.MetadataSnapshot recoverySourceMetadata1 = store.getMetadata(snapshot); try {
StoreFileMetaData[] metadata = final Store.MetadataSnapshot recoverySourceMetadata1 = store.getMetadata(snapshot);
StreamSupport.stream(recoverySourceMetadata1.spliterator(), false).toArray(size -> new StoreFileMetaData[size]); StoreFileMetaData[] metadata =
ArrayUtil.timSort(metadata, new Comparator<StoreFileMetaData>() { StreamSupport.stream(recoverySourceMetadata1.spliterator(), false).toArray(size -> new StoreFileMetaData[size]);
@Override ArrayUtil.timSort(metadata, (o1, o2) -> {
public int compare(StoreFileMetaData o1, StoreFileMetaData o2) { return Long.compare(o1.length(), o2.length()); // check small files first
return Long.compare(o1.length(), o2.length()); // check small files first });
} for (StoreFileMetaData md : metadata) {
}); cancellableThreads.checkForCancel();
for (StoreFileMetaData md : metadata) { logger.debug("{} checking integrity for file {} after remove corruption exception", shard.shardId(), md);
logger.debug("{} checking integrity for file {} after remove corruption exception", shard.shardId(), md); if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail! shard.failShard("recovery", corruptIndexException);
shard.failShard("recovery", corruptIndexException); logger.warn("{} Corrupted file detected {} checksum mismatch", shard.shardId(), md);
logger.warn("{} Corrupted file detected {} checksum mismatch", shard.shardId(), md); throw corruptIndexException;
throw corruptIndexException;
}
} }
} catch (IOException ex) {
remoteException.addSuppressed(ex);
throw remoteException;
} }
// corruption has happened on the way to replica } catch (IOException ex) {
RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but checksums are ok", null); remoteException.addSuppressed(ex);
exception.addSuppressed(remoteException);
logger.warn("{} Remote file corruption during finalization on node {}, recovering {}. local checksum OK",
corruptIndexException, shard.shardId(), request.targetNode());
throw exception;
} else {
throw remoteException; throw remoteException;
} }
// corruption has happened on the way to replica
RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but checksums are ok", null);
exception.addSuppressed(remoteException);
logger.warn("{} Remote file corruption during finalization on node {}, recovering {}. local checksum OK",
corruptIndexException, shard.shardId(), request.targetNode());
throw exception;
} else {
throw remoteException;
} }
}); }
} }
prepareTargetForTranslog(translogView.totalOperations()); prepareTargetForTranslog(translogView.totalOperations());