diff --git a/core/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java b/core/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java
index 991af6d8f2c..0dbbd12834e 100644
--- a/core/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java
+++ b/core/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java
@@ -26,6 +26,7 @@ import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.store.RateLimiter;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.BytesRefBuilder;
+import org.apache.lucene.util.IOUtils;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.ClusterService;
@@ -93,6 +94,8 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
 
     private RateLimitingInputStream.Listener snapshotThrottleListener;
 
+    private RateLimitingInputStream.Listener restoreThrottleListener;
+
     private boolean compress;
 
     private final ParseFieldMatcher parseFieldMatcher;
@@ -147,6 +150,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
         this.restoreRateLimiter = restoreRateLimiter;
         this.rateLimiterListener = rateLimiterListener;
         this.snapshotThrottleListener = nanos -> rateLimiterListener.onSnapshotPause(nanos);
+        this.restoreThrottleListener = nanos -> rateLimiterListener.onRestorePause(nanos);
         this.compress = compress;
         indexShardSnapshotFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT, BlobStoreIndexShardSnapshot.PROTO, parseFieldMatcher, isCompress());
         indexShardSnapshotLegacyFormat = new LegacyBlobStoreFormat<>(LEGACY_SNAPSHOT_NAME_FORMAT, BlobStoreIndexShardSnapshot.PROTO, parseFieldMatcher);
@@ -891,16 +895,20 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
          */
        private void restoreFile(final FileInfo fileInfo) throws IOException {
            boolean success = false;
-           try (InputStream stream = new PartSliceStream(blobContainer, fileInfo)) {
+
+           try (InputStream partSliceStream = new PartSliceStream(blobContainer, fileInfo)) {
+               final InputStream stream;
+               if (restoreRateLimiter == null) {
+                   stream = partSliceStream;
+               } else {
+                   stream = new RateLimitingInputStream(partSliceStream, restoreRateLimiter, restoreThrottleListener);
+               }
                try (final IndexOutput indexOutput = store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) {
                    final byte[] buffer = new byte[BUFFER_SIZE];
                    int length;
                    while ((length = stream.read(buffer)) > 0) {
                        indexOutput.writeBytes(buffer, 0, length);
                        recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.name(), length);
-                       if (restoreRateLimiter != null) {
-                           rateLimiterListener.onRestorePause(restoreRateLimiter.pause(length));
-                       }
                    }
                    Store.verify(indexOutput);
                    indexOutput.close();
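
For context, the change moves restore throttling out of the copy loop and into a stream decorator: the PartSliceStream is wrapped in a RateLimitingInputStream whose listener reports pause time via onRestorePause, mirroring how snapshotThrottleListener already works on the snapshot path. Below is a minimal sketch of that decorator pattern; the class and member names (ThrottledInputStream, Listener, onPause, maybePause) are illustrative assumptions, not the actual RateLimitingInputStream class in Elasticsearch.

```java
import org.apache.lucene.store.RateLimiter;

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

/**
 * Sketch of a rate-limiting stream decorator: every read is counted against a
 * Lucene RateLimiter, and any pause it forces is reported to a listener.
 * Names are illustrative only, not the actual
 * org.elasticsearch.index.snapshots.blobstore.RateLimitingInputStream.
 */
class ThrottledInputStream extends FilterInputStream {

    interface Listener {
        void onPause(long nanos);
    }

    private final RateLimiter rateLimiter;
    private final Listener listener;

    ThrottledInputStream(InputStream delegate, RateLimiter rateLimiter, Listener listener) {
        super(delegate);
        this.rateLimiter = rateLimiter;
        this.listener = listener;
    }

    // Charge the limiter for the bytes just read and report any enforced pause.
    private void maybePause(int bytesRead) throws IOException {
        if (bytesRead > 0) {
            long pausedNanos = rateLimiter.pause(bytesRead);
            if (pausedNanos > 0) {
                listener.onPause(pausedNanos);
            }
        }
    }

    @Override
    public int read() throws IOException {
        int b = in.read();
        maybePause(b < 0 ? 0 : 1);
        return b;
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        int n = in.read(b, off, len);
        maybePause(n < 0 ? 0 : n);
        return n;
    }
}
```

With a wrapper like this, the copy loop in restoreFile() only reads and writes; throttling and pause accounting happen transparently inside the stream, which is what the last hunk above switches to.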