Snapshot restore operations throttle more than specified
Lucene's RateLimiter can do too much sleeping on small values (see also #6018). The issue here is that calls to "pause" are not properly guarded in "restoreFile". Instead of simply adding the guard, this commit uses the RateLimitingInputStream similar as for "snapshotFile". Closes #13828
This commit is contained in:
parent
148265bd16
commit
03a4e226f1
|
@ -26,6 +26,7 @@ import org.apache.lucene.store.IndexOutput;
|
|||
import org.apache.lucene.store.RateLimiter;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.BytesRefBuilder;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
|
@ -93,6 +94,8 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
|
|||
|
||||
private RateLimitingInputStream.Listener snapshotThrottleListener;
|
||||
|
||||
private RateLimitingInputStream.Listener restoreThrottleListener;
|
||||
|
||||
private boolean compress;
|
||||
|
||||
private final ParseFieldMatcher parseFieldMatcher;
|
||||
|
@ -147,6 +150,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
|
|||
this.restoreRateLimiter = restoreRateLimiter;
|
||||
this.rateLimiterListener = rateLimiterListener;
|
||||
this.snapshotThrottleListener = nanos -> rateLimiterListener.onSnapshotPause(nanos);
|
||||
this.restoreThrottleListener = nanos -> rateLimiterListener.onRestorePause(nanos);
|
||||
this.compress = compress;
|
||||
indexShardSnapshotFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT, BlobStoreIndexShardSnapshot.PROTO, parseFieldMatcher, isCompress());
|
||||
indexShardSnapshotLegacyFormat = new LegacyBlobStoreFormat<>(LEGACY_SNAPSHOT_NAME_FORMAT, BlobStoreIndexShardSnapshot.PROTO, parseFieldMatcher);
|
||||
|
@ -890,16 +894,20 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
|
|||
*/
|
||||
private void restoreFile(final FileInfo fileInfo) throws IOException {
|
||||
boolean success = false;
|
||||
try (InputStream stream = new PartSliceStream(blobContainer, fileInfo)) {
|
||||
|
||||
try (InputStream partSliceStream = new PartSliceStream(blobContainer, fileInfo)) {
|
||||
final InputStream stream;
|
||||
if (restoreRateLimiter == null) {
|
||||
stream = partSliceStream;
|
||||
} else {
|
||||
stream = new RateLimitingInputStream(partSliceStream, restoreRateLimiter, restoreThrottleListener);
|
||||
}
|
||||
try (final IndexOutput indexOutput = store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) {
|
||||
final byte[] buffer = new byte[BUFFER_SIZE];
|
||||
int length;
|
||||
while ((length = stream.read(buffer)) > 0) {
|
||||
indexOutput.writeBytes(buffer, 0, length);
|
||||
recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.name(), length);
|
||||
if (restoreRateLimiter != null) {
|
||||
rateLimiterListener.onRestorePause(restoreRateLimiter.pause(length));
|
||||
}
|
||||
}
|
||||
Store.verify(indexOutput);
|
||||
indexOutput.close();
|
||||
|
|
Loading…
Reference in New Issue