Reduce locking in prewarming (#61837) (#61967)

During prewarming of a Lucene file a CacheFile is acquired and 
then locked for the duration of the prewarming, i.e. locked until all 
the parts of the file have been downloaded and written to the cache on 
disk. The locking (executed with CacheFile#fileLock()) is there to 
prevent the cache file from being evicted while it is prewarming.

But holding the lock may take a while for large files, especially since
 restoring snapshot files now respects the 
indices.recovery.max_bytes_per_sec setting of 40mb (#58658), 
and this can have bad consequences like preventing the CacheFile 
from being evicted, opened or closed. In manual tests this bug slows 
down various requests like mounting a new searchable snapshot
 index or deleting an existing one that is still prewarming.

This commit reduces the time the lock is held during prewarming so
 that the read lock is only required when actively writing to the CacheFile.
This commit is contained in:
Tanguy Leroux 2020-09-04 15:06:50 +02:00 committed by GitHub
parent 3e6e81c993
commit 289b1f4ae7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 42 additions and 43 deletions

View File

@ -453,54 +453,54 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
try { try {
final CacheFile cacheFile = getCacheFileSafe(); final CacheFile cacheFile = getCacheFileSafe();
try (Releasable ignored = cacheFile.fileLock()) {
final Tuple<Long, Long> range = cacheFile.getAbsentRangeWithin(partRange.v1(), partRange.v2());
if (range == null) {
logger.trace(
"prefetchPart: part [{}] bytes [{}-{}] is already fully available for cache file [{}]",
part,
partRange.v1(),
partRange.v2(),
cacheFileReference
);
return;
}
final long rangeStart = range.v1();
final long rangeEnd = range.v2();
final long rangeLength = rangeEnd - rangeStart;
final Tuple<Long, Long> range = cacheFile.getAbsentRangeWithin(partRange.v1(), partRange.v2());
if (range == null) {
logger.trace( logger.trace(
"prefetchPart: prewarming part [{}] bytes [{}-{}] by fetching bytes [{}-{}] for cache file [{}]", "prefetchPart: part [{}] bytes [{}-{}] is already fully available for cache file [{}]",
part, part,
partRange.v1(), partRange.v1(),
partRange.v2(), partRange.v2(),
rangeStart,
rangeEnd,
cacheFileReference cacheFileReference
); );
return;
}
final FileChannel fc = cacheFile.getChannel(); final long rangeStart = range.v1();
assert assertFileChannelOpen(fc); final long rangeEnd = range.v2();
final byte[] copyBuffer = new byte[toIntBytes(Math.min(COPY_BUFFER_SIZE, rangeLength))]; final long rangeLength = rangeEnd - rangeStart;
long totalBytesRead = 0L; logger.trace(
final AtomicLong totalBytesWritten = new AtomicLong(); "prefetchPart: prewarming part [{}] bytes [{}-{}] by fetching bytes [{}-{}] for cache file [{}]",
long remainingBytes = rangeEnd - rangeStart; part,
final long startTimeNanos = stats.currentTimeNanos(); partRange.v1(),
try (InputStream input = openInputStreamFromBlobStore(rangeStart, rangeLength)) { partRange.v2(),
while (remainingBytes > 0L) { rangeStart,
assert totalBytesRead + remainingBytes == rangeLength; rangeEnd,
final int bytesRead = readSafe(input, copyBuffer, rangeStart, rangeEnd, remainingBytes, cacheFileReference); cacheFileReference
);
// The range to prewarm in cache final byte[] copyBuffer = new byte[toIntBytes(Math.min(COPY_BUFFER_SIZE, rangeLength))];
final long readStart = rangeStart + totalBytesRead;
final Tuple<Long, Long> rangeToWrite = Tuple.tuple(readStart, readStart + bytesRead);
// We do not actually read anything, but we want to wait for the write to complete before proceeding. long totalBytesRead = 0L;
// noinspection UnnecessaryLocalVariable final AtomicLong totalBytesWritten = new AtomicLong();
final Tuple<Long, Long> rangeToRead = rangeToWrite; long remainingBytes = rangeEnd - rangeStart;
final long startTimeNanos = stats.currentTimeNanos();
try (InputStream input = openInputStreamFromBlobStore(rangeStart, rangeLength)) {
while (remainingBytes > 0L) {
assert totalBytesRead + remainingBytes == rangeLength;
final int bytesRead = readSafe(input, copyBuffer, rangeStart, rangeEnd, remainingBytes, cacheFileReference);
// The range to prewarm in cache
final long readStart = rangeStart + totalBytesRead;
final Tuple<Long, Long> rangeToWrite = Tuple.tuple(readStart, readStart + bytesRead);
// We do not actually read anything, but we want to wait for the write to complete before proceeding.
// noinspection UnnecessaryLocalVariable
final Tuple<Long, Long> rangeToRead = rangeToWrite;
try (Releasable ignored = cacheFile.fileLock()) {
assert assertFileChannelOpen(cacheFile.getChannel());
cacheFile.populateAndRead( cacheFile.populateAndRead(
rangeToWrite, rangeToWrite,
@ -525,15 +525,14 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
}, },
directory.cacheFetchAsyncExecutor() directory.cacheFetchAsyncExecutor()
).get(); ).get();
totalBytesRead += bytesRead;
remainingBytes -= bytesRead;
} }
final long endTimeNanos = stats.currentTimeNanos(); totalBytesRead += bytesRead;
stats.addCachedBytesWritten(totalBytesWritten.get(), endTimeNanos - startTimeNanos); remainingBytes -= bytesRead;
} }
final long endTimeNanos = stats.currentTimeNanos();
assert totalBytesRead == rangeLength; stats.addCachedBytesWritten(totalBytesWritten.get(), endTimeNanos - startTimeNanos);
} }
assert totalBytesRead == rangeLength;
} catch (final Exception e) { } catch (final Exception e) {
throw new IOException("Failed to prefetch file part in cache", e); throw new IOException("Failed to prefetch file part in cache", e);
} }