Prevent interruption while store checks lucene files for consistency

Interrupting the thread while the store checks Lucene files for consistency can prematurely close
the underlying file descriptor if the thread is blocked on IO at that moment. The file descriptor
then remains closed, and subsequent access to the NIOFSDirectory throws a ClosedChannelException.
Simon Willnauer 2016-01-29 12:09:00 +01:00
parent ce89039926
commit f6b922a97a
1 changed file with 46 additions and 48 deletions
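For background, the failure mode described above is standard java.nio behaviour: NIOFSDirectory reads through a FileChannel, and FileChannel is an InterruptibleChannel, so interrupting a thread that is blocked in a read closes the channel for every other user, and later reads fail with ClosedChannelException. A minimal stand-alone sketch of that behaviour (illustrative only, not Elasticsearch code; the class name and the command-line file argument are made up):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

// FileChannel implements InterruptibleChannel: if a thread is interrupted while blocked in
// read(), or enters read() with its interrupt flag already set, the channel itself is closed.
public class InterruptClosesChannel {
    public static void main(String[] args) throws Exception {
        // pass the path of any existing file as the first argument
        try (FileChannel channel = FileChannel.open(Paths.get(args[0]), StandardOpenOption.READ)) {
            Thread reader = new Thread(() -> {
                try {
                    // setting the interrupt flag before the blocking call makes the effect deterministic;
                    // during recovery the interrupt instead arrives while the thread is blocked on IO
                    Thread.currentThread().interrupt();
                    channel.read(ByteBuffer.allocate(8192));
                } catch (ClosedByInterruptException e) {
                    System.out.println("read interrupted; the channel is now closed");
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            reader.start();
            reader.join();
            try {
                channel.read(ByteBuffer.allocate(16), 0); // any later access to the same channel...
            } catch (ClosedChannelException e) {
                System.out.println("...fails: " + e); // ClosedChannelException
            }
        }
    }
}

Because recovery cancellation interrupts the thread doing the integrity checks, the shared directory would end up unusable for everyone else; the change below avoids interrupting that thread at all.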


@@ -252,60 +252,58 @@ public class RecoverySourceHandler {
         final AtomicLong bytesSinceLastPause = new AtomicLong();
         final Function<StoreFileMetaData, OutputStream> outputStreamFactories = (md) -> new BufferedOutputStream(new RecoveryOutputStream(md, bytesSinceLastPause, translogView), chunkSizeInBytes);
         sendFiles(store, phase1Files.toArray(new StoreFileMetaData[phase1Files.size()]), outputStreamFactories);
-        cancellableThreads.execute(() -> {
-            // Send the CLEAN_FILES request, which takes all of the files that
-            // were transferred and renames them from their temporary file
-            // names to the actual file names. It also writes checksums for
-            // the files after they have been renamed.
-            //
-            // Once the files have been renamed, any other files that are not
-            // related to this recovery (out of date segments, for example)
-            // are deleted
-            try {
+        // Send the CLEAN_FILES request, which takes all of the files that
+        // were transferred and renames them from their temporary file
+        // names to the actual file names. It also writes checksums for
+        // the files after they have been renamed.
+        //
+        // Once the files have been renamed, any other files that are not
+        // related to this recovery (out of date segments, for example)
+        // are deleted
+        try {
+            cancellableThreads.execute(() -> {
                 transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES,
                         new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), recoverySourceMetadata, translogView.totalOperations()),
                         TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
                         EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
-            } catch (RemoteTransportException remoteException) {
-                final IOException corruptIndexException;
-                // we realized that after the index was copied and we wanted to finalize the recovery
-                // the index was corrupted:
-                // - maybe due to a broken segments file on an empty index (transferred with no checksum)
-                // - maybe due to old segments without checksums or length only checks
-                if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(remoteException)) != null) {
-                    try {
-                        final Store.MetadataSnapshot recoverySourceMetadata1 = store.getMetadata(snapshot);
-                        StoreFileMetaData[] metadata =
-                                StreamSupport.stream(recoverySourceMetadata1.spliterator(), false).toArray(size -> new StoreFileMetaData[size]);
-                        ArrayUtil.timSort(metadata, new Comparator<StoreFileMetaData>() {
-                            @Override
-                            public int compare(StoreFileMetaData o1, StoreFileMetaData o2) {
-                                return Long.compare(o1.length(), o2.length()); // check small files first
-                            }
-                        });
-                        for (StoreFileMetaData md : metadata) {
-                            logger.debug("{} checking integrity for file {} after remove corruption exception", shard.shardId(), md);
-                            if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
-                                shard.failShard("recovery", corruptIndexException);
-                                logger.warn("{} Corrupted file detected {} checksum mismatch", shard.shardId(), md);
-                                throw corruptIndexException;
-                            }
-                        }
-                    } catch (IOException ex) {
-                        remoteException.addSuppressed(ex);
-                        throw remoteException;
-                    }
-                    // corruption has happened on the way to replica
-                    RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but checksums are ok", null);
-                    exception.addSuppressed(remoteException);
-                    logger.warn("{} Remote file corruption during finalization on node {}, recovering {}. local checksum OK",
-                            corruptIndexException, shard.shardId(), request.targetNode());
-                    throw exception;
-                } else {
-                    throw remoteException;
-                }
-            }
-        });
+            });
+        } catch (RemoteTransportException remoteException) {
+            final IOException corruptIndexException;
+            // we realized that after the index was copied and we wanted to finalize the recovery
+            // the index was corrupted:
+            // - maybe due to a broken segments file on an empty index (transferred with no checksum)
+            // - maybe due to old segments without checksums or length only checks
+            if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(remoteException)) != null) {
+                try {
+                    final Store.MetadataSnapshot recoverySourceMetadata1 = store.getMetadata(snapshot);
+                    StoreFileMetaData[] metadata =
+                            StreamSupport.stream(recoverySourceMetadata1.spliterator(), false).toArray(size -> new StoreFileMetaData[size]);
+                    ArrayUtil.timSort(metadata, (o1, o2) -> {
+                        return Long.compare(o1.length(), o2.length()); // check small files first
+                    });
+                    for (StoreFileMetaData md : metadata) {
+                        cancellableThreads.checkForCancel();
+                        logger.debug("{} checking integrity for file {} after remove corruption exception", shard.shardId(), md);
+                        if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
+                            shard.failShard("recovery", corruptIndexException);
+                            logger.warn("{} Corrupted file detected {} checksum mismatch", shard.shardId(), md);
+                            throw corruptIndexException;
+                        }
+                    }
+                } catch (IOException ex) {
+                    remoteException.addSuppressed(ex);
+                    throw remoteException;
+                }
+                // corruption has happened on the way to replica
+                RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but checksums are ok", null);
+                exception.addSuppressed(remoteException);
+                logger.warn("{} Remote file corruption during finalization on node {}, recovering {}. local checksum OK",
+                        corruptIndexException, shard.shardId(), request.targetNode());
+                throw exception;
+            } else {
+                throw remoteException;
+            }
+        }
         prepareTargetForTranslog(translogView.totalOperations());
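The shape of the fix: only the blocking CLEAN_FILES transport call still runs inside cancellableThreads.execute(), where an interrupt is acceptable, while the per-file integrity check loop now calls cancellableThreads.checkForCancel() between files, so cancellation is honoured without ever interrupting a thread that may be blocked reading a Lucene file. A rough sketch of that polling pattern under assumed names (hypothetical CooperativeCancellation class, not Elasticsearch's CancellableThreads):

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of cooperative cancellation: cancel() only flips a flag and never
// interrupts the worker thread, so no thread is interrupted while blocked in channel IO.
final class CooperativeCancellation {
    private final AtomicBoolean cancelled = new AtomicBoolean();

    void cancel() {
        cancelled.set(true);
    }

    void checkForCancel() {
        if (cancelled.get()) {
            throw new IllegalStateException("cancelled");
        }
    }
}

final class IntegrityChecker {
    // stand-in for the loop over StoreFileMetaData in the diff above
    static void checkFiles(List<String> files, CooperativeCancellation cancellation) {
        for (String file : files) {
            cancellation.checkForCancel(); // safe point: nothing is mid-read here
            verifyChecksum(file);          // may block on IO, but is never interrupted
        }
    }

    private static void verifyChecksum(String file) {
        // placeholder for the actual checksum/integrity check
    }
}

The trade-off is latency: cancellation is only observed at the safe points between files, never in the middle of a read, which is exactly what keeps the file descriptors open.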