diff --git a/core/src/main/java/org/elasticsearch/common/util/CancellableThreads.java b/core/src/main/java/org/elasticsearch/common/util/CancellableThreads.java
index 781218c6610..b8c5ba09b9c 100644
--- a/core/src/main/java/org/elasticsearch/common/util/CancellableThreads.java
+++ b/core/src/main/java/org/elasticsearch/common/util/CancellableThreads.java
@@ -19,6 +19,7 @@ package org.elasticsearch.common.util;
 
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -131,7 +132,7 @@ public class CancellableThreads {
 
     public interface Interruptable {
-        public void run() throws InterruptedException;
+        void run() throws InterruptedException;
     }
 
     public static class ExecutionCancelledException extends ElasticsearchException {
diff --git a/core/src/main/java/org/elasticsearch/index/store/Store.java b/core/src/main/java/org/elasticsearch/index/store/Store.java
index c8394fc7a59..3d158ff925e 100644
--- a/core/src/main/java/org/elasticsearch/index/store/Store.java
+++ b/core/src/main/java/org/elasticsearch/index/store/Store.java
@@ -1544,4 +1544,5 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
             return estimatedSize;
         }
     }
+
 }
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
index 65b58868964..854546f656d 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
@@ -33,19 +33,15 @@ import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.StopWatch;
 import org.elasticsearch.common.bytes.BytesArray;
-import org.elasticsearch.common.compress.CompressorFactory;
+import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.common.logging.ESLogger;
+import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.CancellableThreads;
 import org.elasticsearch.common.util.CancellableThreads.Interruptable;
-import org.elasticsearch.common.util.iterable.Iterables;
-import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.RecoveryEngineException;
-import org.elasticsearch.index.shard.IllegalIndexShardStateException;
-import org.elasticsearch.index.shard.IndexShard;
-import org.elasticsearch.index.shard.IndexShardClosedException;
-import org.elasticsearch.index.shard.IndexShardState;
+import org.elasticsearch.index.shard.*;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.store.StoreFileMetaData;
 import org.elasticsearch.index.translog.Translog;
@@ -55,14 +51,15 @@ import org.elasticsearch.transport.TransportRequestOptions;
 import org.elasticsearch.transport.TransportService;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
 import java.util.stream.StreamSupport;
 
 /**
@@ -83,6 +80,8 @@ public class RecoverySourceHandler {
 
     private final TransportService transportService;
     protected final RecoveryResponse response;
+    private final TransportRequestOptions requestOptions;
+
     private final CancellableThreads cancellableThreads = new CancellableThreads() {
         @Override
         protected void onCancel(String reason, @Nullable Throwable suppressedException) {
@@ -99,7 +98,6 @@ public class RecoverySourceHandler {
         }
     };
 
-
     public RecoverySourceHandler(final IndexShard shard, final StartRecoveryRequest request, final RecoverySettings recoverySettings,
                                  final TransportService transportService, final ESLogger logger) {
         this.shard = shard;
@@ -111,6 +109,11 @@ public class RecoverySourceHandler {
         this.shardId = this.request.shardId().id();
         this.response = new RecoveryResponse();
+        this.requestOptions = TransportRequestOptions.options()
+                .withCompress(recoverySettings.compress())
+                .withType(TransportRequestOptions.Type.RECOVERY)
+                .withTimeout(recoverySettings.internalActionTimeout());
+
     }
 
     /**
@@ -216,7 +219,10 @@ public class RecoverySourceHandler {
                     }
                     totalSize += md.length();
                 }
-                for (StoreFileMetaData md : Iterables.concat(diff.different, diff.missing)) {
+                List<StoreFileMetaData> phase1Files = new ArrayList<>(diff.different.size() + diff.missing.size());
+                phase1Files.addAll(diff.different);
+                phase1Files.addAll(diff.missing);
+                for (StoreFileMetaData md : phase1Files) {
                     if (request.metadataSnapshot().asMap().containsKey(md.name())) {
                         logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], exists in local store, but is different: remote [{}], local [{}]", indexName, shardId, request.targetNode(), md.name(), request.metadataSnapshot().asMap().get(md.name()), md);
@@ -235,215 +241,69 @@ public class RecoverySourceHandler {
                 logger.trace("[{}][{}] recovery [phase1] to {}: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]", indexName, shardId, request.targetNode(), response.phase1FileNames.size(), new ByteSizeValue(totalSize), response.phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize));
-                cancellableThreads.execute(new Interruptable() {
-                    @Override
-                    public void run() throws InterruptedException {
-                        RecoveryFilesInfoRequest recoveryInfoFilesRequest = new RecoveryFilesInfoRequest(request.recoveryId(), request.shardId(),
-                                response.phase1FileNames, response.phase1FileSizes, response.phase1ExistingFileNames, response.phase1ExistingFileSizes,
-                                translogView.totalOperations());
-                        transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILES_INFO, recoveryInfoFilesRequest,
-                                TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()),
-                                EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
-                    }
+                cancellableThreads.execute(() -> {
+                    RecoveryFilesInfoRequest recoveryInfoFilesRequest = new RecoveryFilesInfoRequest(request.recoveryId(), request.shardId(),
+                            response.phase1FileNames, response.phase1FileSizes, response.phase1ExistingFileNames, response.phase1ExistingFileSizes,
+                            translogView.totalOperations());
+                    transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILES_INFO, recoveryInfoFilesRequest,
+                            TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()),
+                            EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
                 });
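+                // the target now knows which files will be transferred and which will be reused, so file data can start streaming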
-
-                // This latch will be used to wait until all files have been transferred to the target node
-                final CountDownLatch latch = new CountDownLatch(response.phase1FileNames.size());
-                final CopyOnWriteArrayList<Throwable> exceptions = new CopyOnWriteArrayList<>();
-                final AtomicReference<Throwable> corruptedEngine = new AtomicReference<>();
-                int fileIndex = 0;
-                ThreadPoolExecutor pool;
-
                 // How many bytes we've copied since we last called RateLimiter.pause
                 final AtomicLong bytesSinceLastPause = new AtomicLong();
-
-                for (final String name : response.phase1FileNames) {
-                    long fileSize = response.phase1FileSizes.get(fileIndex);
-
-                    // Files are split into two categories, files that are "small"
-                    // (under 5mb) and other files. Small files are transferred
-                    // using a separate thread pool dedicated to small files.
+                final Function<StoreFileMetaData, OutputStream> outputStreamFactories = (md) -> new RecoveryOutputStream(md, bytesSinceLastPause, translogView);
+                sendFiles(store, phase1Files.toArray(new StoreFileMetaData[phase1Files.size()]), outputStreamFactories);
+                cancellableThreads.execute(() -> {
+                    // Send the CLEAN_FILES request, which takes all of the files that
+                    // were transferred and renames them from their temporary file
+                    // names to the actual file names. It also writes checksums for
+                    // the files after they have been renamed.
                     //
-                    // The idea behind this is that while we are transferring an
-                    // older, large index, a user may create a new index, but that
-                    // index will not be able to recover until the large index
-                    // finishes, by using two different thread pools we can allow
-                    // tiny files (like segments for a brand new index) to be
-                    // recovered while ongoing large segment recoveries are
-                    // happening. It also allows these pools to be configured
-                    // separately.
-                    if (fileSize > RecoverySettings.SMALL_FILE_CUTOFF_BYTES) {
-                        pool = recoverySettings.concurrentStreamPool();
-                    } else {
-                        pool = recoverySettings.concurrentSmallFileStreamPool();
-                    }
-
-                    pool.execute(new AbstractRunnable() {
-                        @Override
-                        public void onFailure(Throwable t) {
-                            // we either got rejected or the store can't be incremented / we are canceled
-                            logger.debug("Failed to transfer file [" + name + "] on recovery");
-                        }
-
-                        @Override
-                        public void onAfter() {
-                            // Signify this file has completed by decrementing the latch
-                            latch.countDown();
-                        }
-
-                        @Override
-                        protected void doRun() {
-                            cancellableThreads.checkForCancel();
-                            store.incRef();
-                            final StoreFileMetaData md = recoverySourceMetadata.get(name);
-                            try (final IndexInput indexInput = store.directory().openInput(name, IOContext.READONCE)) {
-                                final int BUFFER_SIZE = (int) Math.max(1, recoverySettings.fileChunkSize().bytes()); // at least one!
-                                final byte[] buf = new byte[BUFFER_SIZE];
-                                boolean shouldCompressRequest = recoverySettings.compress();
-                                if (CompressorFactory.isCompressed(indexInput)) {
-                                    shouldCompressRequest = false;
-                                }
-
-                                final long len = indexInput.length();
-                                long readCount = 0;
-                                final TransportRequestOptions requestOptions = TransportRequestOptions.options()
-                                        .withCompress(shouldCompressRequest)
-                                        .withType(TransportRequestOptions.Type.RECOVERY)
-                                        .withTimeout(recoverySettings.internalActionTimeout());
-
-                                while (readCount < len) {
-                                    if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us
-                                        throw new IndexShardClosedException(shard.shardId());
+                    // Once the files have been renamed, any other files that are not
+                    // related to this recovery (out of date segments, for example)
+                    // are deleted
+                    try {
+                        transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES,
+                                new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), recoverySourceMetadata, translogView.totalOperations()),
+                                TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()),
+                                EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
+                    } catch (RemoteTransportException remoteException) {
+                        final IOException corruptIndexException;
+                        // we realized that after the index was copied and we wanted to finalize the recovery
+                        // the index was corrupted:
+                        //   - maybe due to a broken segments file on an empty index (transferred with no checksum)
+                        //   - maybe due to old segments without checksums or length only checks
+                        if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(remoteException)) != null) {
+                            try {
+                                final Store.MetadataSnapshot recoverySourceMetadata1 = store.getMetadata(snapshot);
+                                StoreFileMetaData[] metadata =
+                                        StreamSupport.stream(recoverySourceMetadata1.spliterator(), false).toArray(size -> new StoreFileMetaData[size]);
+                                ArrayUtil.timSort(metadata, new Comparator<StoreFileMetaData>() {
+                                    @Override
+                                    public int compare(StoreFileMetaData o1, StoreFileMetaData o2) {
+                                        return Long.compare(o1.length(), o2.length()); // check small files first
                                     }
-                                    int toRead = readCount + BUFFER_SIZE > len ? (int) (len - readCount) : BUFFER_SIZE;
-                                    final long position = indexInput.getFilePointer();
-
-                                    // Pause using the rate limiter, if desired, to throttle the recovery
-                                    RateLimiter rl = recoverySettings.rateLimiter();
-                                    long throttleTimeInNanos = 0;
-                                    if (rl != null) {
-                                        long bytes = bytesSinceLastPause.addAndGet(toRead);
-                                        if (bytes > rl.getMinPauseCheckBytes()) {
-                                            // Time to pause
-                                            bytesSinceLastPause.addAndGet(-bytes);
-                                            throttleTimeInNanos = rl.pause(bytes);
-                                            shard.recoveryStats().addThrottleTime(throttleTimeInNanos);
-                                        }
-                                    }
-                                    indexInput.readBytes(buf, 0, toRead, false);
-                                    final BytesArray content = new BytesArray(buf, 0, toRead);
-                                    readCount += toRead;
-                                    final boolean lastChunk = readCount == len;
-                                    final RecoveryFileChunkRequest fileChunkRequest = new RecoveryFileChunkRequest(request.recoveryId(), request.shardId(), md, position,
-                                            content, lastChunk, translogView.totalOperations(), throttleTimeInNanos);
-                                    cancellableThreads.execute(new Interruptable() {
-                                        @Override
-                                        public void run() throws InterruptedException {
-                                            // Actually send the file chunk to the target node, waiting for it to complete
-                                            transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK,
-                                                    fileChunkRequest, requestOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
-                                        }
-                                    });
-
-                                }
-                            } catch (Throwable e) {
-                                final Throwable corruptIndexException;
-                                if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) {
+                                });
+                                for (StoreFileMetaData md : metadata) {
+                                    logger.debug("{} checking integrity for file {} after remove corruption exception", shard.shardId(), md);
                                     if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
+                                        shard.engine().failEngine("recovery", corruptIndexException);
                                         logger.warn("{} Corrupted file detected {} checksum mismatch", shard.shardId(), md);
-                                        if (corruptedEngine.compareAndSet(null, corruptIndexException) == false) {
-                                            // if we are not the first exception, add ourselves as suppressed to the main one:
-                                            corruptedEngine.get().addSuppressed(e);
-                                        }
-                                    } else { // corruption has happened on the way to replica
-                                        RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but checksums are ok", null);
-                                        exception.addSuppressed(e);
-                                        exceptions.add(0, exception); // last exception first
-                                        logger.warn("{} Remote file corruption on node {}, recovering {}. local checksum OK",
-                                                corruptIndexException, shard.shardId(), request.targetNode(), md);
-
+                                        throw corruptIndexException;
                                     }
-                                } else {
-                                    exceptions.add(0, e); // last exceptions first
                                 }
-                            } finally {
-                                store.decRef();
-
-                            }
-                        }
-                    });
-                    fileIndex++;
-                }
-
-                cancellableThreads.execute(new Interruptable() {
-                    @Override
-                    public void run() throws InterruptedException {
-                        // Wait for all files that need to be transferred to finish transferring
-                        latch.await();
-                    }
-                });
-
-                if (corruptedEngine.get() != null) {
-                    shard.engine().failEngine("recovery", corruptedEngine.get());
-                    throw corruptedEngine.get();
-                } else {
-                    ExceptionsHelper.rethrowAndSuppress(exceptions);
-                }
-
-                cancellableThreads.execute(new Interruptable() {
-                    @Override
-                    public void run() throws InterruptedException {
-                        // Send the CLEAN_FILES request, which takes all of the files that
-                        // were transferred and renames them from their temporary file
-                        // names to the actual file names. It also writes checksums for
-                        //
-                        // Once the files have been renamed, any other files that are not
-                        // related to this recovery (out of date segments, for example)
-                        // are deleted
-                        try {
-                            transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES,
-                                    new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), recoverySourceMetadata, translogView.totalOperations()),
-                                    TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()),
-                                    EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
-                        } catch (RemoteTransportException remoteException) {
-                            final IOException corruptIndexException;
-                            // we realized that after the index was copied and we wanted to finalize the recovery
-                            // the index was corrupted:
-                            //   - maybe due to a broken segments file on an empty index (transferred with no checksum)
-                            //   - maybe due to old segments without checksums or length only checks
-                            if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(remoteException)) != null) {
-                                try {
-                                    final Store.MetadataSnapshot recoverySourceMetadata = store.getMetadata(snapshot);
-                                    StoreFileMetaData[] metadata =
-                                            StreamSupport.stream(recoverySourceMetadata.spliterator(), false).toArray(size -> new StoreFileMetaData[size]);
-                                    ArrayUtil.timSort(metadata, new Comparator<StoreFileMetaData>() {
-                                        @Override
-                                        public int compare(StoreFileMetaData o1, StoreFileMetaData o2) {
-                                            return Long.compare(o1.length(), o2.length()); // check small files first
-                                        }
-                                    });
-                                    for (StoreFileMetaData md : metadata) {
-                                        logger.debug("{} checking integrity for file {} after remove corruption exception", shard.shardId(), md);
-                                        if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
-                                            shard.engine().failEngine("recovery", corruptIndexException);
-                                            logger.warn("{} Corrupted file detected {} checksum mismatch", shard.shardId(), md);
-                                            throw corruptIndexException;
-                                        }
-                                    }
-                                } catch (IOException ex) {
-                                    remoteException.addSuppressed(ex);
-                                    throw remoteException;
-                                }
-                                // corruption has happened on the way to replica
-                                RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but checksums are ok", null);
-                                exception.addSuppressed(remoteException);
-                                logger.warn("{} Remote file corruption during finalization on node {}, recovering {}. local checksum OK",
-                                        corruptIndexException, shard.shardId(), request.targetNode());
-                                throw exception;
-                            } else {
+                            } catch (IOException ex) {
+                                remoteException.addSuppressed(ex);
+                                throw remoteException;
+                            }
+                            // corruption has happened on the way to replica
+                            RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but checksums are ok", null);
+                            exception.addSuppressed(remoteException);
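+                            // every local file verified clean, so the corruption must have happened on the wire or on the target node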
+                            logger.warn("{} Remote file corruption during finalization on node {}, recovering {}. local checksum OK",
+                                    corruptIndexException, shard.shardId(), request.targetNode());
+                            throw exception;
+                        } else {
+                            throw remoteException;
                         }
                     }
                 });
@@ -460,6 +320,8 @@ public class RecoverySourceHandler {
         }
     }
 
+
+
     protected void prepareTargetForTranslog(final Translog.View translogView) {
         StopWatch stopWatch = new StopWatch().start();
         logger.trace("{} recovery [phase1] to {}: prepare remote engine for translog", request.shardId(), request.targetNode());
@@ -603,14 +465,11 @@ public class RecoverySourceHandler {
 //                    recoverySettings.rateLimiter().pause(size);
 //                }
 
-            cancellableThreads.execute(new Interruptable() {
-                @Override
-                public void run() throws InterruptedException {
-                    final RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(
-                            request.recoveryId(), request.shardId(), operations, snapshot.estimatedTotalOperations());
-                    transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest,
-                            recoveryOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
-                }
+            cancellableThreads.execute(() -> {
+                final RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(
+                        request.recoveryId(), request.shardId(), operations, snapshot.estimatedTotalOperations());
+                transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest,
+                        recoveryOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
             });
             if (logger.isTraceEnabled()) {
                 logger.trace("[{}][{}] sent batch of [{}][{}] (total: [{}]) translog operations to {}",
@@ -631,14 +490,11 @@ public class RecoverySourceHandler {
         }
         // send the leftover
         if (!operations.isEmpty()) {
-            cancellableThreads.execute(new Interruptable() {
-                @Override
-                public void run() throws InterruptedException {
-                    RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(
-                            request.recoveryId(), request.shardId(), operations, snapshot.estimatedTotalOperations());
-                    transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest,
-                            recoveryOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
-                }
+            cancellableThreads.execute(() -> {
+                RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(
+                        request.recoveryId(), request.shardId(), operations, snapshot.estimatedTotalOperations());
+                transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest,
+                        recoveryOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
             });
         }
 
@@ -667,4 +523,165 @@ public class RecoverySourceHandler {
                 '}';
     }
 
+
+    final class RecoveryOutputStream extends OutputStream {
+        private final StoreFileMetaData md;
+        private final AtomicLong bytesSinceLastPause;
+        private final Translog.View translogView;
+        private long position = 0;
+
+        RecoveryOutputStream(StoreFileMetaData md, AtomicLong bytesSinceLastPause, Translog.View translogView) {
+            this.md = md;
+            this.bytesSinceLastPause = bytesSinceLastPause;
+            this.translogView = translogView;
+        }
+
+        @Override
+        public final void write(int b) throws IOException {
+            write(new byte[]{(byte) b}, 0, 1);
+        }
+
+        @Override
+        public final void write(byte[] b, int offset, int length) throws IOException {
+            sendNextChunk(position, new BytesArray(b, offset, length), md.length() == position + length);
+            position += length;
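+            // position counts how many bytes of this file have been sent; it must never run past the length recorded in the metadata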
+            assert md.length() >= position : "length: " + md.length() + " but position was: " + position;
+        }
+
+        private void sendNextChunk(long position, BytesArray content, boolean lastChunk) throws IOException {
+            cancellableThreads.execute(() -> {
+                // Pause using the rate limiter, if desired, to throttle the recovery
+                final long throttleTimeInNanos;
+                final RateLimiter rl = recoverySettings.rateLimiter();
+                if (rl != null) {
+                    long bytes = bytesSinceLastPause.addAndGet(content.length());
+                    if (bytes > rl.getMinPauseCheckBytes()) {
+                        // Time to pause
+                        bytesSinceLastPause.addAndGet(-bytes);
+                        try {
+                            throttleTimeInNanos = rl.pause(bytes);
+                            shard.recoveryStats().addThrottleTime(throttleTimeInNanos);
+                        } catch (IOException e) {
+                            throw new ElasticsearchException("failed to pause recovery", e);
+                        }
+                    } else {
+                        throttleTimeInNanos = 0;
+                    }
+                } else {
+                    throttleTimeInNanos = 0;
+                }
+                // Actually send the file chunk to the target node, waiting for it to complete
+                transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK,
+                        new RecoveryFileChunkRequest(request.recoveryId(), request.shardId(), md, position, content, lastChunk,
+                                translogView.totalOperations(),
+                                /* we send totalOperations with every request since we collect stats on the target and that way we can
+                                 * see how many translog ops we accumulate while copying files across the network. A future optimization
+                                 * would be to restart the file copy again (new deltas) if too many translog ops are piling up.
+                                 */
+                                throttleTimeInNanos), requestOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
+            });
+            if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us
+                throw new IndexShardClosedException(request.shardId());
+            }
+        }
+    }
+
+    void sendFiles(Store store, StoreFileMetaData[] files, Function<StoreFileMetaData, OutputStream> outputStreamFactory) throws Throwable {
+        store.incRef();
+        try {
+            Future<Void>[] runners = asyncSendFiles(store, files, outputStreamFactory);
+            IOException corruptedEngine = null;
+            final List<Throwable> exceptions = new ArrayList<>();
+            for (int i = 0; i < runners.length; i++) {
+                StoreFileMetaData md = files[i];
+                try {
+                    runners[i].get();
+                } catch (ExecutionException t) {
+                    corruptedEngine = handleExecutionException(store, corruptedEngine, exceptions, md, t.getCause());
+                } catch (InterruptedException t) {
+                    corruptedEngine = handleExecutionException(store, corruptedEngine, exceptions, md, t);
+                }
+            }
+            if (corruptedEngine != null) {
+                failEngine(corruptedEngine);
+                throw corruptedEngine;
+            } else {
+                ExceptionsHelper.rethrowAndSuppress(exceptions);
+            }
+        } finally {
+            store.decRef();
+        }
+    }
+
+    private IOException handleExecutionException(Store store, IOException corruptedEngine, List<Throwable> exceptions, StoreFileMetaData md, Throwable t) {
+        logger.debug("Failed to transfer file [" + md + "] on recovery");
+        final IOException corruptIndexException;
+        final boolean checkIntegrity = corruptedEngine == null;
+        if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(t)) != null) {
+            if (checkIntegrity && store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
+                logger.warn("{} Corrupted file detected {} checksum mismatch", shardId, md);
+                corruptedEngine = corruptIndexException;
+            } else { // corruption has happened on the way to replica
+                RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but checksums are ok", null);
+                exception.addSuppressed(t);
+                if (checkIntegrity) {
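+                    // the primary's copy of this file verified clean, so the corruption happened in transit or on the target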
+                    logger.warn("{} Remote file corruption on node {}, recovering {}. local checksum OK",
+                            corruptIndexException, shardId, request.targetNode(), md);
+                } else {
+                    logger.warn("{} Remote file corruption on node {}, recovering {}. local checksum are skipped",
+                            corruptIndexException, shardId, request.targetNode(), md);
+                }
+                exceptions.add(exception);
+
+            }
+        } else {
+            exceptions.add(t);
+        }
+        return corruptedEngine;
+    }
+
+    protected void failEngine(IOException cause) {
+        shard.engine().failEngine("recovery", cause);
+    }
+
+    Future<Void>[] asyncSendFiles(Store store, StoreFileMetaData[] files, Function<StoreFileMetaData, OutputStream> outputStreamFactory) {
+        store.incRef();
+        try {
+            final Future<Void>[] futures = new Future[files.length];
+            for (int i = 0; i < files.length; i++) {
+                final StoreFileMetaData md = files[i];
+                long fileSize = md.length();
+
+                // Files are split into two categories, files that are "small"
+                // (under 5mb) and other files. Small files are transferred
+                // using a separate thread pool dedicated to small files.
+                //
+                // The idea behind this is that while we are transferring an
+                // older, large index, a user may create a new index, but that
+                // index will not be able to recover until the large index
+                // finishes, by using two different thread pools we can allow
+                // tiny files (like segments for a brand new index) to be
+                // recovered while ongoing large segment recoveries are
+                // happening. It also allows these pools to be configured
+                // separately.
+                ThreadPoolExecutor pool;
+                if (fileSize > RecoverySettings.SMALL_FILE_CUTOFF_BYTES) {
+                    pool = recoverySettings.concurrentStreamPool();
+                } else {
+                    pool = recoverySettings.concurrentSmallFileStreamPool();
+                }
+                Future<Void> future = pool.submit(() -> {
+                    try (final OutputStream outputStream = outputStreamFactory.apply(md);
+                         final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE)) {
+                        Streams.copy(new InputStreamIndexInput(indexInput, md.length()), outputStream);
+                    }
+                    return null;
+                });
+                futures[i] = future;
+            }
+            return futures;
+        } finally {
+            store.decRef();
+        }
+    }
 }
diff --git a/core/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java b/core/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java
index 77b7c37a9ca..f32b6b0b994 100644
--- a/core/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java
+++ b/core/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java
@@ -20,14 +20,9 @@ package org.elasticsearch.index.store;
 
 import com.carrotsearch.randomizedtesting.generators.RandomPicks;
 import java.nio.charset.StandardCharsets;
-import org.apache.lucene.codecs.CodecUtil;
+
 import org.apache.lucene.index.CheckIndex;
 import org.apache.lucene.index.IndexFileNames;
-import org.apache.lucene.store.ChecksumIndexInput;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.FSDirectory;
-import org.apache.lucene.store.IOContext;
-import org.apache.lucene.store.IndexInput;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
 import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
@@ -71,6 +66,7 @@ import org.elasticsearch.monitor.fs.FsInfo;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.snapshots.SnapshotState;
 import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.CorruptionUtils;
 import org.elasticsearch.test.InternalTestCluster;
 import org.elasticsearch.test.store.MockFSDirectoryService;
 import org.elasticsearch.test.transport.MockTransportService;
@@ -83,12 +79,9 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
 import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -649,54 +642,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
                 }
             }
         }
         pruneOldDeleteGenerations(files);
-        Path fileToCorrupt = null;
-        if (!files.isEmpty()) {
-            fileToCorrupt = RandomPicks.randomFrom(getRandom(), files);
-            try (Directory dir = FSDirectory.open(fileToCorrupt.toAbsolutePath().getParent())) {
-                long checksumBeforeCorruption;
-                try (IndexInput input = dir.openInput(fileToCorrupt.getFileName().toString(), IOContext.DEFAULT)) {
-                    checksumBeforeCorruption = CodecUtil.retrieveChecksum(input);
-                }
-                try (FileChannel raf = FileChannel.open(fileToCorrupt, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
-                    // read
-                    raf.position(randomIntBetween(0, (int) Math.min(Integer.MAX_VALUE, raf.size() - 1)));
-                    long filePointer = raf.position();
-                    ByteBuffer bb = ByteBuffer.wrap(new byte[1]);
-                    raf.read(bb);
-                    bb.flip();
-
-                    // corrupt
-                    byte oldValue = bb.get(0);
-                    byte newValue = (byte) (oldValue + 1);
-                    bb.put(0, newValue);
-
-                    // rewrite
-                    raf.position(filePointer);
-                    raf.write(bb);
-                    logger.info("Corrupting file for shard {} -- flipping at position {} from {} to {} file: {}", shardRouting, filePointer, Integer.toHexString(oldValue), Integer.toHexString(newValue), fileToCorrupt.getFileName());
-                }
-                long checksumAfterCorruption;
-                long actualChecksumAfterCorruption;
-                try (ChecksumIndexInput input = dir.openChecksumInput(fileToCorrupt.getFileName().toString(), IOContext.DEFAULT)) {
-                    assertThat(input.getFilePointer(), is(0l));
-                    input.seek(input.length() - 8); // one long is the checksum... 8 bytes
-                    checksumAfterCorruption = input.getChecksum();
-                    actualChecksumAfterCorruption = input.readLong();
-                }
-                // we need to add assumptions here that the checksums actually really don't match there is a small chance to get collisions
-                // in the checksum which is ok though....
-                StringBuilder msg = new StringBuilder();
-                msg.append("Checksum before: [").append(checksumBeforeCorruption).append("]");
-                msg.append(" after: [").append(checksumAfterCorruption).append("]");
-                msg.append(" checksum value after corruption: ").append(actualChecksumAfterCorruption).append("]");
-                msg.append(" file: ").append(fileToCorrupt.getFileName()).append(" length: ").append(dir.fileLength(fileToCorrupt.getFileName().toString()));
-                logger.info(msg.toString());
-                assumeTrue("Checksum collision - " + msg.toString(),
-                        checksumAfterCorruption != checksumBeforeCorruption // collision
-                                || actualChecksumAfterCorruption != checksumBeforeCorruption); // checksum corrupted
-            }
-        }
-        assertThat("no file corrupted", fileToCorrupt, notNullValue());
+        CorruptionUtils.corruptFile(getRandom(), files.toArray(new Path[0]));
         return shardRouting;
     }
 
diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
new file mode 100644
index 00000000000..76867ec089a
--- /dev/null
+++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.indices.recovery;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.document.TextField;
+import org.apache.lucene.index.*;
+import org.apache.lucene.store.*;
+import org.apache.lucene.util.IOUtils;
+import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.io.FileSystemUtils;
+import org.elasticsearch.common.lucene.store.IndexOutputOutputStream;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.DummyTransportAddress;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.store.DirectoryService;
+import org.elasticsearch.index.store.Store;
+import org.elasticsearch.index.store.StoreFileMetaData;
+import org.elasticsearch.node.settings.NodeSettingsService;
+import org.elasticsearch.test.DummyShardLock;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.CorruptionUtils;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.hamcrest.Matchers.is;
+
+public class RecoverySourceHandlerTests extends ESTestCase {
+
+    private final ShardId shardId = new ShardId(new Index("index"), 1);
+    private final NodeSettingsService service = new NodeSettingsService(Settings.EMPTY);
+
+    public void testSendFiles() throws Throwable {
+        final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service);
+        StartRecoveryRequest request = new StartRecoveryRequest(shardId,
+                new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT),
+                new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT),
+                randomBoolean(), null, RecoveryState.Type.STORE, randomLong());
+        Store store = newStore(createTempDir());
+        RecoverySourceHandler handler = new RecoverySourceHandler(null, request, recoverySettings, null, logger);
+        Directory dir = store.directory();
+        RandomIndexWriter writer = new RandomIndexWriter(random(), dir, newIndexWriterConfig());
+        int numDocs = randomIntBetween(10, 100);
+        for (int i = 0; i < numDocs; i++) {
+            Document document = new Document();
+            document.add(new StringField("id", Integer.toString(i), Field.Store.YES));
+            document.add(newField("field", randomUnicodeOfCodepointLengthBetween(1, 10), TextField.TYPE_STORED));
+            writer.addDocument(document);
+        }
+        writer.commit();
+        Store.MetadataSnapshot metadata = store.getMetadata();
+        List<StoreFileMetaData> metas = new ArrayList<>();
+        for (StoreFileMetaData md : metadata) {
+            metas.add(md);
+        }
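+        // stream every file into a fresh target store and check the recovered copy is byte-for-byte identical to the source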
+        Store targetStore = newStore(createTempDir());
+        handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), (md) -> {
+            try {
+                return new IndexOutputOutputStream(targetStore.createVerifyingOutput(md.name(), md, IOContext.DEFAULT));
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        Store.MetadataSnapshot targetStoreMetadata = targetStore.getMetadata();
+        Store.RecoveryDiff recoveryDiff = targetStoreMetadata.recoveryDiff(metadata);
+        assertEquals(metas.size(), recoveryDiff.identical.size());
+        assertEquals(0, recoveryDiff.different.size());
+        assertEquals(0, recoveryDiff.missing.size());
+        IndexReader reader = DirectoryReader.open(targetStore.directory());
+        assertEquals(numDocs, reader.maxDoc());
+        IOUtils.close(reader, writer, store, targetStore, recoverySettings);
+    }
+
+    public void testHandleCorruptedIndexOnSendSendFiles() throws Throwable {
+        final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service);
+        StartRecoveryRequest request = new StartRecoveryRequest(shardId,
+                new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT),
+                new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT),
+                randomBoolean(), null, RecoveryState.Type.STORE, randomLong());
+        Path tempDir = createTempDir();
+        Store store = newStore(tempDir);
+        AtomicBoolean failedEngine = new AtomicBoolean(false);
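+        // the overridden failEngine below records engine failure so the test can assert that a corrupted primary fails the engine exactly once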
+        RecoverySourceHandler handler = new RecoverySourceHandler(null, request, recoverySettings, null, logger) {
+            @Override
+            protected void failEngine(IOException cause) {
+                assertFalse(failedEngine.get());
+                failedEngine.set(true);
+            }
+        };
+        Directory dir = store.directory();
+        RandomIndexWriter writer = new RandomIndexWriter(random(), dir, newIndexWriterConfig());
+        int numDocs = randomIntBetween(10, 100);
+        for (int i = 0; i < numDocs; i++) {
+            Document document = new Document();
+            document.add(new StringField("id", Integer.toString(i), Field.Store.YES));
+            document.add(newField("field", randomUnicodeOfCodepointLengthBetween(1, 10), TextField.TYPE_STORED));
+            writer.addDocument(document);
+        }
+        writer.commit();
+        writer.close();
+
+        Store.MetadataSnapshot metadata = store.getMetadata();
+        List<StoreFileMetaData> metas = new ArrayList<>();
+        for (StoreFileMetaData md : metadata) {
+            metas.add(md);
+        }
+        CorruptionUtils.corruptFile(getRandom(), FileSystemUtils.files(tempDir, (p) ->
+                (p.getFileName().toString().equals("write.lock") ||
+                        Files.isDirectory(p)) == false));
+        Store targetStore = newStore(createTempDir());
+        try {
+            handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), (md) -> {
+                try {
+                    return new IndexOutputOutputStream(targetStore.createVerifyingOutput(md.name(), md, IOContext.DEFAULT));
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+            fail("corrupted index");
+        } catch (IOException ex) {
+            assertNotNull(ExceptionsHelper.unwrapCorruption(ex));
+        }
+        assertTrue(failedEngine.get());
+        IOUtils.close(store, targetStore, recoverySettings);
+    }
+
+    public void testHandleExceptinoOnSendSendFiles() throws Throwable {
+        final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service);
+        StartRecoveryRequest request = new StartRecoveryRequest(shardId,
+                new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT),
+                new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT),
+                randomBoolean(), null, RecoveryState.Type.STORE, randomLong());
+        Path tempDir = createTempDir();
+        Store store = newStore(tempDir);
+        AtomicBoolean failedEngine = new AtomicBoolean(false);
+        RecoverySourceHandler handler = new RecoverySourceHandler(null, request, recoverySettings, null, logger) {
+            @Override
+            protected void failEngine(IOException cause) {
+                assertFalse(failedEngine.get());
+                failedEngine.set(true);
+            }
+        };
+        Directory dir = store.directory();
+        RandomIndexWriter writer = new RandomIndexWriter(random(), dir, newIndexWriterConfig());
+        int numDocs = randomIntBetween(10, 100);
+        for (int i = 0; i < numDocs; i++) {
+            Document document = new Document();
+            document.add(new StringField("id", Integer.toString(i), Field.Store.YES));
+            document.add(newField("field", randomUnicodeOfCodepointLengthBetween(1, 10), TextField.TYPE_STORED));
+            writer.addDocument(document);
+        }
+        writer.commit();
+        writer.close();
+
+        Store.MetadataSnapshot metadata = store.getMetadata();
+        List<StoreFileMetaData> metas = new ArrayList<>();
+        for (StoreFileMetaData md : metadata) {
+            metas.add(md);
+        }
+        final boolean throwCorruptedIndexException = randomBoolean();
+        Store targetStore = newStore(createTempDir());
+        try {
+            handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), (md) -> {
+                if (throwCorruptedIndexException) {
+                    throw new RuntimeException(new CorruptIndexException("foo", "bar"));
+                } else {
+                    throw new RuntimeException("boom");
+                }
+            });
+            fail("exception index");
+        } catch (RuntimeException ex) {
+            assertNull(ExceptionsHelper.unwrapCorruption(ex));
+            if (throwCorruptedIndexException) {
+                assertEquals(ex.getMessage(), "[File corruption occurred on recovery but checksums are ok]");
+            } else {
+                assertEquals(ex.getMessage(), "boom");
+            }
+        } catch (CorruptIndexException ex) {
+            fail("not expected here");
+        }
+        assertFalse(failedEngine.get());
+        IOUtils.close(store, targetStore, recoverySettings);
+    }
+
+    private Store newStore(Path path) throws IOException {
+        DirectoryService directoryService = new DirectoryService(shardId, Settings.EMPTY) {
+            @Override
+            public long throttleTimeInNanos() {
+                return 0;
+            }
+
+            @Override
+            public Directory newDirectory() throws IOException {
+                return RecoverySourceHandlerTests.newFSDirectory(path);
+            }
+        };
+        return new Store(shardId, Settings.EMPTY, directoryService, new DummyShardLock(shardId));
+    }
+
+
+}
diff --git a/core/src/test/java/org/elasticsearch/test/CorruptionUtils.java b/core/src/test/java/org/elasticsearch/test/CorruptionUtils.java
new file mode 100644
index 00000000000..bf9ccc957bc
--- /dev/null
+++ b/core/src/test/java/org/elasticsearch/test/CorruptionUtils.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.test;
+
+import com.carrotsearch.randomizedtesting.generators.RandomPicks;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.store.*;
+import org.elasticsearch.common.logging.ESLogger;
+import org.elasticsearch.common.logging.ESLoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.Random;
+
+import static org.apache.lucene.util.LuceneTestCase.assumeTrue;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+
+public final class CorruptionUtils {
+    private static ESLogger logger = ESLoggerFactory.getLogger("test");
+    private CorruptionUtils() {}
+
+    /**
+     * Corrupts a random file at a random position
+     */
+    public static void corruptFile(Random random, Path... files) throws IOException {
+        assertTrue("files must be non-empty", files.length > 0);
+        final Path fileToCorrupt = RandomPicks.randomFrom(random, files);
+        assertTrue(fileToCorrupt + " is not a file", Files.isRegularFile(fileToCorrupt));
+        try (Directory dir = FSDirectory.open(fileToCorrupt.toAbsolutePath().getParent())) {
+            long checksumBeforeCorruption;
+            try (IndexInput input = dir.openInput(fileToCorrupt.getFileName().toString(), IOContext.DEFAULT)) {
+                checksumBeforeCorruption = CodecUtil.retrieveChecksum(input);
+            }
+            try (FileChannel raf = FileChannel.open(fileToCorrupt, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
+                // read
+                raf.position(random.nextInt((int) Math.min(Integer.MAX_VALUE, raf.size())));
+                long filePointer = raf.position();
+                ByteBuffer bb = ByteBuffer.wrap(new byte[1]);
+                raf.read(bb);
+                bb.flip();
+
+                // corrupt
+                byte oldValue = bb.get(0);
+                byte newValue = (byte) (oldValue + 1);
+                bb.put(0, newValue);
+
+                // rewrite
+                raf.position(filePointer);
+                raf.write(bb);
+                logger.info("Corrupting file -- flipping at position {} from {} to {} file: {}", filePointer, Integer.toHexString(oldValue), Integer.toHexString(newValue), fileToCorrupt.getFileName());
+            }
+            long checksumAfterCorruption;
+            long actualChecksumAfterCorruption;
+            try (ChecksumIndexInput input = dir.openChecksumInput(fileToCorrupt.getFileName().toString(), IOContext.DEFAULT)) {
+                assertThat(input.getFilePointer(), is(0l));
+                input.seek(input.length() - 8); // one long is the checksum... 8 bytes
+                checksumAfterCorruption = input.getChecksum();
+                actualChecksumAfterCorruption = input.readLong();
+            }
+            // we need to add assumptions here that the checksums actually really don't match there is a small chance to get collisions
+            // in the checksum which is ok though....
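+            // summarize the before/after checksums so a checksum-collision assumption failure is easy to diagnose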
+            StringBuilder msg = new StringBuilder();
+            msg.append("Checksum before: [").append(checksumBeforeCorruption).append("]");
+            msg.append(" after: [").append(checksumAfterCorruption).append("]");
+            msg.append(" checksum value after corruption: ").append(actualChecksumAfterCorruption).append("]");
+            msg.append(" file: ").append(fileToCorrupt.getFileName()).append(" length: ").append(dir.fileLength(fileToCorrupt.getFileName().toString()));
+            logger.info(msg.toString());
+            assumeTrue("Checksum collision - " + msg.toString(),
+                    checksumAfterCorruption != checksumBeforeCorruption // collision
+                            || actualChecksumAfterCorruption != checksumBeforeCorruption); // checksum corrupted
+            assertThat("no file corrupted", fileToCorrupt, notNullValue());
+        }
+    }
+
+
+}