diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java index e53a28ccd90..7984f930b03 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java @@ -453,54 +453,54 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn try { final CacheFile cacheFile = getCacheFileSafe(); - try (Releasable ignored = cacheFile.fileLock()) { - - final Tuple range = cacheFile.getAbsentRangeWithin(partRange.v1(), partRange.v2()); - if (range == null) { - logger.trace( - "prefetchPart: part [{}] bytes [{}-{}] is already fully available for cache file [{}]", - part, - partRange.v1(), - partRange.v2(), - cacheFileReference - ); - return; - } - - final long rangeStart = range.v1(); - final long rangeEnd = range.v2(); - final long rangeLength = rangeEnd - rangeStart; + final Tuple range = cacheFile.getAbsentRangeWithin(partRange.v1(), partRange.v2()); + if (range == null) { logger.trace( - "prefetchPart: prewarming part [{}] bytes [{}-{}] by fetching bytes [{}-{}] for cache file [{}]", + "prefetchPart: part [{}] bytes [{}-{}] is already fully available for cache file [{}]", part, partRange.v1(), partRange.v2(), - rangeStart, - rangeEnd, cacheFileReference ); + return; + } - final FileChannel fc = cacheFile.getChannel(); - assert assertFileChannelOpen(fc); - final byte[] copyBuffer = new byte[toIntBytes(Math.min(COPY_BUFFER_SIZE, rangeLength))]; + final long rangeStart = range.v1(); + final long rangeEnd = range.v2(); + final long rangeLength = rangeEnd - rangeStart; - long totalBytesRead = 0L; - final AtomicLong totalBytesWritten = new AtomicLong(); - long remainingBytes = rangeEnd - rangeStart; - final long startTimeNanos = stats.currentTimeNanos(); - try (InputStream input = openInputStreamFromBlobStore(rangeStart, rangeLength)) { - while (remainingBytes > 0L) { - assert totalBytesRead + remainingBytes == rangeLength; - final int bytesRead = readSafe(input, copyBuffer, rangeStart, rangeEnd, remainingBytes, cacheFileReference); + logger.trace( + "prefetchPart: prewarming part [{}] bytes [{}-{}] by fetching bytes [{}-{}] for cache file [{}]", + part, + partRange.v1(), + partRange.v2(), + rangeStart, + rangeEnd, + cacheFileReference + ); - // The range to prewarm in cache - final long readStart = rangeStart + totalBytesRead; - final Tuple rangeToWrite = Tuple.tuple(readStart, readStart + bytesRead); + final byte[] copyBuffer = new byte[toIntBytes(Math.min(COPY_BUFFER_SIZE, rangeLength))]; - // We do not actually read anything, but we want to wait for the write to complete before proceeding. - // noinspection UnnecessaryLocalVariable - final Tuple rangeToRead = rangeToWrite; + long totalBytesRead = 0L; + final AtomicLong totalBytesWritten = new AtomicLong(); + long remainingBytes = rangeEnd - rangeStart; + final long startTimeNanos = stats.currentTimeNanos(); + try (InputStream input = openInputStreamFromBlobStore(rangeStart, rangeLength)) { + while (remainingBytes > 0L) { + assert totalBytesRead + remainingBytes == rangeLength; + final int bytesRead = readSafe(input, copyBuffer, rangeStart, rangeEnd, remainingBytes, cacheFileReference); + + // The range to prewarm in cache + final long readStart = rangeStart + totalBytesRead; + final Tuple rangeToWrite = Tuple.tuple(readStart, readStart + bytesRead); + + // We do not actually read anything, but we want to wait for the write to complete before proceeding. + // noinspection UnnecessaryLocalVariable + final Tuple rangeToRead = rangeToWrite; + + try (Releasable ignored = cacheFile.fileLock()) { + assert assertFileChannelOpen(cacheFile.getChannel()); cacheFile.populateAndRead( rangeToWrite, @@ -525,15 +525,14 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn }, directory.cacheFetchAsyncExecutor() ).get(); - totalBytesRead += bytesRead; - remainingBytes -= bytesRead; } - final long endTimeNanos = stats.currentTimeNanos(); - stats.addCachedBytesWritten(totalBytesWritten.get(), endTimeNanos - startTimeNanos); + totalBytesRead += bytesRead; + remainingBytes -= bytesRead; } - - assert totalBytesRead == rangeLength; + final long endTimeNanos = stats.currentTimeNanos(); + stats.addCachedBytesWritten(totalBytesWritten.get(), endTimeNanos - startTimeNanos); } + assert totalBytesRead == rangeLength; } catch (final Exception e) { throw new IOException("Failed to prefetch file part in cache", e); }