From 41ddbd4188c964c10467b9e74e4d3ebbae5163a2 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 24 Apr 2020 17:47:50 +0200 Subject: [PATCH] Allow to prewarm the cache for searchable snapshot shards (#55322) Relates #50999 --- .../SearchableSnapshotsConstants.java | 2 + .../BaseSearchableSnapshotIndexInput.java | 4 + .../store/SearchableSnapshotDirectory.java | 104 +++++- .../index/store/cache/CacheFile.java | 36 +- .../cache/CachedBlobContainerIndexInput.java | 107 +++++- .../SearchableSnapshots.java | 33 +- .../cache/CacheService.java | 2 +- ...SearchableSnapshotDirectoryStatsTests.java | 8 +- .../SearchableSnapshotDirectoryTests.java | 23 +- .../index/store/cache/CacheFileTests.java | 8 +- .../store/cache/CachePreWarmingTests.java | 321 ++++++++++++++++++ .../CachedBlobContainerIndexInputTests.java | 6 +- .../SearchableSnapshotsIntegTests.java | 3 + 13 files changed, 606 insertions(+), 51 deletions(-) create mode 100644 x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachePreWarmingTests.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsConstants.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsConstants.java index e4504aa1389..a5492d8a740 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsConstants.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsConstants.java @@ -34,4 +34,6 @@ public class SearchableSnapshotsConstants { return SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED && SNAPSHOT_DIRECTORY_FACTORY_KEY.equals(INDEX_STORE_TYPE_SETTING.get(indexSettings)); } + + public static final String SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME = "searchable_snapshots"; } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java index 280474fbb7a..f8b79b989e5 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; import org.elasticsearch.index.snapshots.blobstore.SlicedInputStream; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants; import java.io.IOException; import java.io.InputStream; @@ -136,6 +137,9 @@ public abstract class BaseSearchableSnapshotIndexInput extends BufferedIndexInpu || threadName.contains('[' + ThreadPool.Names.SEARCH + ']') || threadName.contains('[' + ThreadPool.Names.SEARCH_THROTTLED + ']') + // Cache prewarming runs on a dedicated thread pool. + || threadName.contains('[' + SearchableSnapshotsConstants.SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME + ']') + // Today processExistingRecoveries considers all shards and constructs a shard store snapshot on this thread, this needs // addressing. TODO NORELEASE || threadName.contains('[' + ThreadPool.Names.FETCH_SHARD_STORE + ']') diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java index 9ffdd341921..101e3554091 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java @@ -5,6 +5,9 @@ */ package org.elasticsearch.index.store; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.store.BaseDirectory; import org.apache.lucene.store.Directory; @@ -18,8 +21,12 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.lucene.store.ByteArrayIndexInput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.LazyInitializable; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.CountDown; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardPath; @@ -47,18 +54,22 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.LongSupplier; import java.util.function.Supplier; +import java.util.stream.Collectors; import static org.apache.lucene.store.BufferedIndexInput.bufferSize; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_REPOSITORY_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME; /** * Implementation of {@link Directory} that exposes files from a snapshot as a Lucene directory. Because snapshot are immutable this @@ -73,6 +84,8 @@ import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SN */ public class SearchableSnapshotDirectory extends BaseDirectory { + private static final Logger logger = LogManager.getLogger(SearchableSnapshotDirectory.class); + private final Supplier blobContainerSupplier; private final Supplier snapshotSupplier; private final SnapshotId snapshotId; @@ -80,8 +93,10 @@ public class SearchableSnapshotDirectory extends BaseDirectory { private final ShardId shardId; private final LongSupplier statsCurrentTimeNanosSupplier; private final Map stats; + private final ThreadPool threadPool; private final CacheService cacheService; private final boolean useCache; + private final boolean prewarmCache; private final Set excludedFileTypes; private final long uncachedChunkSize; // if negative use BlobContainer#readBlobPreferredLength, see #getUncachedChunkSize() private final Path cacheDir; @@ -101,7 +116,8 @@ public class SearchableSnapshotDirectory extends BaseDirectory { Settings indexSettings, LongSupplier currentTimeNanosSupplier, CacheService cacheService, - Path cacheDir + Path cacheDir, + ThreadPool threadPool ) { super(new SingleInstanceLockFactory()); this.snapshotSupplier = Objects.requireNonNull(snapshot); @@ -115,8 +131,10 @@ public class SearchableSnapshotDirectory extends BaseDirectory { this.cacheDir = Objects.requireNonNull(cacheDir); this.closed = new AtomicBoolean(false); this.useCache = SNAPSHOT_CACHE_ENABLED_SETTING.get(indexSettings); + this.prewarmCache = useCache ? SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.get(indexSettings) : false; this.excludedFileTypes = new HashSet<>(SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING.get(indexSettings)); this.uncachedChunkSize = SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING.get(indexSettings).getBytes(); + this.threadPool = threadPool; this.loaded = false; assert invariant(); } @@ -142,6 +160,7 @@ public class SearchableSnapshotDirectory extends BaseDirectory { * @return true if the snapshot was loaded by executing this method, false otherwise */ public boolean loadSnapshot() { + assert assertCurrentThreadMayLoadSnapshot(); boolean alreadyLoaded = this.loaded; if (alreadyLoaded == false) { synchronized (this) { @@ -150,10 +169,10 @@ public class SearchableSnapshotDirectory extends BaseDirectory { this.blobContainer = blobContainerSupplier.get(); this.snapshot = snapshotSupplier.get(); this.loaded = true; + prewarmCache(); } } } - assert assertCurrentThreadMayLoadSnapshot(); assert invariant(); return alreadyLoaded == false; } @@ -300,7 +319,7 @@ public class SearchableSnapshotDirectory extends BaseDirectory { final IndexInputStats inputStats = stats.computeIfAbsent(name, n -> createIndexInputStats(fileInfo.length())); if (useCache && isExcludedFromCache(name) == false) { - return new CachedBlobContainerIndexInput(this, fileInfo, context, inputStats); + return new CachedBlobContainerIndexInput(this, fileInfo, context, inputStats, cacheService.getRangeSize()); } else { return new DirectBlobContainerIndexInput( blobContainer(), @@ -331,12 +350,86 @@ public class SearchableSnapshotDirectory extends BaseDirectory { return this.getClass().getSimpleName() + "@snapshotId=" + snapshotId + " lockFactory=" + lockFactory; } + private void prewarmCache() { + if (prewarmCache) { + final List cacheFiles = snapshot().indexFiles() + .stream() + .filter(file -> file.metadata().hashEqualsContents() == false) + .filter(file -> isExcludedFromCache(file.physicalName()) == false) + .collect(Collectors.toList()); + + final Executor executor = threadPool.executor(SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME); + logger.debug("{} warming shard cache for [{}] files", shardId, cacheFiles.size()); + + for (BlobStoreIndexShardSnapshot.FileInfo cacheFile : cacheFiles) { + final String fileName = cacheFile.physicalName(); + try { + final IndexInput input = openInput(fileName, CachedBlobContainerIndexInput.CACHE_WARMING_CONTEXT); + assert input instanceof CachedBlobContainerIndexInput : "expected cached index input but got " + input.getClass(); + + final long numberOfParts = cacheFile.numberOfParts(); + final CountDown countDown = new CountDown(Math.toIntExact(numberOfParts)); + for (long p = 0; p < numberOfParts; p++) { + final int part = Math.toIntExact(p); + // TODO use multiple workers to warm each part instead of filling the thread pool + executor.execute(new AbstractRunnable() { + @Override + protected void doRun() throws Exception { + ensureOpen(); + + logger.trace("warming cache for [{}] part [{}/{}]", fileName, part, numberOfParts); + final long startTimeInNanos = statsCurrentTimeNanosSupplier.getAsLong(); + + final CachedBlobContainerIndexInput cachedIndexInput = (CachedBlobContainerIndexInput) input.clone(); + final int bytesRead = cachedIndexInput.prefetchPart(part); // TODO does not include any rate limitation + assert bytesRead == cacheFile.partBytes(part); + + logger.trace( + () -> new ParameterizedMessage( + "part [{}/{}] of [{}] warmed in [{}] ms", + part, + numberOfParts, + fileName, + TimeValue.timeValueNanos(statsCurrentTimeNanosSupplier.getAsLong() - startTimeInNanos).millis() + ) + ); + } + + @Override + public void onFailure(Exception e) { + logger.trace( + () -> new ParameterizedMessage( + "failed to warm cache for [{}] part [{}/{}]", + fileName, + part, + numberOfParts + ), + e + ); + } + + @Override + public void onAfter() { + if (countDown.countDown()) { + IOUtils.closeWhileHandlingException(input); + } + } + }); + } + } catch (IOException e) { + logger.trace(() -> new ParameterizedMessage("failed to warm cache for [{}]", fileName), e); + } + } + } + } + public static Directory create( RepositoriesService repositories, CacheService cache, IndexSettings indexSettings, ShardPath shardPath, - LongSupplier currentTimeNanosSupplier + LongSupplier currentTimeNanosSupplier, + ThreadPool threadPool ) throws IOException { final Repository repository = repositories.repository(SNAPSHOT_REPOSITORY_SETTING.get(indexSettings.getSettings())); @@ -371,7 +464,8 @@ public class SearchableSnapshotDirectory extends BaseDirectory { indexSettings.getSettings(), currentTimeNanosSupplier, cache, - cacheDir + cacheDir, + threadPool ) ); } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java index dc6662c939e..6b20d70c3d7 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java @@ -51,7 +51,6 @@ public class CacheFile { private final ReleasableLock readLock; private final SparseFileTracker tracker; - private final int rangeSize; private final String description; private final Path file; @@ -61,12 +60,11 @@ public class CacheFile { @Nullable // if evicted, or there are no listeners private volatile FileChannel channel; - public CacheFile(String description, long length, Path file, int rangeSize) { + public CacheFile(String description, long length, Path file) { this.tracker = new SparseFileTracker(file.toString(), length); this.description = Objects.requireNonNull(description); this.file = Objects.requireNonNull(file); this.listeners = new HashSet<>(); - this.rangeSize = rangeSize; this.evicted = false; final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock(); @@ -249,41 +247,35 @@ public class CacheFile { } CompletableFuture fetchRange( - long position, + long start, + long end, CheckedBiFunction onRangeAvailable, CheckedBiConsumer onRangeMissing ) { final CompletableFuture future = new CompletableFuture<>(); try { - if (position < 0 || position > tracker.getLength()) { - throw new IllegalArgumentException("Wrong read position [" + position + "]"); + if (start < 0 || start > tracker.getLength() || start > end || end > tracker.getLength()) { + throw new IllegalArgumentException( + "Invalid range [start=" + start + ", end=" + end + "] for length [" + tracker.getLength() + ']' + ); } - ensureOpen(); - final long rangeStart = (position / rangeSize) * rangeSize; - final long rangeEnd = Math.min(rangeStart + rangeSize, tracker.getLength()); - final List gaps = tracker.waitForRange( - rangeStart, - rangeEnd, + start, + end, ActionListener.wrap( - rangeReady -> future.complete(onRangeAvailable.apply(rangeStart, rangeEnd)), + rangeReady -> future.complete(onRangeAvailable.apply(start, end)), rangeFailure -> future.completeExceptionally(rangeFailure) ) ); - if (gaps.size() > 0) { - final SparseFileTracker.Gap range = gaps.get(0); - assert gaps.size() == 1 : "expected 1 range to fetch but got " + gaps.size(); - assert range.start == rangeStart : "range/gap start mismatch (" + range.start + ',' + rangeStart + ')'; - assert range.end == rangeEnd : "range/gap end mismatch (" + range.end + ',' + rangeEnd + ')'; - + for (SparseFileTracker.Gap gap : gaps) { try { ensureOpen(); - onRangeMissing.accept(rangeStart, rangeEnd); - range.onResponse(null); + onRangeMissing.accept(gap.start, gap.end); + gap.onResponse(null); } catch (Exception e) { - range.onFailure(e); + gap.onFailure(e); } } } catch (Exception e) { diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java index 5e43183b4e8..6de753be3d9 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java @@ -14,6 +14,7 @@ import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.Channels; import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; @@ -28,14 +29,24 @@ import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.util.Locale; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; +import java.util.stream.IntStream; public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexInput { + /** + * Specific IOContext used for prewarming the cache. This context allows to write + * a complete part of the {@link #fileInfo} at once in the cache and should not be + * used for anything else than what the {@link #prefetchPart(int)} method does. + */ + public static final IOContext CACHE_WARMING_CONTEXT = new IOContext(); + private static final Logger logger = LogManager.getLogger(CachedBlobContainerIndexInput.class); private static final int COPY_BUFFER_SIZE = 8192; private final SearchableSnapshotDirectory directory; private final CacheFileReference cacheFileReference; + private final int defaultRangeSize; // last read position is kept around in order to detect (non)contiguous reads for stats private long lastReadPosition; @@ -46,7 +57,8 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn SearchableSnapshotDirectory directory, FileInfo fileInfo, IOContext context, - IndexInputStats stats + IndexInputStats stats, + int rangeSize ) { this( "CachedBlobContainerIndexInput(" + fileInfo.physicalName() + ")", @@ -56,7 +68,8 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn stats, 0L, fileInfo.length(), - new CacheFileReference(directory, fileInfo.physicalName(), fileInfo.length()) + new CacheFileReference(directory, fileInfo.physicalName(), fileInfo.length()), + rangeSize ); stats.incrementOpenCount(); } @@ -69,13 +82,15 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn IndexInputStats stats, long offset, long length, - CacheFileReference cacheFileReference + CacheFileReference cacheFileReference, + int rangeSize ) { super(resourceDesc, directory.blobContainer(), fileInfo, context, stats, offset, length); this.directory = directory; this.cacheFileReference = cacheFileReference; this.lastReadPosition = this.offset; this.lastSeekPosition = this.offset; + this.defaultRangeSize = rangeSize; } @Override @@ -85,8 +100,35 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn } } + private void ensureContext(Predicate predicate) throws IOException { + if (predicate.test(context) == false) { + assert false : "this method should not be used with this context " + context; + throw new IOException("Cannot read the index input using context [context=" + context + ", input=" + this + ']'); + } + } + + private long getDefaultRangeSize() { + return (context != CACHE_WARMING_CONTEXT) ? defaultRangeSize : fileInfo.partSize().getBytes(); + } + + private Tuple computeRange(long position) { + final long rangeSize = getDefaultRangeSize(); + long start = (position / rangeSize) * rangeSize; + long end = Math.min(start + rangeSize, fileInfo.length()); + return Tuple.tuple(start, end); + } + + private CacheFile getCacheFileSafe() throws Exception { + final CacheFile cacheFile = cacheFileReference.get(); + if (cacheFile == null) { + throw new AlreadyClosedException("Failed to acquire a non-evicted cache file"); + } + return cacheFile; + } + @Override protected void readInternal(final byte[] buffer, final int offset, final int length) throws IOException { + ensureContext(ctx -> ctx != CACHE_WARMING_CONTEXT); final long position = getFilePointer() + this.offset; int totalBytesRead = 0; @@ -97,14 +139,12 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn int bytesRead = 0; try { - final CacheFile cacheFile = cacheFileReference.get(); - if (cacheFile == null) { - throw new AlreadyClosedException("Failed to acquire a non-evicted cache file"); - } - + final CacheFile cacheFile = getCacheFileSafe(); try (ReleasableLock ignored = cacheFile.fileLock()) { + final Tuple range = computeRange(pos); bytesRead = cacheFile.fetchRange( - pos, + range.v1(), + range.v2(), (start, end) -> readCacheFile(cacheFile.getChannel(), end, pos, buffer, off, len), (start, end) -> writeCacheFile(cacheFile.getChannel(), start, end) ).get(); @@ -131,6 +171,50 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn lastSeekPosition = lastReadPosition; } + /** + * Prefetches a complete part and writes it in cache. This method is used to prewarm the cache. + */ + public int prefetchPart(final int part) throws IOException { + ensureContext(ctx -> ctx == CACHE_WARMING_CONTEXT); + if (part >= fileInfo.numberOfParts()) { + throw new IllegalArgumentException("Unexpected part number [" + part + "]"); + } + final Tuple range = computeRange(IntStream.range(0, part).mapToLong(fileInfo::partBytes).sum()); + assert assertRangeIsAlignedWithPart(range); + try { + final CacheFile cacheFile = getCacheFileSafe(); + try (ReleasableLock ignored = cacheFile.fileLock()) { + final int bytesRead = cacheFile.fetchRange(range.v1(), range.v2(), (start, end) -> { + logger.trace("range [{}-{}] of file [{}] is now available in cache", start, end, fileInfo.physicalName()); + return Math.toIntExact(end - start); + }, (start, end) -> writeCacheFile(cacheFile.getChannel(), start, end)).get(); + + assert bytesRead == (range.v2() - range.v1()); + return bytesRead; + } + } catch (final Exception e) { + throw new IOException("Failed to prefetch file part in cache", e); + } + } + + /** + * Asserts that the range of bytes to warm in cache is aligned with {@link #fileInfo}'s part size. + */ + private boolean assertRangeIsAlignedWithPart(Tuple range) { + if (fileInfo.numberOfParts() == 1L) { + final long length = fileInfo.length(); + assert range.v1() == 0L : "start of range [" + range.v1() + "] is not aligned with zero"; + assert range.v2() == length : "end of range [" + range.v2() + "] is not aligned with file length [" + length + ']'; + } else { + final long length = fileInfo.partSize().getBytes(); + assert range.v1() % length == 0L : "start of range [" + range.v1() + "] is not aligned with part start"; + assert range.v2() % length == 0L || (range.v2() == fileInfo.length()) : "end of range [" + + range.v2() + + "] is not aligned with part end or with file length"; + } + return true; + } + private int readCacheFile(FileChannel fc, long end, long position, byte[] buffer, int offset, long length) throws IOException { assert assertFileChannelOpen(fc); int bytesRead = Channels.readFromFileChannel(fc, position, buffer, offset, Math.toIntExact(Math.min(length, end - position))); @@ -214,7 +298,8 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn stats, this.offset + offset, length, - cacheFileReference + cacheFileReference, + defaultRangeSize ); slice.isClone = true; return slice; @@ -231,6 +316,8 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn + length() + ", position=" + getFilePointer() + + ", rangeSize=" + + getDefaultRangeSize() + '}'; } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java index 81b44d993a5..83e89a0296c 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java @@ -21,6 +21,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; @@ -42,6 +43,8 @@ import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; import org.elasticsearch.script.ScriptService; +import org.elasticsearch.threadpool.ExecutorBuilder; +import org.elasticsearch.threadpool.ScalingExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction; @@ -68,6 +71,7 @@ import java.util.function.Supplier; import static java.util.Collections.emptyList; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SNAPSHOT_DIRECTORY_FACTORY_KEY; /** @@ -100,6 +104,11 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng true, Setting.Property.IndexScope ); + public static final Setting SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING = Setting.boolSetting( + "index.store.snapshot.cache.prewarm.enabled", + false, + Setting.Property.IndexScope + ); // The file extensions that are excluded from the cache public static final Setting> SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING = Setting.listSetting( "index.store.snapshot.cache.excluded_file_types", @@ -117,6 +126,7 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng private volatile Supplier repositoriesServiceSupplier; private final SetOnce cacheService = new SetOnce<>(); + private final SetOnce threadPool = new SetOnce<>(); private final Settings settings; public SearchableSnapshots(final Settings settings) { @@ -138,6 +148,7 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng SNAPSHOT_SNAPSHOT_ID_SETTING, SNAPSHOT_INDEX_ID_SETTING, SNAPSHOT_CACHE_ENABLED_SETTING, + SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING, SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING, SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING, CacheService.SNAPSHOT_CACHE_SIZE_SETTING, @@ -166,6 +177,7 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng final CacheService cacheService = new CacheService(settings); this.cacheService.set(cacheService); this.repositoriesServiceSupplier = repositoriesServiceSupplier; + this.threadPool.set(threadPool); return org.elasticsearch.common.collect.List.of(cacheService); } else { this.repositoriesServiceSupplier = () -> { @@ -191,7 +203,9 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng assert repositories != null; final CacheService cache = cacheService.get(); assert cache != null; - return SearchableSnapshotDirectory.create(repositories, cache, indexSettings, shardPath, System::nanoTime); + final ThreadPool threadPool = this.threadPool.get(); + assert threadPool != null; + return SearchableSnapshotDirectory.create(repositories, cache, indexSettings, shardPath, System::nanoTime, threadPool); }); } else { return org.elasticsearch.common.collect.Map.of(); @@ -253,4 +267,21 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng } } + public List> getExecutorBuilders(Settings settings) { + if (SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED) { + return org.elasticsearch.common.collect.List.of(executorBuilder()); + } else { + return org.elasticsearch.common.collect.List.of(); + } + } + + public static ExecutorBuilder executorBuilder() { + return new ScalingExecutorBuilder( + SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME, + 0, + 32, + TimeValue.timeValueSeconds(30L), + "xpack.searchable_snapshots.thread_pool" + ); + } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java index f0cf1d4d3c8..0abba9b20ef 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java @@ -112,7 +112,7 @@ public class CacheService extends AbstractLifecycleComponent { final Path path = cacheDir.resolve(uuid); assert Files.notExists(path) : "cache file already exists " + path; - return new CacheFile(key.toString(), fileLength, path, getRangeSize()); + return new CacheFile(key.toString(), fileLength, path); }); } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java index be388a4820a..da0d5c06dac 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java @@ -22,6 +22,8 @@ import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.F import org.elasticsearch.index.store.cache.TestUtils; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; import java.io.IOException; @@ -571,6 +573,7 @@ public class SearchableSnapshotDirectoryStatsTests extends ESIndexInputTestCase final ShardId shardId = new ShardId("_name", "_uuid", 0); final AtomicLong fakeClock = new AtomicLong(); final LongSupplier statsCurrentTimeNanos = () -> fakeClock.addAndGet(FAKE_CLOCK_ADVANCE_NANOS); + final ThreadPool threadPool = new TestThreadPool(getTestClass().getSimpleName()); final Long seekingThreshold = randomBoolean() ? randomLongBetween(1L, fileContent.length) : null; @@ -591,7 +594,8 @@ public class SearchableSnapshotDirectoryStatsTests extends ESIndexInputTestCase indexSettings, statsCurrentTimeNanos, cacheService, - createTempDir() + createTempDir(), + threadPool ) { @Override protected IndexInputStats createIndexInputStats(long fileLength) { @@ -611,6 +615,8 @@ public class SearchableSnapshotDirectoryStatsTests extends ESIndexInputTestCase assertThat("BlobContainer should be loaded", directory.blobContainer(), notNullValue()); test.apply(fileName, fileContent, directory); + } finally { + terminate(threadPool); } } } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java index d8af7382c28..8f5e5a831e7 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java @@ -70,6 +70,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots; import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; import org.hamcrest.Matcher; @@ -93,6 +94,7 @@ import java.util.Map; import static java.util.Arrays.asList; import static java.util.Collections.emptyMap; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -351,7 +353,7 @@ public class SearchableSnapshotDirectoryTests extends ESTestCase { writer.commit(); } - final ThreadPool threadPool = new TestThreadPool(getClass().getSimpleName()); + final ThreadPool threadPool = new TestThreadPool(getTestName(), SearchableSnapshots.executorBuilder()); releasables.add(() -> terminate(threadPool)); final Store store = new Store(shardId, indexSettings, directory, new DummyShardLock(shardId)); @@ -440,10 +442,14 @@ public class SearchableSnapshotDirectoryTests extends ESTestCase { snapshotId, indexId, shardId, - Settings.builder().put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), randomBoolean()).build(), + Settings.builder() + .put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), randomBoolean()) + .put(SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.getKey(), randomBoolean()) + .build(), () -> 0L, cacheService, - cacheDir + cacheDir, + threadPool ) ) { final boolean loaded = snapshotDirectory.loadSnapshot(); @@ -514,6 +520,7 @@ public class SearchableSnapshotDirectoryTests extends ESTestCase { final ShardId shardId = new ShardId(new Index("_name", "_id"), 0); final Path cacheDir = createTempDir(); + final ThreadPool threadPool = new TestThreadPool(getTestName(), SearchableSnapshots.executorBuilder()); try ( SearchableSnapshotDirectory directory = new SearchableSnapshotDirectory( () -> blobContainer, @@ -521,10 +528,14 @@ public class SearchableSnapshotDirectoryTests extends ESTestCase { snapshotId, indexId, shardId, - Settings.builder().put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true).build(), + Settings.builder() + .put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true) + .put(SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.getKey(), randomBoolean()) + .build(), () -> 0L, cacheService, - cacheDir + cacheDir, + threadPool ) ) { @@ -554,6 +565,8 @@ public class SearchableSnapshotDirectoryTests extends ESTestCase { assertListOfFiles(cacheDir, equalTo(0), equalTo(0L)); } } + } finally { + terminate(threadPool); } } } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java index 331d5ec006e..6ac7442f552 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java @@ -6,8 +6,8 @@ package org.elasticsearch.index.store.cache; import org.apache.lucene.util.SetOnce; -import org.elasticsearch.test.ESTestCase; import org.elasticsearch.index.store.cache.CacheFile.EvictionListener; +import org.elasticsearch.test.ESTestCase; import java.io.IOException; import java.nio.channels.FileChannel; @@ -27,7 +27,7 @@ public class CacheFileTests extends ESTestCase { public void testAcquireAndRelease() throws Exception { final Path file = createTempDir().resolve("file.cache"); - final CacheFile cacheFile = new CacheFile("test", randomLongBetween(1, 100), file, randomIntBetween(1, 100)); + final CacheFile cacheFile = new CacheFile("test", randomLongBetween(1, 100), file); assertThat("Cache file is not acquired: no channel exists", cacheFile.getChannel(), nullValue()); assertThat("Cache file is not acquired: file does not exist", Files.exists(file), is(false)); @@ -70,7 +70,7 @@ public class CacheFileTests extends ESTestCase { public void testCacheFileNotAcquired() throws IOException { final Path file = createTempDir().resolve("file.cache"); - final CacheFile cacheFile = new CacheFile("test", randomLongBetween(1, 100), file, randomIntBetween(1, 100)); + final CacheFile cacheFile = new CacheFile("test", randomLongBetween(1, 100), file); assertThat(Files.exists(file), is(false)); assertThat(cacheFile.getChannel(), nullValue()); @@ -94,7 +94,7 @@ public class CacheFileTests extends ESTestCase { public void testDeleteOnCloseAfterLastRelease() throws Exception { final Path file = createTempDir().resolve("file.cache"); - final CacheFile cacheFile = new CacheFile("test", randomLongBetween(1, 100), file, randomIntBetween(1, 100)); + final CacheFile cacheFile = new CacheFile("test", randomLongBetween(1, 100), file); final List acquiredListeners = new ArrayList<>(); for (int i = 0; i < randomIntBetween(1, 20); i++) { diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachePreWarmingTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachePreWarmingTests.java new file mode 100644 index 00000000000..3d39e3d2c0f --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachePreWarmingTests.java @@ -0,0 +1,321 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.index.store.cache; + +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.document.StringField; +import org.apache.lucene.document.TextField; +import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.store.Directory; +import org.elasticsearch.Version; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.RepositoryMetadata; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.blobstore.BlobContainer; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.env.Environment; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; +import org.elasticsearch.index.store.SearchableSnapshotDirectory; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; +import org.elasticsearch.repositories.fs.FsRepository; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.snapshots.mockstore.BlobContainerWrapper; +import org.elasticsearch.test.DummyShardLock; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.IndexSettingsModule; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots; +import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; + +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static java.util.Collections.emptyMap; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +public class CachePreWarmingTests extends ESTestCase { + + public void testCachePreWarming() throws Exception { + final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings( + "_index", + Settings.builder() + .put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random())) + .put(IndexMetadata.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT) + .build() + ); + final ShardId shardId = new ShardId(indexSettings.getIndex(), randomIntBetween(0, 10)); + final List releasables = new ArrayList<>(); + + try (Directory directory = newDirectory()) { + final IndexWriterConfig indexWriterConfig = newIndexWriterConfig(); + try (IndexWriter writer = new IndexWriter(directory, indexWriterConfig)) { + final int nbDocs = scaledRandomIntBetween(0, 1_000); + final List words = org.elasticsearch.common.collect.List.of( + "the", + "quick", + "brown", + "fox", + "jumps", + "over", + "the", + "lazy", + "dog" + ); + for (int i = 0; i < nbDocs; i++) { + final Document doc = new Document(); + doc.add(new StringField("id", "" + i, Field.Store.YES)); + String text = String.join(" ", randomSubsetOf(randomIntBetween(1, words.size()), words)); + doc.add(new TextField("text", text, Field.Store.YES)); + doc.add(new NumericDocValuesField("rank", i)); + writer.addDocument(doc); + } + if (randomBoolean()) { + writer.flush(); + } + if (randomBoolean()) { + writer.forceMerge(1, true); + } + final Map userData = new HashMap<>(2); + userData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, "0"); + userData.put(Translog.TRANSLOG_UUID_KEY, UUIDs.randomBase64UUID(random())); + writer.setLiveCommitData(userData.entrySet()); + writer.commit(); + } + + final ThreadPool threadPool = new TestThreadPool(getTestName(), SearchableSnapshots.executorBuilder()); + releasables.add(() -> terminate(threadPool)); + + final Store store = new Store(shardId, indexSettings, directory, new DummyShardLock(shardId)); + store.incRef(); + releasables.add(store::decRef); + try { + final SegmentInfos segmentInfos = Lucene.readSegmentInfos(store.directory()); + final IndexCommit indexCommit = Lucene.getIndexCommit(segmentInfos, store.directory()); + + Path repositoryPath = createTempDir(); + Settings.Builder repositorySettings = Settings.builder().put("location", repositoryPath); + boolean compress = randomBoolean(); + if (compress) { + repositorySettings.put("compress", randomBoolean()); + } + if (randomBoolean()) { + repositorySettings.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES); + } + + final String repositoryName = randomAlphaOfLength(10); + final RepositoryMetadata repositoryMetadata = new RepositoryMetadata( + repositoryName, + FsRepository.TYPE, + repositorySettings.build() + ); + + final BlobStoreRepository repository = new FsRepository( + repositoryMetadata, + new Environment( + Settings.builder() + .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toAbsolutePath()) + .put(Environment.PATH_REPO_SETTING.getKey(), repositoryPath.toAbsolutePath()) + .putList(Environment.PATH_DATA_SETTING.getKey(), tmpPaths()) + .build(), + null + ), + NamedXContentRegistry.EMPTY, + BlobStoreTestUtil.mockClusterService(repositoryMetadata) + ) { + + @Override + protected void assertSnapshotOrGenericThread() { + // eliminate thread name check as we create repo manually on test/main threads + } + }; + repository.start(); + releasables.add(repository::stop); + + final SnapshotId snapshotId = new SnapshotId("_snapshot", UUIDs.randomBase64UUID(random())); + final IndexId indexId = new IndexId(indexSettings.getIndex().getName(), UUIDs.randomBase64UUID(random())); + + final PlainActionFuture future = PlainActionFuture.newFuture(); + threadPool.generic().submit(() -> { + IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(null); + repository.snapshotShard( + store, + null, + snapshotId, + indexId, + indexCommit, + null, + snapshotStatus, + Version.CURRENT, + emptyMap(), + future + ); + future.actionGet(); + }); + future.actionGet(); + + final Path cacheDir = createTempDir(); + final CacheService cacheService = new CacheService(Settings.EMPTY); + releasables.add(cacheService); + cacheService.start(); + + final List excludedFromCache = randomSubsetOf(Arrays.asList("fdt", "fdx", "nvd", "dvd", "tip", "cfs", "dim")); + + final Settings restoredIndexSettings = Settings.builder() + .put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true) + .put(SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.getKey(), true) + .putList(SearchableSnapshots.SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING.getKey(), excludedFromCache) + .build(); + + final BlobContainer blobContainer = repository.shardContainer(indexId, shardId.id()); + final BlobStoreIndexShardSnapshot snapshot = repository.loadShardSnapshot(blobContainer, snapshotId); + + final List expectedPrewarmedBlobs = snapshot.indexFiles() + .stream() + .filter(fileInfo -> fileInfo.metadata().hashEqualsContents() == false) + .filter(fileInfo -> excludedFromCache.contains(IndexFileNames.getExtension(fileInfo.physicalName())) == false) + .collect(Collectors.toList()); + + final FilterBlobContainer filterBlobContainer = new FilterBlobContainer(blobContainer); + try ( + SearchableSnapshotDirectory snapshotDirectory = new SearchableSnapshotDirectory( + () -> filterBlobContainer, + () -> snapshot, + snapshotId, + indexId, + shardId, + restoredIndexSettings, + () -> 0L, + cacheService, + cacheDir, + threadPool + ) + ) { + assertThat(filterBlobContainer.totalFilesRead(), equalTo(0L)); + assertThat(filterBlobContainer.totalBytesRead(), equalTo(0L)); + + final boolean loaded = snapshotDirectory.loadSnapshot(); + assertThat("Failed to load snapshot", loaded, is(true)); + + final ExecutorService executor = threadPool.executor(SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME); + executor.shutdown(); + executor.awaitTermination(30L, TimeUnit.SECONDS); + + assertThat( + filterBlobContainer.totalFilesRead(), + equalTo(expectedPrewarmedBlobs.stream().mapToLong(FileInfo::numberOfParts).sum()) + ); + assertThat( + filterBlobContainer.totalBytesRead(), + equalTo(expectedPrewarmedBlobs.stream().mapToLong(FileInfo::length).sum()) + ); + + for (FileInfo expectedPrewarmedBlob : expectedPrewarmedBlobs) { + for (int part = 0; part < expectedPrewarmedBlob.numberOfParts(); part++) { + String partName = expectedPrewarmedBlob.partName(part); + assertThat(filterBlobContainer.totalBytesRead(partName), equalTo(expectedPrewarmedBlob.partBytes(part))); + } + } + } + } finally { + Releasables.close(releasables); + } + } + } + + private static class FilterBlobContainer extends BlobContainerWrapper { + + private final Map files = new ConcurrentHashMap<>(); + + FilterBlobContainer(BlobContainer delegate) { + super(delegate); + } + + public long totalFilesRead() { + return files.size(); + } + + public long totalBytesRead() { + return files.values().stream().mapToLong(bytesRead -> bytesRead).sum(); + } + + @Nullable + public Long totalBytesRead(String name) { + return files.getOrDefault(name, null); + } + + @Override + public InputStream readBlob(String blobName, long position, long length) throws IOException { + return new FilterInputStream(super.readBlob(blobName, position, length)) { + long bytesRead = 0L; + + @Override + public int read() throws IOException { + final int result = in.read(); + if (result == -1) { + return result; + } + bytesRead += 1L; + return result; + } + + @Override + public int read(byte[] b, int offset, int len) throws IOException { + final int result = in.read(b, offset, len); + if (result == -1) { + return result; + } + bytesRead += len; + return result; + } + + @Override + public void close() throws IOException { + files.put(blobName, bytesRead); + super.close(); + } + }; + } + } +} diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java index e3a6acecb70..0b6cd065417 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java @@ -87,7 +87,8 @@ public class CachedBlobContainerIndexInputTests extends ESIndexInputTestCase { Settings.builder().put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true).build(), () -> 0L, cacheService, - cacheDir + cacheDir, + null ) ) { final boolean loaded = directory.loadSnapshot(); @@ -156,7 +157,8 @@ public class CachedBlobContainerIndexInputTests extends ESIndexInputTestCase { Settings.EMPTY, () -> 0L, cacheService, - cacheDir + cacheDir, + null ) ) { final boolean loaded = searchableSnapshotDirectory.loadSnapshot(); diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java index 8ea9cb531e5..194428f79e3 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java @@ -126,6 +126,9 @@ public class SearchableSnapshotsIntegTests extends BaseSearchableSnapshotsIntegT Settings.Builder indexSettingsBuilder = Settings.builder() .put(SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), cacheEnabled) .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), Boolean.FALSE.toString()); + if (cacheEnabled) { + indexSettingsBuilder.put(SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.getKey(), randomBoolean()); + } final List nonCachedExtensions; if (randomBoolean()) { nonCachedExtensions = randomSubsetOf(Arrays.asList("fdt", "fdx", "nvd", "dvd", "tip", "cfs", "dim"));