Allow read operations to be executed without waiting for full range to be written in cache (#58728) (#58829)
This commit changes CacheFile and CachedBlobContainerIndexInput so that the read operations made by these classes are now executed progressively and no longer wait for the full range to be written to the cache. It builds on the change introduced in #58477 and is the last change extracted from #58164. Relates #58164
This commit is contained in:
parent 2c275913b9
commit d35e8f45da
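Conceptually, the change works like this: a reader that needs only a sub-range of a larger range being fetched can be completed as soon as the writer's progress passes the end of that sub-range, instead of waiting for the whole range. A minimal, hypothetical sketch of that pattern in plain Java (not the actual CacheFile or SparseFileTracker code; a real implementation would synchronize progress updates):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of progressive range completion: pending readers are
// completed as soon as the writer's progress covers the bytes they need,
// instead of waiting for the whole range to be written.
class ProgressiveRange {

    private static final class Waiter {
        final long end;
        final CompletableFuture<Void> future;

        Waiter(long end, CompletableFuture<Void> future) {
            this.end = end;
            this.future = future;
        }
    }

    private final ConcurrentLinkedQueue<Waiter> waiters = new ConcurrentLinkedQueue<>();
    private volatile long progress; // bytes written so far

    // A reader that only needs bytes [0, end) registers here.
    CompletableFuture<Void> waitForRange(long end) {
        final CompletableFuture<Void> future = new CompletableFuture<>();
        waiters.add(new Waiter(end, future));
        notifyWaiters(); // completes immediately if the bytes are already written
        return future;
    }

    // Called by the writer after each chunk is flushed to disk.
    void onProgress(long newProgress) {
        if (newProgress > progress) { // a real implementation would synchronize here
            progress = newProgress;
        }
        notifyWaiters();
    }

    private void notifyWaiters() {
        waiters.removeIf(waiter -> {
            if (waiter.end <= progress) {
                waiter.future.complete(null);
                return true;
            }
            return false;
        });
    }
}

In the commit itself this bookkeeping lives in SparseFileTracker (extended in #58477); CacheFile.fetchAsync feeds the writer's progress into it via gap::onProgress, as the diff below shows.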
@@ -46,6 +46,7 @@ import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
 import org.elasticsearch.snapshots.SnapshotId;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants;
 import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;
 
 import java.io.FileNotFoundException;
@@ -317,6 +318,14 @@ public class SearchableSnapshotDirectory extends BaseDirectory {
         return cacheService.get(cacheKey, fileLength, cacheDir);
     }
 
+    public Executor cacheFetchAsyncExecutor() {
+        return threadPool.executor(SearchableSnapshotsConstants.SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME);
+    }
+
+    public Executor prewarmExecutor() {
+        return threadPool.executor(ThreadPool.Names.SAME);
+    }
+
     @Override
     public IndexInput openInput(final String name, final IOContext context) throws IOException {
         ensureOpen();
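The two executors added above serve different purposes: cacheFetchAsyncExecutor() hands cache writes to the dedicated searchable snapshots thread pool, while prewarmExecutor() returns ThreadPool.Names.SAME, i.e. the calling thread. A standalone illustration of that difference using plain java.util.concurrent stand-ins (not Elasticsearch's ThreadPool):

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustration of the two execution strategies: cache fetches hop to a
// dedicated pool, while prewarming runs on the calling thread (the
// equivalent of ThreadPool.Names.SAME).
public class ExecutorsDemo {
    public static void main(String[] args) {
        ExecutorService dedicatedPool = Executors.newFixedThreadPool(2); // stands in for the searchable snapshots pool
        Executor sameThread = Runnable::run;                             // stands in for ThreadPool.Names.SAME

        dedicatedPool.execute(() -> System.out.println("fetch on " + Thread.currentThread().getName()));
        sameThread.execute(() -> System.out.println("prewarm on " + Thread.currentThread().getName()));

        dedicatedPool.shutdown();
    }
}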
@@ -7,12 +7,11 @@ package org.elasticsearch.index.store.cache;
 
 import org.apache.lucene.store.AlreadyClosedException;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.common.CheckedBiConsumer;
-import org.elasticsearch.common.CheckedBiFunction;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
@@ -26,7 +25,9 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
 
 public class CacheFile {
 
@@ -259,38 +260,70 @@ public class CacheFile {
         }
     }
 
-    CompletableFuture<Integer> fetchRange(
-        long start,
-        long end,
-        CheckedBiFunction<Long, Long, Integer, IOException> onRangeAvailable,
-        CheckedBiConsumer<Long, Long, IOException> onRangeMissing
+    @FunctionalInterface
+    interface RangeAvailableHandler {
+        int onRangeAvailable(FileChannel channel) throws IOException;
+    }
+
+    @FunctionalInterface
+    interface RangeMissingHandler {
+        void fillCacheRange(FileChannel channel, long from, long to, Consumer<Long> progressUpdater) throws IOException;
+    }
+
+    CompletableFuture<Integer> fetchAsync(
+        final Tuple<Long, Long> rangeToWrite,
+        final Tuple<Long, Long> rangeToRead,
+        final RangeAvailableHandler reader,
+        final RangeMissingHandler writer,
+        final Executor executor
     ) {
         final CompletableFuture<Integer> future = new CompletableFuture<>();
         try {
-            if (start < 0 || start > tracker.getLength() || start > end || end > tracker.getLength()) {
-                throw new IllegalArgumentException(
-                    "Invalid range [start=" + start + ", end=" + end + "] for length [" + tracker.getLength() + ']'
-                );
-            }
             ensureOpen();
-            final List<SparseFileTracker.Gap> gaps = tracker.waitForRange(
-                Tuple.tuple(start, end),
-                Tuple.tuple(start, end), // TODO use progressive sub range to trigger read operations sooner
-                ActionListener.wrap(
-                    rangeReady -> future.complete(onRangeAvailable.apply(start, end)),
-                    rangeFailure -> future.completeExceptionally(rangeFailure)
-                )
-            );
+            final List<SparseFileTracker.Gap> gaps = tracker.waitForRange(rangeToWrite, rangeToRead, ActionListener.wrap(success -> {
+                final int read = reader.onRangeAvailable(channel);
+                assert read == rangeToRead.v2() - rangeToRead.v1() : "partial read ["
+                    + read
+                    + "] does not match the range to read ["
+                    + rangeToRead.v2()
+                    + '-'
+                    + rangeToRead.v1()
+                    + ']';
+                future.complete(read);
+            }, future::completeExceptionally));
 
-            for (SparseFileTracker.Gap gap : gaps) {
-                try {
-                    ensureOpen();
-                    onRangeMissing.accept(gap.start(), gap.end());
-                    gap.onProgress(gap.end()); // TODO update progress in onRangeMissing
-                    gap.onCompletion();
-                } catch (Exception e) {
-                    gap.onFailure(e);
-                }
-            }
+            if (gaps.isEmpty() == false) {
+                executor.execute(new AbstractRunnable() {
+
+                    @Override
+                    protected void doRun() {
+                        for (SparseFileTracker.Gap gap : gaps) {
+                            try {
+                                ensureOpen();
+                                if (readLock.tryLock() == false) {
+                                    throw new AlreadyClosedException("Cache file channel is being evicted, writing attempt cancelled");
+                                }
+                                try {
+                                    ensureOpen();
+                                    if (channel == null) {
+                                        throw new AlreadyClosedException("Cache file channel has been released and closed");
+                                    }
+                                    writer.fillCacheRange(channel, gap.start(), gap.end(), gap::onProgress);
+                                    gap.onCompletion();
+                                } finally {
+                                    readLock.unlock();
+                                }
+                            } catch (Exception e) {
+                                gap.onFailure(e);
+                            }
+                        }
+                    }
+
+                    @Override
+                    public void onFailure(Exception e) {
+                        gaps.forEach(gap -> gap.onFailure(e));
+                    }
+                });
+            }
         } catch (Exception e) {
             future.completeExceptionally(e);
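The gist of the new fetchAsync contract: the RangeAvailableHandler runs once the requested sub-range is readable, and the RangeMissingHandler fills each missing gap on the given executor, reporting progress so that pending readers can complete early. A standalone sketch of the two handlers (the interfaces mirror the ones added above; the sequential invocation here stands in for the SparseFileTracker/executor orchestration, and the file is a throwaway temp file):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.function.Consumer;

// Hedged sketch of the reader/writer split behind fetchAsync, exercised
// directly against a temporary file (no CacheFile/SparseFileTracker here).
public class FetchHandlersDemo {

    @FunctionalInterface
    interface RangeAvailableHandler {
        int onRangeAvailable(FileChannel channel) throws IOException;
    }

    @FunctionalInterface
    interface RangeMissingHandler {
        void fillCacheRange(FileChannel channel, long from, long to, Consumer<Long> progressUpdater) throws IOException;
    }

    public static void main(String[] args) throws IOException {
        Path cacheFile = Files.createTempFile("cache", ".bin");
        try (FileChannel channel = FileChannel.open(cacheFile, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // writer: fills the missing range [0, 16) and reports progress as it goes
            RangeMissingHandler writer = (ch, from, to, progressUpdater) -> {
                for (long pos = from; pos < to; pos++) {
                    ch.write(ByteBuffer.wrap(new byte[] { (byte) pos }), pos);
                    progressUpdater.accept(pos + 1); // readers waiting below this offset may now proceed
                }
            };
            // reader: consumes only the sub-range [4, 8) once it is available
            RangeAvailableHandler reader = ch -> {
                ByteBuffer buffer = ByteBuffer.allocate(4);
                int read = ch.read(buffer, 4L);
                System.out.println("read " + read + " bytes of the sub-range");
                return read;
            };
            writer.fillCacheRange(channel, 0L, 16L, progress -> {});
            reader.onRangeAvailable(channel);
        } finally {
            Files.deleteIfExists(cacheFile);
        }
    }
}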
@@ -21,6 +21,7 @@ import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
 import org.elasticsearch.index.store.BaseSearchableSnapshotIndexInput;
 import org.elasticsearch.index.store.IndexInputStats;
 import org.elasticsearch.index.store.SearchableSnapshotDirectory;
+import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants;
 
 import java.io.EOFException;
 import java.io.IOException;
@@ -30,6 +31,7 @@ import java.nio.channels.FileChannel;
 import java.util.Locale;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 import java.util.function.Predicate;
 import java.util.stream.IntStream;
 
@@ -141,13 +143,22 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexInput
         try {
             final CacheFile cacheFile = getCacheFileSafe();
             try (Releasable ignored = cacheFile.fileLock()) {
-                final Tuple<Long, Long> range = computeRange(pos);
-                bytesRead = cacheFile.fetchRange(
-                    range.v1(),
-                    range.v2(),
-                    (start, end) -> readCacheFile(cacheFile.getChannel(), end, pos, b, len),
-                    (start, end) -> writeCacheFile(cacheFile.getChannel(), start, end)
-                ).get();
+                final Tuple<Long, Long> rangeToWrite = computeRange(pos);
+                final Tuple<Long, Long> rangeToRead = Tuple.tuple(pos, Math.min(pos + len, rangeToWrite.v2()));
+
+                bytesRead = cacheFile.fetchAsync(rangeToWrite, rangeToRead, (channel) -> {
+                    final int read;
+                    if ((rangeToRead.v2() - rangeToRead.v1()) < b.remaining()) {
+                        final ByteBuffer duplicate = b.duplicate();
+                        duplicate.limit(duplicate.position() + Math.toIntExact(rangeToRead.v2() - rangeToRead.v1()));
+                        read = readCacheFile(channel, pos, duplicate);
+                        assert duplicate.position() <= b.limit();
+                        b.position(duplicate.position());
+                    } else {
+                        read = readCacheFile(channel, pos, b);
+                    }
+                    return read;
+                }, this::writeCacheFile, directory.cacheFetchAsyncExecutor()).get();
             }
         } catch (final Exception e) {
             if (e instanceof AlreadyClosedException || (e.getCause() != null && e.getCause() instanceof AlreadyClosedException)) {
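A detail worth calling out in the read path above: when the range to read is shorter than the caller's buffer, the code clamps a duplicate() of the buffer rather than the buffer itself. A duplicate shares the underlying content but has an independent position and limit, so the caller's limit is preserved while partial progress is still propagated. A standalone illustration:

import java.nio.ByteBuffer;

// Standalone illustration of the duplicate()/limit() clamping used in the
// readInternal change above: fill only the first part of a buffer without
// disturbing its limit, then advance the original position accordingly.
public class DuplicateLimitDemo {
    public static void main(String[] args) {
        ByteBuffer b = ByteBuffer.allocate(100); // caller's buffer, expects up to 100 bytes
        long rangeToRead = 10;                   // but only 10 bytes are to be read now

        ByteBuffer duplicate = b.duplicate();    // shared content, independent position/limit
        duplicate.limit(duplicate.position() + Math.toIntExact(rangeToRead));

        // stand-in for Channels.readFromFileChannel(fc, position, duplicate)
        while (duplicate.hasRemaining()) {
            duplicate.put((byte) 1);
        }

        b.position(duplicate.position());        // propagate progress to the caller's buffer
        System.out.println("position=" + b.position() + " limit=" + b.limit()); // position=10 limit=100
    }
}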
@@ -224,22 +235,24 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexInput
             while (remainingBytes > 0L) {
                 assert totalBytesRead + remainingBytes == rangeLength;
                 final int bytesRead = readSafe(input, copyBuffer, rangeStart, rangeEnd, remainingBytes, cacheFileReference);
 
                 // The range to prewarm in cache
                 final long readStart = rangeStart + totalBytesRead;
-                cacheFile.fetchRange(readStart, readStart + bytesRead, (start, end) -> {
-                    logger.trace(
-                        "prefetchPart: range [{}-{}] of file [{}] is available in cache",
-                        start,
-                        end,
-                        fileInfo.physicalName()
-                    );
-                    return Math.toIntExact(end - start);
-                }, (start, end) -> {
+                final Tuple<Long, Long> rangeToWrite = Tuple.tuple(readStart, readStart + bytesRead);
+
+                // Prewarming does not need to read the cached data after it has been written to the cache, so the range to read
+                // is empty. If the range is actively being (or about to be) written to the cache by a concurrent search, the
+                // sync fetching returns immediately and the next range can be prewarmed. If the range is not available in the
+                // cache, it will be written by this prewarming task, which blocks until the range is fully written to disk.
+                final Tuple<Long, Long> rangeToRead = Tuple.tuple(readStart, readStart);
+
+                cacheFile.fetchAsync(rangeToWrite, rangeToRead, (channel) -> 0, (channel, start, end, progressUpdater) -> {
                     final ByteBuffer byteBuffer = ByteBuffer.wrap(
                         copyBuffer,
                         Math.toIntExact(start - readStart),
                         Math.toIntExact(end - start)
                     );
-                    final int writtenBytes = positionalWrite(fc, start, byteBuffer);
+                    final int writtenBytes = positionalWrite(channel, start, byteBuffer);
                     logger.trace(
                         "prefetchPart: writing range [{}-{}] of file [{}], [{}] bytes written",
                         start,
@@ -248,7 +261,8 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexInput
                         writtenBytes
                     );
                     totalBytesWritten.addAndGet(writtenBytes);
-                });
+                    progressUpdater.accept(start + writtenBytes);
+                }, directory.prewarmExecutor());
                 totalBytesRead += bytesRead;
                 remainingBytes -= bytesRead;
             }
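Note how the prewarming call above passes Tuple.tuple(readStart, readStart) as the range to read: an empty range is satisfied by definition, so the (channel) -> 0 reader does no work and the future completes as soon as the tracker accounts for the write. In terms of the hypothetical ProgressiveRange sketch near the top of this page, it is the degenerate case:

// Continuing the hypothetical ProgressiveRange sketch from above: an empty
// read range has nothing to wait for, so a prewarm task is never blocked by
// its own read side, only by gaps it has to write itself.
ProgressiveRange range = new ProgressiveRange();
CompletableFuture<Void> prewarm = range.waitForRange(0L); // end == current progress: empty range
assert prewarm.isDone(); // completes immediately; the next range can be prewarmed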
@@ -265,6 +279,7 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexInput
 
     @SuppressForbidden(reason = "Use positional writes on purpose")
     private static int positionalWrite(FileChannel fc, long start, ByteBuffer byteBuffer) throws IOException {
+        assert assertSearchableSnapshotsThread();
         return fc.write(byteBuffer, start);
     }
 
@@ -318,31 +333,28 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexInput
         return true;
     }
 
-    private int readCacheFile(FileChannel fc, long end, long position, ByteBuffer b, long length) throws IOException {
+    private int readCacheFile(final FileChannel fc, final long position, final ByteBuffer buffer) throws IOException {
         assert assertFileChannelOpen(fc);
-        final int bytesRead;
-
-        assert b.remaining() == length;
-        if (end - position < b.remaining()) {
-            final ByteBuffer duplicate = b.duplicate();
-            duplicate.limit(b.position() + Math.toIntExact(end - position));
-            bytesRead = Channels.readFromFileChannel(fc, position, duplicate);
-            assert duplicate.position() < b.limit();
-            b.position(duplicate.position());
-        } else {
-            bytesRead = Channels.readFromFileChannel(fc, position, b);
-        }
+        final int bytesRead = Channels.readFromFileChannel(fc, position, buffer);
         if (bytesRead == -1) {
             throw new EOFException(
-                String.format(Locale.ROOT, "unexpected EOF reading [%d-%d] from %s", position, position + length, cacheFileReference)
+                String.format(
+                    Locale.ROOT,
+                    "unexpected EOF reading [%d-%d] from %s",
+                    position,
+                    position + buffer.remaining(),
+                    cacheFileReference
+                )
             );
         }
         stats.addCachedBytesRead(bytesRead);
         return bytesRead;
     }
 
-    private void writeCacheFile(FileChannel fc, long start, long end) throws IOException {
+    private void writeCacheFile(final FileChannel fc, final long start, final long end, final Consumer<Long> progressUpdater)
+        throws IOException {
         assert assertFileChannelOpen(fc);
+        assert assertSearchableSnapshotsThread();
         final long length = end - start;
         final byte[] copyBuffer = new byte[Math.toIntExact(Math.min(COPY_BUFFER_SIZE, length))];
         logger.trace(() -> new ParameterizedMessage("writing range [{}-{}] to cache file [{}]", start, end, cacheFileReference));
@@ -356,6 +368,7 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexInput
             positionalWrite(fc, start + bytesCopied, ByteBuffer.wrap(copyBuffer, 0, bytesRead));
             bytesCopied += bytesRead;
             remaining -= bytesRead;
+            progressUpdater.accept(start + bytesCopied);
         }
         final long endTimeNanos = stats.currentTimeNanos();
         stats.addCachedBytesWritten(bytesCopied, endTimeNanos - startTimeNanos);
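The single added line above is what makes writeCacheFile progressive: after each copied chunk, the absolute position written so far is pushed through progressUpdater (ultimately gap::onProgress), letting fetchAsync complete readers that only needed the earlier part of the range. The pattern in isolation (a sketch; the write to disk is elided):

import java.util.function.Consumer;

// Sketch of the chunked-copy-with-progress pattern added above: after each
// chunk lands, the absolute position written so far is reported, which is
// what lets waiting readers complete before the whole range is done.
public class ProgressCopyDemo {
    public static void main(String[] args) {
        byte[] source = new byte[10_000];
        int chunkSize = 4_096;
        Consumer<Long> progressUpdater = position -> System.out.println("written up to " + position);

        long copied = 0;
        while (copied < source.length) {
            int chunk = (int) Math.min(chunkSize, source.length - copied);
            // a real implementation would write `chunk` bytes to the cache file here
            copied += chunk;
            progressUpdater.accept(copied);
        }
    }
}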
@@ -532,4 +545,12 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexInput
         assert fileChannel.isOpen();
         return true;
     }
+
+    private static boolean assertSearchableSnapshotsThread() {
+        final String threadName = Thread.currentThread().getName();
+        assert threadName.contains(
+            '[' + SearchableSnapshotsConstants.SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME + ']'
+        ) : "expected the current thread [" + threadName + "] to belong to the searchable snapshots thread pool";
+        return true;
+    }
 }
@@ -24,6 +24,7 @@ import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.snapshots.SnapshotId;
 import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots;
 import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;
 
 import java.io.IOException;
@@ -384,15 +385,18 @@ public class SearchableSnapshotDirectoryStatsTests extends ESIndexInputTestCase
                 input.seek(0L);
             }
 
-            // cache file has been written in a single chunk
-            assertCounter(inputStats.getCachedBytesWritten(), input.length(), 1L, input.length(), input.length());
-            assertThat(inputStats.getCachedBytesWritten().totalNanoseconds(), equalTo(FAKE_CLOCK_ADVANCE_NANOS));
+            // Use assertBusy() here to wait for the cache write to be processed in the searchable snapshot thread pool
+            assertBusy(() -> {
+                // cache file has been written in a single chunk
+                assertCounter(inputStats.getCachedBytesWritten(), input.length(), 1L, input.length(), input.length());
+                assertThat(inputStats.getCachedBytesWritten().totalNanoseconds(), equalTo(FAKE_CLOCK_ADVANCE_NANOS));
+            });
 
             assertCounter(inputStats.getContiguousReads(), 0L, 0L, 0L, 0L);
             assertCounter(inputStats.getDirectBytesRead(), 0L, 0L, 0L, 0L);
             assertThat(inputStats.getDirectBytesRead().totalNanoseconds(), equalTo(0L));
 
-        } catch (IOException e) {
+        } catch (Exception e) {
             throw new AssertionError(e);
         }
     });
@@ -580,7 +584,7 @@ public class SearchableSnapshotDirectoryStatsTests extends ESIndexInputTestCase
         final ShardId shardId = new ShardId("_name", "_uuid", 0);
         final AtomicLong fakeClock = new AtomicLong();
         final LongSupplier statsCurrentTimeNanos = () -> fakeClock.addAndGet(FAKE_CLOCK_ADVANCE_NANOS);
-        final ThreadPool threadPool = new TestThreadPool(getTestClass().getSimpleName());
+        final ThreadPool threadPool = new TestThreadPool(getTestClass().getSimpleName(), SearchableSnapshots.executorBuilder());
 
         final Long seekingThreshold = randomBoolean() ? randomLongBetween(1L, fileContent.length) : null;
 