Merge pull request #13828 from ywelsch/fix-snapshot-restore-throttling

Snapshot restore operations throttle more than specified
This commit is contained in:
Yannick Welsch 2015-10-03 16:40:27 +02:00
commit cdb00371da
1 changed files with 12 additions and 4 deletions

View File

@ -26,6 +26,7 @@ import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterService;
@ -93,6 +94,8 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
private RateLimitingInputStream.Listener snapshotThrottleListener;
private RateLimitingInputStream.Listener restoreThrottleListener;
private boolean compress;
private final ParseFieldMatcher parseFieldMatcher;
@ -147,6 +150,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
this.restoreRateLimiter = restoreRateLimiter;
this.rateLimiterListener = rateLimiterListener;
this.snapshotThrottleListener = nanos -> rateLimiterListener.onSnapshotPause(nanos);
this.restoreThrottleListener = nanos -> rateLimiterListener.onRestorePause(nanos);
this.compress = compress;
indexShardSnapshotFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT, BlobStoreIndexShardSnapshot.PROTO, parseFieldMatcher, isCompress());
indexShardSnapshotLegacyFormat = new LegacyBlobStoreFormat<>(LEGACY_SNAPSHOT_NAME_FORMAT, BlobStoreIndexShardSnapshot.PROTO, parseFieldMatcher);
@ -891,16 +895,20 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
*/
private void restoreFile(final FileInfo fileInfo) throws IOException {
boolean success = false;
try (InputStream stream = new PartSliceStream(blobContainer, fileInfo)) {
try (InputStream partSliceStream = new PartSliceStream(blobContainer, fileInfo)) {
final InputStream stream;
if (restoreRateLimiter == null) {
stream = partSliceStream;
} else {
stream = new RateLimitingInputStream(partSliceStream, restoreRateLimiter, restoreThrottleListener);
}
try (final IndexOutput indexOutput = store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) {
final byte[] buffer = new byte[BUFFER_SIZE];
int length;
while ((length = stream.read(buffer)) > 0) {
indexOutput.writeBytes(buffer, 0, length);
recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.name(), length);
if (restoreRateLimiter != null) {
rateLimiterListener.onRestorePause(restoreRateLimiter.pause(length));
}
}
Store.verify(indexOutput);
indexOutput.close();