Permit searches to be concurrent to prewarming (#55795)

Today when prewarming a searchable snapshot we use the `SparseFileTracker` to
lock each (part of a) snapshotted blob, blocking any other readers from
accessing this data until the whole part is available.

This commit changes this strategy: instead we optimistically start to download
the blob without any locking, and then lock much smaller ranges after each
individual `read()` call. This may mean that some bytes are downloaded twice,
but reduces the time that other readers may need to wait before the data they
need is available.

As a best-effort optimisation we try to request the smallest possible single
range of missing bytes in the part by first checking how many of the initial
and terminal bytes of the part are already present in cache. In particular if
the part is already fully cached before prewarming then this check means we
skip the part entirely.
This commit is contained in:
David Turner 2020-04-28 10:43:28 +01:00
parent 126e4acca8
commit 3f2d10d8fc
7 changed files with 252 additions and 42 deletions

View File

@ -74,7 +74,7 @@ public class IndexInputStats {
cachedBytesRead.add(bytesRead);
}
public void addCachedBytesWritten(int bytesWritten, long nanoseconds) {
public void addCachedBytesWritten(long bytesWritten, long nanoseconds) {
cachedBytesWritten.add(bytesWritten, nanoseconds);
}

View File

@ -381,8 +381,7 @@ public class SearchableSnapshotDirectory extends BaseDirectory {
final long startTimeInNanos = statsCurrentTimeNanosSupplier.getAsLong();
final CachedBlobContainerIndexInput cachedIndexInput = (CachedBlobContainerIndexInput) input.clone();
final int bytesRead = cachedIndexInput.prefetchPart(part); // TODO does not include any rate limitation
assert bytesRead == cacheFile.partBytes(part);
cachedIndexInput.prefetchPart(part); // TODO does not include any rate limitation
logger.trace(
() -> new ParameterizedMessage(

View File

@ -10,6 +10,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.CheckedBiFunction;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
@ -283,4 +284,9 @@ public class CacheFile {
}
return future;
}
public Tuple<Long, Long> getAbsentRangeWithin(long start, long end) {
ensureOpen();
return tracker.getAbsentRangeWithin(start, end);
}
}

View File

@ -28,6 +28,7 @@ import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.IntStream;
@ -174,29 +175,132 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
/**
* Prefetches a complete part and writes it in cache. This method is used to prewarm the cache.
*/
public int prefetchPart(final int part) throws IOException {
public void prefetchPart(final int part) throws IOException {
ensureContext(ctx -> ctx == CACHE_WARMING_CONTEXT);
if (part >= fileInfo.numberOfParts()) {
throw new IllegalArgumentException("Unexpected part number [" + part + "]");
}
final Tuple<Long, Long> range = computeRange(IntStream.range(0, part).mapToLong(fileInfo::partBytes).sum());
assert assertRangeIsAlignedWithPart(range);
final Tuple<Long, Long> partRange = computeRange(IntStream.range(0, part).mapToLong(fileInfo::partBytes).sum());
assert assertRangeIsAlignedWithPart(partRange);
try {
final CacheFile cacheFile = getCacheFileSafe();
try (ReleasableLock ignored = cacheFile.fileLock()) {
final int bytesRead = cacheFile.fetchRange(range.v1(), range.v2(), (start, end) -> {
logger.trace("range [{}-{}] of file [{}] is now available in cache", start, end, fileInfo.physicalName());
return Math.toIntExact(end - start);
}, (start, end) -> writeCacheFile(cacheFile.getChannel(), start, end)).get();
assert bytesRead == (range.v2() - range.v1());
return bytesRead;
final Tuple<Long, Long> range = cacheFile.getAbsentRangeWithin(partRange.v1(), partRange.v2());
if (range == null) {
logger.trace(
"prefetchPart: part [{}] bytes [{}-{}] is already fully available for cache file [{}]",
part,
partRange.v1(),
partRange.v2(),
cacheFileReference
);
return;
}
final long rangeStart = range.v1();
final long rangeEnd = range.v2();
final long rangeLength = rangeEnd - rangeStart;
logger.trace(
"prefetchPart: prewarming part [{}] bytes [{}-{}] by fetching bytes [{}-{}] for cache file [{}]",
part,
partRange.v1(),
partRange.v2(),
rangeStart,
rangeEnd,
cacheFileReference
);
final FileChannel fc = cacheFile.getChannel();
assert assertFileChannelOpen(fc);
final byte[] copyBuffer = new byte[Math.toIntExact(Math.min(COPY_BUFFER_SIZE, rangeLength))];
long totalBytesRead = 0L;
final AtomicLong totalBytesWritten = new AtomicLong();
long remainingBytes = rangeEnd - rangeStart;
final long startTimeNanos = stats.currentTimeNanos();
try (InputStream input = openInputStream(rangeStart, rangeLength)) {
while (remainingBytes > 0L) {
assert totalBytesRead + remainingBytes == rangeLength;
final int bytesRead = readSafe(input, copyBuffer, rangeStart, rangeEnd, remainingBytes, cacheFileReference);
final long readStart = rangeStart + totalBytesRead;
cacheFile.fetchRange(readStart, readStart + bytesRead, (start, end) -> {
logger.trace(
"prefetchPart: range [{}-{}] of file [{}] is available in cache",
start,
end,
fileInfo.physicalName()
);
return Math.toIntExact(end - start);
}, (start, end) -> {
final ByteBuffer byteBuffer = ByteBuffer.wrap(
copyBuffer,
Math.toIntExact(start - readStart),
Math.toIntExact(end - start)
);
final int writtenBytes = positionalWrite(fc, start, byteBuffer);
logger.trace(
"prefetchPart: writing range [{}-{}] of file [{}], [{}] bytes written",
start,
end,
fileInfo.physicalName(),
writtenBytes
);
totalBytesWritten.addAndGet(writtenBytes);
});
totalBytesRead += bytesRead;
remainingBytes -= bytesRead;
}
final long endTimeNanos = stats.currentTimeNanos();
stats.addCachedBytesWritten(totalBytesWritten.get(), endTimeNanos - startTimeNanos);
}
assert totalBytesRead == rangeLength;
}
} catch (final Exception e) {
throw new IOException("Failed to prefetch file part in cache", e);
}
}
@SuppressForbidden(reason = "Use positional writes on purpose")
private static int positionalWrite(FileChannel fc, long start, ByteBuffer byteBuffer) throws IOException {
return fc.write(byteBuffer, start);
}
/**
* Perform a single {@code read()} from {@code inputStream} into {@code copyBuffer}, handling an EOF by throwing an {@link EOFException}
* rather than returning {@code -1}. Returns the number of bytes read, which is always positive.
*
* Most of its arguments are there simply to make the message of the {@link EOFException} more informative.
*/
private static int readSafe(
InputStream inputStream,
byte[] copyBuffer,
long rangeStart,
long rangeEnd,
long remaining,
CacheFileReference cacheFileReference
) throws IOException {
final int len = (remaining < copyBuffer.length) ? Math.toIntExact(remaining) : copyBuffer.length;
final int bytesRead = inputStream.read(copyBuffer, 0, len);
if (bytesRead == -1) {
throw new EOFException(
String.format(
Locale.ROOT,
"unexpected EOF reading [%d-%d] ([%d] bytes remaining) from %s",
rangeStart,
rangeEnd,
remaining,
cacheFileReference
)
);
}
assert bytesRead > 0 : bytesRead;
return bytesRead;
}
/**
* Asserts that the range of bytes to warm in cache is aligned with {@link #fileInfo}'s part size.
*/
@ -218,37 +322,28 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
private int readCacheFile(FileChannel fc, long end, long position, byte[] buffer, int offset, long length) throws IOException {
assert assertFileChannelOpen(fc);
int bytesRead = Channels.readFromFileChannel(fc, position, buffer, offset, Math.toIntExact(Math.min(length, end - position)));
if (bytesRead == -1) {
throw new EOFException(
String.format(Locale.ROOT, "unexpected EOF reading [%d-%d] from %s", position, position + length, cacheFileReference)
);
}
stats.addCachedBytesRead(bytesRead);
return bytesRead;
}
@SuppressForbidden(reason = "Use positional writes on purpose")
private void writeCacheFile(FileChannel fc, long start, long end) throws IOException {
assert assertFileChannelOpen(fc);
final long length = end - start;
final byte[] copyBuffer = new byte[Math.toIntExact(Math.min(COPY_BUFFER_SIZE, length))];
logger.trace(() -> new ParameterizedMessage("writing range [{}-{}] to cache file [{}]", start, end, cacheFileReference));
int bytesCopied = 0;
long bytesCopied = 0L;
long remaining = end - start;
final long startTimeNanos = stats.currentTimeNanos();
try (InputStream input = openInputStream(start, length)) {
long remaining = end - start;
while (remaining > 0) {
final int len = (remaining < copyBuffer.length) ? Math.toIntExact(remaining) : copyBuffer.length;
int bytesRead = input.read(copyBuffer, 0, len);
if (bytesRead == -1) {
throw new EOFException(
String.format(
Locale.ROOT,
"unexpected EOF reading [%d-%d] ([%d] bytes remaining) from %s",
start,
end,
remaining,
cacheFileReference
)
);
}
fc.write(ByteBuffer.wrap(copyBuffer, 0, bytesRead), start + bytesCopied);
while (remaining > 0L) {
final int bytesRead = readSafe(input, copyBuffer, start, end, remaining, cacheFileReference);
positionalWrite(fc, start + bytesCopied, ByteBuffer.wrap(copyBuffer, 0, bytesRead));
bytesCopied += bytesRead;
remaining -= bytesRead;
}

View File

@ -9,6 +9,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.PlainListenableActionFuture;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import java.util.ArrayList;
import java.util.Collections;
@ -149,6 +150,58 @@ public class SparseFileTracker {
return Collections.emptyList();
}
/**
* Returns a range that contains all bytes of the target range which are absent (possibly pending). The returned range may include
* some ranges of present bytes. It tries to return the smallest possible range, but does so on a best-effort basis. This method does
* not acquire anything, which means that another thread may concurrently fill in some of the returned range.
*
* @param start The (inclusive) start of the target range
* @param end The (exclusive) end of the target range
* @return a range that contains all bytes of the target range which are absent, or {@code null} if there are no such bytes.
*/
public Tuple<Long, Long> getAbsentRangeWithin(final long start, final long end) {
synchronized (mutex) {
// Find the first absent byte in the range
final SortedSet<Range> startRanges = ranges.headSet(new Range(start, start, null), true); // ranges which start <= 'start'
long resultStart;
if (startRanges.isEmpty()) {
resultStart = start;
} else {
final Range lastStartRange = startRanges.last();
// last range which starts <= 'start' and which therefore may contain the first byte of the range
if (lastStartRange.end < start) {
resultStart = start;
} else if (lastStartRange.isPending()) {
resultStart = start;
} else {
resultStart = lastStartRange.end;
}
}
assert resultStart >= start;
// Find the last absent byte in the range
final SortedSet<Range> endRanges = ranges.headSet(new Range(end, end, null), false); // ranges which start < 'end'
final long resultEnd;
if (endRanges.isEmpty()) {
resultEnd = end;
} else {
final Range lastEndRange = endRanges.last();
// last range which starts < 'end' and which therefore may contain the last byte of the range
if (lastEndRange.end < end) {
resultEnd = end;
} else if (lastEndRange.isPending()) {
resultEnd = end;
} else {
resultEnd = lastEndRange.start;
}
}
assert resultEnd <= end;
return resultStart < resultEnd ? Tuple.tuple(resultStart, resultEnd) : null;
}
}
private void onGapSuccess(final long start, final long end) {
final PlainListenableActionFuture<Void> completionListener;

View File

@ -18,6 +18,9 @@ import org.elasticsearch.index.store.StoreFileMetadata;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.mockstore.BlobContainerWrapper;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots;
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;
import java.io.EOFException;
@ -28,6 +31,7 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import static java.util.Collections.singletonList;
@ -35,6 +39,7 @@ import static org.elasticsearch.index.store.cache.TestUtils.createCacheService;
import static org.elasticsearch.index.store.cache.TestUtils.singleBlobContainer;
import static org.elasticsearch.index.store.cache.TestUtils.singleSplitBlobContainer;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
@ -42,6 +47,7 @@ import static org.hamcrest.Matchers.notNullValue;
public class CachedBlobContainerIndexInputTests extends ESIndexInputTestCase {
public void testRandomReads() throws IOException {
final ThreadPool threadPool = new TestThreadPool(getTestName(), SearchableSnapshots.executorBuilder());
try (CacheService cacheService = createCacheService(random())) {
cacheService.start();
@ -68,9 +74,10 @@ public class CachedBlobContainerIndexInputTests extends ESIndexInputTestCase {
0L
);
final boolean prewarmEnabled = randomBoolean();
final BlobContainer singleBlobContainer = singleSplitBlobContainer(blobName, input, partSize);
final BlobContainer blobContainer;
if (input.length == partSize && input.length <= cacheService.getCacheSize()) {
if (input.length == partSize && input.length <= cacheService.getCacheSize() && prewarmEnabled == false) {
blobContainer = new CountingBlobContainer(singleBlobContainer, cacheService.getRangeSize());
} else {
blobContainer = singleBlobContainer;
@ -84,11 +91,14 @@ public class CachedBlobContainerIndexInputTests extends ESIndexInputTestCase {
snapshotId,
indexId,
shardId,
Settings.builder().put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true).build(),
Settings.builder()
.put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true)
.put(SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.getKey(), prewarmEnabled)
.build(),
() -> 0L,
cacheService,
cacheDir,
null
threadPool
)
) {
final boolean loaded = directory.loadSnapshot();
@ -118,6 +128,8 @@ public class CachedBlobContainerIndexInputTests extends ESIndexInputTestCase {
);
}
}
} finally {
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
}
}

View File

@ -8,6 +8,7 @@ package org.elasticsearch.index.store.cache;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.coordination.DeterministicTaskQueue;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
@ -87,6 +88,7 @@ public class SparseFileTrackerTests extends ESTestCase {
for (long i = start; i < end; i++) {
if (fileContents[Math.toIntExact(i)] == UNAVAILABLE) {
pending = true;
break;
}
}
@ -134,14 +136,18 @@ public class SparseFileTrackerTests extends ESTestCase {
deterministicTaskQueue.setExecutionDelayVariabilityMillis(1000);
for (int i = between(1, 1000); i > 0; i--) {
deterministicTaskQueue.scheduleNow(
() -> waitForRandomRange(
fileContents,
sparseFileTracker,
listenersCalled::add,
gap -> deterministicTaskQueue.scheduleNow(() -> processGap(fileContents, gap))
)
);
if (rarely() && fileContents.length > 0) {
deterministicTaskQueue.scheduleNow(() -> checkRandomAbsentRange(fileContents, sparseFileTracker, true));
} else {
deterministicTaskQueue.scheduleNow(
() -> waitForRandomRange(
fileContents,
sparseFileTracker,
listenersCalled::add,
gap -> deterministicTaskQueue.scheduleNow(() -> processGap(fileContents, gap))
)
);
}
}
deterministicTaskQueue.runAllTasks();
@ -174,6 +180,13 @@ public class SparseFileTrackerTests extends ESTestCase {
thread.start();
}
final Thread checkThread = new Thread(() -> {
while (countDown.availablePermits() > 0 && fileContents.length > 0) {
checkRandomAbsentRange(fileContents, sparseFileTracker, false);
}
});
checkThread.start();
startLatch.countDown();
for (Thread thread : threads) {
@ -182,6 +195,35 @@ public class SparseFileTrackerTests extends ESTestCase {
assertThat(countDown.availablePermits(), equalTo(0));
assertTrue(listenersCalled.stream().allMatch(AtomicBoolean::get));
checkThread.join();
}
private static void checkRandomAbsentRange(byte[] fileContents, SparseFileTracker sparseFileTracker, boolean expectExact) {
final long checkStart = randomLongBetween(0, fileContents.length - 1);
final long checkEnd = randomLongBetween(0, fileContents.length);
final Tuple<Long, Long> freeRange = sparseFileTracker.getAbsentRangeWithin(checkStart, checkEnd);
if (freeRange == null) {
for (long i = checkStart; i < checkEnd; i++) {
assertThat(fileContents[Math.toIntExact(i)], equalTo(AVAILABLE));
}
} else {
assertThat(freeRange.v1(), greaterThanOrEqualTo(checkStart));
assertTrue(freeRange.toString(), freeRange.v1() < freeRange.v2());
assertThat(freeRange.v2(), lessThanOrEqualTo(checkEnd));
for (long i = checkStart; i < freeRange.v1(); i++) {
assertThat(fileContents[Math.toIntExact(i)], equalTo(AVAILABLE));
}
for (long i = freeRange.v2(); i < checkEnd; i++) {
assertThat(fileContents[Math.toIntExact(i)], equalTo(AVAILABLE));
}
if (expectExact) {
// without concurrent activity, the returned range is as small as possible
assertThat(fileContents[Math.toIntExact(freeRange.v1())], equalTo(UNAVAILABLE));
assertThat(fileContents[Math.toIntExact(freeRange.v2() - 1)], equalTo(UNAVAILABLE));
}
}
}
private static void waitForRandomRange(
@ -211,6 +253,9 @@ public class SparseFileTrackerTests extends ESTestCase {
});
for (final SparseFileTracker.Gap gap : gaps) {
for (long i = gap.start; i < gap.end; i++) {
assertThat(Long.toString(i), fileContents[Math.toIntExact(i)], equalTo(UNAVAILABLE));
}
gapConsumer.accept(gap);
}
}