diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 563d3c91fd5..ad8dcf7085b 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -22,9 +22,13 @@ package org.elasticsearch.repositories.blobstore; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.IndexFormatTooNewException; +import org.apache.lucene.index.IndexFormatTooOldException; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.RateLimiter; import org.apache.lucene.util.SetOnce; import org.elasticsearch.ExceptionsHelper; @@ -36,6 +40,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Numbers; import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; @@ -1192,17 +1197,51 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp final BlobContainer container = shardContainer(indexId, snapshotShardId); BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(container, snapshotId); SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()); - new FileRestoreContext(metadata.name(), shardId, snapshotId, recoveryState, BUFFER_SIZE) { + new FileRestoreContext(metadata.name(), shardId, snapshotId, recoveryState) { @Override - protected InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo) { - final InputStream dataBlobCompositeStream = new SlicedInputStream(fileInfo.numberOfParts()) { - @Override - protected InputStream openSlice(long slice) throws IOException { - return container.readBlob(fileInfo.partName(slice)); + protected void restoreFiles(List filesToRecover, Store store) throws IOException { + // restore the files from the snapshot to the Lucene store + for (final BlobStoreIndexShardSnapshot.FileInfo fileToRecover : filesToRecover) { + logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name()); + restoreFile(fileToRecover, store); + } + } + + private void restoreFile(BlobStoreIndexShardSnapshot.FileInfo fileInfo, Store store) throws IOException { + boolean success = false; + + try (InputStream stream = maybeRateLimit(new SlicedInputStream(fileInfo.numberOfParts()) { + @Override + protected InputStream openSlice(long slice) throws IOException { + return container.readBlob(fileInfo.partName(slice)); + } + }, + restoreRateLimiter, restoreRateLimitingTimeInNanos)) { + try (IndexOutput indexOutput = + store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) { + final byte[] buffer = new byte[BUFFER_SIZE]; + int length; + while ((length = stream.read(buffer)) > 0) { + indexOutput.writeBytes(buffer, 0, length); + recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.physicalName(), length); + } + Store.verify(indexOutput); + indexOutput.close(); + store.directory().sync(Collections.singleton(fileInfo.physicalName())); + success = true; + } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) { + try { + store.markStoreCorrupted(ex); + } catch (IOException e) { + logger.warn("store cannot be marked as corrupted", e); + } + throw ex; + } finally { + if (success == false) { + store.deleteQuiet(fileInfo.physicalName()); + } } - }; - return restoreRateLimiter == null ? dataBlobCompositeStream - : new RateLimitingInputStream(dataBlobCompositeStream, restoreRateLimiter, restoreRateLimitingTimeInNanos::inc); + } } }.restore(snapshotFiles, store); } catch (Exception e) { @@ -1210,6 +1249,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp } } + private static InputStream maybeRateLimit(InputStream stream, @Nullable RateLimiter rateLimiter, CounterMetric metric) { + return rateLimiter == null ? stream : new RateLimitingInputStream(stream, rateLimiter, metric::inc); + } + @Override public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, IndexId indexId, ShardId shardId) { BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(shardContainer(indexId, shardId), snapshotId); @@ -1371,13 +1414,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp for (int i = 0; i < fileInfo.numberOfParts(); i++) { final long partBytes = fileInfo.partBytes(i); - InputStream inputStream = new InputStreamIndexInput(indexInput, partBytes); - if (snapshotRateLimiter != null) { - inputStream = new RateLimitingInputStream(inputStream, snapshotRateLimiter, - snapshotRateLimitingTimeInNanos::inc); - } // Make reads abortable by mutating the snapshotStatus object - inputStream = new FilterInputStream(inputStream) { + final InputStream inputStream = new FilterInputStream(maybeRateLimit( + new InputStreamIndexInput(indexInput, partBytes), snapshotRateLimiter, snapshotRateLimitingTimeInNanos)) { @Override public int read() throws IOException { checkAborted(); diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java index 7a848bf0f3b..914d87202c6 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java @@ -21,11 +21,6 @@ package org.elasticsearch.repositories.blobstore; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.lucene.index.CorruptIndexException; -import org.apache.lucene.index.IndexFormatTooNewException; -import org.apache.lucene.index.IndexFormatTooOldException; -import org.apache.lucene.store.IOContext; -import org.apache.lucene.store.IndexOutput; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.index.shard.ShardId; @@ -38,10 +33,8 @@ import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.snapshots.SnapshotId; import java.io.IOException; -import java.io.InputStream; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -63,7 +56,6 @@ public abstract class FileRestoreContext { protected final RecoveryState recoveryState; protected final SnapshotId snapshotId; protected final ShardId shardId; - protected final int bufferSize; /** * Constructs new restore context @@ -71,15 +63,12 @@ public abstract class FileRestoreContext { * @param shardId shard id to restore into * @param snapshotId snapshot id * @param recoveryState recovery state to report progress - * @param bufferSize buffer size for restore */ - protected FileRestoreContext(String repositoryName, ShardId shardId, SnapshotId snapshotId, RecoveryState recoveryState, - int bufferSize) { + protected FileRestoreContext(String repositoryName, ShardId shardId, SnapshotId snapshotId, RecoveryState recoveryState) { this.repositoryName = repositoryName; this.recoveryState = recoveryState; this.snapshotId = snapshotId; this.shardId = shardId; - this.bufferSize = bufferSize; } /** @@ -194,54 +183,16 @@ public abstract class FileRestoreContext { } } - protected void restoreFiles(List filesToRecover, Store store) throws IOException { - // restore the files from the snapshot to the Lucene store - for (final BlobStoreIndexShardSnapshot.FileInfo fileToRecover : filesToRecover) { - logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name()); - restoreFile(fileToRecover, store); - } - } - - protected abstract InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo); + /** + * Restores given list of {@link BlobStoreIndexShardSnapshot.FileInfo} to the given {@link Store}. + * + * @param filesToRecover List of files to restore + * @param store Store to restore into + */ + protected abstract void restoreFiles(List filesToRecover, Store store) throws IOException; @SuppressWarnings("unchecked") private static Iterable concat(Store.RecoveryDiff diff) { return Iterables.concat(diff.different, diff.missing); } - - /** - * Restores a file - * - * @param fileInfo file to be restored - */ - private void restoreFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo, final Store store) throws IOException { - boolean success = false; - - try (InputStream stream = fileInputStream(fileInfo)) { - try (IndexOutput indexOutput = store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) { - final byte[] buffer = new byte[bufferSize]; - int length; - while ((length = stream.read(buffer)) > 0) { - indexOutput.writeBytes(buffer, 0, length); - recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.physicalName(), length); - } - Store.verify(indexOutput); - indexOutput.close(); - store.directory().sync(Collections.singleton(fileInfo.physicalName())); - success = true; - } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) { - try { - store.markStoreCorrupted(ex); - } catch (IOException e) { - logger.warn("store cannot be marked as corrupted", e); - } - throw ex; - } finally { - if (success == false) { - store.deleteQuiet(fileInfo.physicalName()); - } - } - } - } - } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index a162e331177..1c474f25eca 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -80,7 +80,6 @@ import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionReque import java.io.Closeable; import java.io.IOException; -import java.io.InputStream; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -456,7 +455,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit RestoreSession(String repositoryName, Client remoteClient, String sessionUUID, DiscoveryNode node, ShardId shardId, RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData, long mappingVersion, ThreadPool threadPool, CcrSettings ccrSettings, LongConsumer throttleListener) { - super(repositoryName, shardId, SNAPSHOT_ID, recoveryState, Math.toIntExact(ccrSettings.getChunkSize().getBytes())); + super(repositoryName, shardId, SNAPSHOT_ID, recoveryState); this.remoteClient = remoteClient; this.sessionUUID = sessionUUID; this.node = node; @@ -571,11 +570,6 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit } } - @Override - protected InputStream fileInputStream(FileInfo fileInfo) { - throw new UnsupportedOperationException(); - } - @Override public void close() { ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(sessionUUID, node);