Allow to prewarm the cache for searchable snapshot shards (#55322)

Relates #50999
This commit is contained in:
Tanguy Leroux 2020-04-24 17:47:50 +02:00
parent 75f8d8bfd3
commit 41ddbd4188
13 changed files with 606 additions and 51 deletions

View File

@ -34,4 +34,6 @@ public class SearchableSnapshotsConstants {
return SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED return SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED
&& SNAPSHOT_DIRECTORY_FACTORY_KEY.equals(INDEX_STORE_TYPE_SETTING.get(indexSettings)); && SNAPSHOT_DIRECTORY_FACTORY_KEY.equals(INDEX_STORE_TYPE_SETTING.get(indexSettings));
} }
public static final String SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME = "searchable_snapshots";
} }

View File

@ -12,6 +12,7 @@ import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
import org.elasticsearch.index.snapshots.blobstore.SlicedInputStream; import org.elasticsearch.index.snapshots.blobstore.SlicedInputStream;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -136,6 +137,9 @@ public abstract class BaseSearchableSnapshotIndexInput extends BufferedIndexInpu
|| threadName.contains('[' + ThreadPool.Names.SEARCH + ']') || threadName.contains('[' + ThreadPool.Names.SEARCH + ']')
|| threadName.contains('[' + ThreadPool.Names.SEARCH_THROTTLED + ']') || threadName.contains('[' + ThreadPool.Names.SEARCH_THROTTLED + ']')
// Cache prewarming runs on a dedicated thread pool.
|| threadName.contains('[' + SearchableSnapshotsConstants.SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME + ']')
// Today processExistingRecoveries considers all shards and constructs a shard store snapshot on this thread, this needs // Today processExistingRecoveries considers all shards and constructs a shard store snapshot on this thread, this needs
// addressing. TODO NORELEASE // addressing. TODO NORELEASE
|| threadName.contains('[' + ThreadPool.Names.FETCH_SHARD_STORE + ']') || threadName.contains('[' + ThreadPool.Names.FETCH_SHARD_STORE + ']')

View File

@ -5,6 +5,9 @@
*/ */
package org.elasticsearch.index.store; package org.elasticsearch.index.store;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.store.BaseDirectory; import org.apache.lucene.store.BaseDirectory;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
@ -18,8 +21,12 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.lucene.store.ByteArrayIndexInput; import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.LazyInitializable; import org.elasticsearch.common.util.LazyInitializable;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.index.shard.ShardPath;
@ -47,18 +54,22 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongSupplier; import java.util.function.LongSupplier;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.apache.lucene.store.BufferedIndexInput.bufferSize; import static org.apache.lucene.store.BufferedIndexInput.bufferSize;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_REPOSITORY_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_REPOSITORY_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME;
/** /**
* Implementation of {@link Directory} that exposes files from a snapshot as a Lucene directory. Because snapshot are immutable this * Implementation of {@link Directory} that exposes files from a snapshot as a Lucene directory. Because snapshot are immutable this
@ -73,6 +84,8 @@ import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SN
*/ */
public class SearchableSnapshotDirectory extends BaseDirectory { public class SearchableSnapshotDirectory extends BaseDirectory {
private static final Logger logger = LogManager.getLogger(SearchableSnapshotDirectory.class);
private final Supplier<BlobContainer> blobContainerSupplier; private final Supplier<BlobContainer> blobContainerSupplier;
private final Supplier<BlobStoreIndexShardSnapshot> snapshotSupplier; private final Supplier<BlobStoreIndexShardSnapshot> snapshotSupplier;
private final SnapshotId snapshotId; private final SnapshotId snapshotId;
@ -80,8 +93,10 @@ public class SearchableSnapshotDirectory extends BaseDirectory {
private final ShardId shardId; private final ShardId shardId;
private final LongSupplier statsCurrentTimeNanosSupplier; private final LongSupplier statsCurrentTimeNanosSupplier;
private final Map<String, IndexInputStats> stats; private final Map<String, IndexInputStats> stats;
private final ThreadPool threadPool;
private final CacheService cacheService; private final CacheService cacheService;
private final boolean useCache; private final boolean useCache;
private final boolean prewarmCache;
private final Set<String> excludedFileTypes; private final Set<String> excludedFileTypes;
private final long uncachedChunkSize; // if negative use BlobContainer#readBlobPreferredLength, see #getUncachedChunkSize() private final long uncachedChunkSize; // if negative use BlobContainer#readBlobPreferredLength, see #getUncachedChunkSize()
private final Path cacheDir; private final Path cacheDir;
@ -101,7 +116,8 @@ public class SearchableSnapshotDirectory extends BaseDirectory {
Settings indexSettings, Settings indexSettings,
LongSupplier currentTimeNanosSupplier, LongSupplier currentTimeNanosSupplier,
CacheService cacheService, CacheService cacheService,
Path cacheDir Path cacheDir,
ThreadPool threadPool
) { ) {
super(new SingleInstanceLockFactory()); super(new SingleInstanceLockFactory());
this.snapshotSupplier = Objects.requireNonNull(snapshot); this.snapshotSupplier = Objects.requireNonNull(snapshot);
@ -115,8 +131,10 @@ public class SearchableSnapshotDirectory extends BaseDirectory {
this.cacheDir = Objects.requireNonNull(cacheDir); this.cacheDir = Objects.requireNonNull(cacheDir);
this.closed = new AtomicBoolean(false); this.closed = new AtomicBoolean(false);
this.useCache = SNAPSHOT_CACHE_ENABLED_SETTING.get(indexSettings); this.useCache = SNAPSHOT_CACHE_ENABLED_SETTING.get(indexSettings);
this.prewarmCache = useCache ? SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.get(indexSettings) : false;
this.excludedFileTypes = new HashSet<>(SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING.get(indexSettings)); this.excludedFileTypes = new HashSet<>(SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING.get(indexSettings));
this.uncachedChunkSize = SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING.get(indexSettings).getBytes(); this.uncachedChunkSize = SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING.get(indexSettings).getBytes();
this.threadPool = threadPool;
this.loaded = false; this.loaded = false;
assert invariant(); assert invariant();
} }
@ -142,6 +160,7 @@ public class SearchableSnapshotDirectory extends BaseDirectory {
* @return true if the snapshot was loaded by executing this method, false otherwise * @return true if the snapshot was loaded by executing this method, false otherwise
*/ */
public boolean loadSnapshot() { public boolean loadSnapshot() {
assert assertCurrentThreadMayLoadSnapshot();
boolean alreadyLoaded = this.loaded; boolean alreadyLoaded = this.loaded;
if (alreadyLoaded == false) { if (alreadyLoaded == false) {
synchronized (this) { synchronized (this) {
@ -150,10 +169,10 @@ public class SearchableSnapshotDirectory extends BaseDirectory {
this.blobContainer = blobContainerSupplier.get(); this.blobContainer = blobContainerSupplier.get();
this.snapshot = snapshotSupplier.get(); this.snapshot = snapshotSupplier.get();
this.loaded = true; this.loaded = true;
prewarmCache();
} }
} }
} }
assert assertCurrentThreadMayLoadSnapshot();
assert invariant(); assert invariant();
return alreadyLoaded == false; return alreadyLoaded == false;
} }
@ -300,7 +319,7 @@ public class SearchableSnapshotDirectory extends BaseDirectory {
final IndexInputStats inputStats = stats.computeIfAbsent(name, n -> createIndexInputStats(fileInfo.length())); final IndexInputStats inputStats = stats.computeIfAbsent(name, n -> createIndexInputStats(fileInfo.length()));
if (useCache && isExcludedFromCache(name) == false) { if (useCache && isExcludedFromCache(name) == false) {
return new CachedBlobContainerIndexInput(this, fileInfo, context, inputStats); return new CachedBlobContainerIndexInput(this, fileInfo, context, inputStats, cacheService.getRangeSize());
} else { } else {
return new DirectBlobContainerIndexInput( return new DirectBlobContainerIndexInput(
blobContainer(), blobContainer(),
@ -331,12 +350,86 @@ public class SearchableSnapshotDirectory extends BaseDirectory {
return this.getClass().getSimpleName() + "@snapshotId=" + snapshotId + " lockFactory=" + lockFactory; return this.getClass().getSimpleName() + "@snapshotId=" + snapshotId + " lockFactory=" + lockFactory;
} }
private void prewarmCache() {
if (prewarmCache) {
final List<BlobStoreIndexShardSnapshot.FileInfo> cacheFiles = snapshot().indexFiles()
.stream()
.filter(file -> file.metadata().hashEqualsContents() == false)
.filter(file -> isExcludedFromCache(file.physicalName()) == false)
.collect(Collectors.toList());
final Executor executor = threadPool.executor(SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME);
logger.debug("{} warming shard cache for [{}] files", shardId, cacheFiles.size());
for (BlobStoreIndexShardSnapshot.FileInfo cacheFile : cacheFiles) {
final String fileName = cacheFile.physicalName();
try {
final IndexInput input = openInput(fileName, CachedBlobContainerIndexInput.CACHE_WARMING_CONTEXT);
assert input instanceof CachedBlobContainerIndexInput : "expected cached index input but got " + input.getClass();
final long numberOfParts = cacheFile.numberOfParts();
final CountDown countDown = new CountDown(Math.toIntExact(numberOfParts));
for (long p = 0; p < numberOfParts; p++) {
final int part = Math.toIntExact(p);
// TODO use multiple workers to warm each part instead of filling the thread pool
executor.execute(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
ensureOpen();
logger.trace("warming cache for [{}] part [{}/{}]", fileName, part, numberOfParts);
final long startTimeInNanos = statsCurrentTimeNanosSupplier.getAsLong();
final CachedBlobContainerIndexInput cachedIndexInput = (CachedBlobContainerIndexInput) input.clone();
final int bytesRead = cachedIndexInput.prefetchPart(part); // TODO does not include any rate limitation
assert bytesRead == cacheFile.partBytes(part);
logger.trace(
() -> new ParameterizedMessage(
"part [{}/{}] of [{}] warmed in [{}] ms",
part,
numberOfParts,
fileName,
TimeValue.timeValueNanos(statsCurrentTimeNanosSupplier.getAsLong() - startTimeInNanos).millis()
)
);
}
@Override
public void onFailure(Exception e) {
logger.trace(
() -> new ParameterizedMessage(
"failed to warm cache for [{}] part [{}/{}]",
fileName,
part,
numberOfParts
),
e
);
}
@Override
public void onAfter() {
if (countDown.countDown()) {
IOUtils.closeWhileHandlingException(input);
}
}
});
}
} catch (IOException e) {
logger.trace(() -> new ParameterizedMessage("failed to warm cache for [{}]", fileName), e);
}
}
}
}
public static Directory create( public static Directory create(
RepositoriesService repositories, RepositoriesService repositories,
CacheService cache, CacheService cache,
IndexSettings indexSettings, IndexSettings indexSettings,
ShardPath shardPath, ShardPath shardPath,
LongSupplier currentTimeNanosSupplier LongSupplier currentTimeNanosSupplier,
ThreadPool threadPool
) throws IOException { ) throws IOException {
final Repository repository = repositories.repository(SNAPSHOT_REPOSITORY_SETTING.get(indexSettings.getSettings())); final Repository repository = repositories.repository(SNAPSHOT_REPOSITORY_SETTING.get(indexSettings.getSettings()));
@ -371,7 +464,8 @@ public class SearchableSnapshotDirectory extends BaseDirectory {
indexSettings.getSettings(), indexSettings.getSettings(),
currentTimeNanosSupplier, currentTimeNanosSupplier,
cache, cache,
cacheDir cacheDir,
threadPool
) )
); );
} }

View File

@ -51,7 +51,6 @@ public class CacheFile {
private final ReleasableLock readLock; private final ReleasableLock readLock;
private final SparseFileTracker tracker; private final SparseFileTracker tracker;
private final int rangeSize;
private final String description; private final String description;
private final Path file; private final Path file;
@ -61,12 +60,11 @@ public class CacheFile {
@Nullable // if evicted, or there are no listeners @Nullable // if evicted, or there are no listeners
private volatile FileChannel channel; private volatile FileChannel channel;
public CacheFile(String description, long length, Path file, int rangeSize) { public CacheFile(String description, long length, Path file) {
this.tracker = new SparseFileTracker(file.toString(), length); this.tracker = new SparseFileTracker(file.toString(), length);
this.description = Objects.requireNonNull(description); this.description = Objects.requireNonNull(description);
this.file = Objects.requireNonNull(file); this.file = Objects.requireNonNull(file);
this.listeners = new HashSet<>(); this.listeners = new HashSet<>();
this.rangeSize = rangeSize;
this.evicted = false; this.evicted = false;
final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock(); final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock();
@ -249,41 +247,35 @@ public class CacheFile {
} }
CompletableFuture<Integer> fetchRange( CompletableFuture<Integer> fetchRange(
long position, long start,
long end,
CheckedBiFunction<Long, Long, Integer, IOException> onRangeAvailable, CheckedBiFunction<Long, Long, Integer, IOException> onRangeAvailable,
CheckedBiConsumer<Long, Long, IOException> onRangeMissing CheckedBiConsumer<Long, Long, IOException> onRangeMissing
) { ) {
final CompletableFuture<Integer> future = new CompletableFuture<>(); final CompletableFuture<Integer> future = new CompletableFuture<>();
try { try {
if (position < 0 || position > tracker.getLength()) { if (start < 0 || start > tracker.getLength() || start > end || end > tracker.getLength()) {
throw new IllegalArgumentException("Wrong read position [" + position + "]"); throw new IllegalArgumentException(
"Invalid range [start=" + start + ", end=" + end + "] for length [" + tracker.getLength() + ']'
);
} }
ensureOpen(); ensureOpen();
final long rangeStart = (position / rangeSize) * rangeSize;
final long rangeEnd = Math.min(rangeStart + rangeSize, tracker.getLength());
final List<SparseFileTracker.Gap> gaps = tracker.waitForRange( final List<SparseFileTracker.Gap> gaps = tracker.waitForRange(
rangeStart, start,
rangeEnd, end,
ActionListener.wrap( ActionListener.wrap(
rangeReady -> future.complete(onRangeAvailable.apply(rangeStart, rangeEnd)), rangeReady -> future.complete(onRangeAvailable.apply(start, end)),
rangeFailure -> future.completeExceptionally(rangeFailure) rangeFailure -> future.completeExceptionally(rangeFailure)
) )
); );
if (gaps.size() > 0) { for (SparseFileTracker.Gap gap : gaps) {
final SparseFileTracker.Gap range = gaps.get(0);
assert gaps.size() == 1 : "expected 1 range to fetch but got " + gaps.size();
assert range.start == rangeStart : "range/gap start mismatch (" + range.start + ',' + rangeStart + ')';
assert range.end == rangeEnd : "range/gap end mismatch (" + range.end + ',' + rangeEnd + ')';
try { try {
ensureOpen(); ensureOpen();
onRangeMissing.accept(rangeStart, rangeEnd); onRangeMissing.accept(gap.start, gap.end);
range.onResponse(null); gap.onResponse(null);
} catch (Exception e) { } catch (Exception e) {
range.onFailure(e); gap.onFailure(e);
} }
} }
} catch (Exception e) { } catch (Exception e) {

View File

@ -14,6 +14,7 @@ import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexInput;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.Channels; import org.elasticsearch.common.io.Channels;
import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
@ -28,14 +29,24 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.util.Locale; import java.util.Locale;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.IntStream;
public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexInput { public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexInput {
/**
* Specific IOContext used for prewarming the cache. This context allows to write
* a complete part of the {@link #fileInfo} at once in the cache and should not be
* used for anything else than what the {@link #prefetchPart(int)} method does.
*/
public static final IOContext CACHE_WARMING_CONTEXT = new IOContext();
private static final Logger logger = LogManager.getLogger(CachedBlobContainerIndexInput.class); private static final Logger logger = LogManager.getLogger(CachedBlobContainerIndexInput.class);
private static final int COPY_BUFFER_SIZE = 8192; private static final int COPY_BUFFER_SIZE = 8192;
private final SearchableSnapshotDirectory directory; private final SearchableSnapshotDirectory directory;
private final CacheFileReference cacheFileReference; private final CacheFileReference cacheFileReference;
private final int defaultRangeSize;
// last read position is kept around in order to detect (non)contiguous reads for stats // last read position is kept around in order to detect (non)contiguous reads for stats
private long lastReadPosition; private long lastReadPosition;
@ -46,7 +57,8 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
SearchableSnapshotDirectory directory, SearchableSnapshotDirectory directory,
FileInfo fileInfo, FileInfo fileInfo,
IOContext context, IOContext context,
IndexInputStats stats IndexInputStats stats,
int rangeSize
) { ) {
this( this(
"CachedBlobContainerIndexInput(" + fileInfo.physicalName() + ")", "CachedBlobContainerIndexInput(" + fileInfo.physicalName() + ")",
@ -56,7 +68,8 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
stats, stats,
0L, 0L,
fileInfo.length(), fileInfo.length(),
new CacheFileReference(directory, fileInfo.physicalName(), fileInfo.length()) new CacheFileReference(directory, fileInfo.physicalName(), fileInfo.length()),
rangeSize
); );
stats.incrementOpenCount(); stats.incrementOpenCount();
} }
@ -69,13 +82,15 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
IndexInputStats stats, IndexInputStats stats,
long offset, long offset,
long length, long length,
CacheFileReference cacheFileReference CacheFileReference cacheFileReference,
int rangeSize
) { ) {
super(resourceDesc, directory.blobContainer(), fileInfo, context, stats, offset, length); super(resourceDesc, directory.blobContainer(), fileInfo, context, stats, offset, length);
this.directory = directory; this.directory = directory;
this.cacheFileReference = cacheFileReference; this.cacheFileReference = cacheFileReference;
this.lastReadPosition = this.offset; this.lastReadPosition = this.offset;
this.lastSeekPosition = this.offset; this.lastSeekPosition = this.offset;
this.defaultRangeSize = rangeSize;
} }
@Override @Override
@ -85,8 +100,35 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
} }
} }
private void ensureContext(Predicate<IOContext> predicate) throws IOException {
if (predicate.test(context) == false) {
assert false : "this method should not be used with this context " + context;
throw new IOException("Cannot read the index input using context [context=" + context + ", input=" + this + ']');
}
}
private long getDefaultRangeSize() {
return (context != CACHE_WARMING_CONTEXT) ? defaultRangeSize : fileInfo.partSize().getBytes();
}
private Tuple<Long, Long> computeRange(long position) {
final long rangeSize = getDefaultRangeSize();
long start = (position / rangeSize) * rangeSize;
long end = Math.min(start + rangeSize, fileInfo.length());
return Tuple.tuple(start, end);
}
private CacheFile getCacheFileSafe() throws Exception {
final CacheFile cacheFile = cacheFileReference.get();
if (cacheFile == null) {
throw new AlreadyClosedException("Failed to acquire a non-evicted cache file");
}
return cacheFile;
}
@Override @Override
protected void readInternal(final byte[] buffer, final int offset, final int length) throws IOException { protected void readInternal(final byte[] buffer, final int offset, final int length) throws IOException {
ensureContext(ctx -> ctx != CACHE_WARMING_CONTEXT);
final long position = getFilePointer() + this.offset; final long position = getFilePointer() + this.offset;
int totalBytesRead = 0; int totalBytesRead = 0;
@ -97,14 +139,12 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
int bytesRead = 0; int bytesRead = 0;
try { try {
final CacheFile cacheFile = cacheFileReference.get(); final CacheFile cacheFile = getCacheFileSafe();
if (cacheFile == null) {
throw new AlreadyClosedException("Failed to acquire a non-evicted cache file");
}
try (ReleasableLock ignored = cacheFile.fileLock()) { try (ReleasableLock ignored = cacheFile.fileLock()) {
final Tuple<Long, Long> range = computeRange(pos);
bytesRead = cacheFile.fetchRange( bytesRead = cacheFile.fetchRange(
pos, range.v1(),
range.v2(),
(start, end) -> readCacheFile(cacheFile.getChannel(), end, pos, buffer, off, len), (start, end) -> readCacheFile(cacheFile.getChannel(), end, pos, buffer, off, len),
(start, end) -> writeCacheFile(cacheFile.getChannel(), start, end) (start, end) -> writeCacheFile(cacheFile.getChannel(), start, end)
).get(); ).get();
@ -131,6 +171,50 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
lastSeekPosition = lastReadPosition; lastSeekPosition = lastReadPosition;
} }
/**
* Prefetches a complete part and writes it in cache. This method is used to prewarm the cache.
*/
public int prefetchPart(final int part) throws IOException {
ensureContext(ctx -> ctx == CACHE_WARMING_CONTEXT);
if (part >= fileInfo.numberOfParts()) {
throw new IllegalArgumentException("Unexpected part number [" + part + "]");
}
final Tuple<Long, Long> range = computeRange(IntStream.range(0, part).mapToLong(fileInfo::partBytes).sum());
assert assertRangeIsAlignedWithPart(range);
try {
final CacheFile cacheFile = getCacheFileSafe();
try (ReleasableLock ignored = cacheFile.fileLock()) {
final int bytesRead = cacheFile.fetchRange(range.v1(), range.v2(), (start, end) -> {
logger.trace("range [{}-{}] of file [{}] is now available in cache", start, end, fileInfo.physicalName());
return Math.toIntExact(end - start);
}, (start, end) -> writeCacheFile(cacheFile.getChannel(), start, end)).get();
assert bytesRead == (range.v2() - range.v1());
return bytesRead;
}
} catch (final Exception e) {
throw new IOException("Failed to prefetch file part in cache", e);
}
}
/**
* Asserts that the range of bytes to warm in cache is aligned with {@link #fileInfo}'s part size.
*/
private boolean assertRangeIsAlignedWithPart(Tuple<Long, Long> range) {
if (fileInfo.numberOfParts() == 1L) {
final long length = fileInfo.length();
assert range.v1() == 0L : "start of range [" + range.v1() + "] is not aligned with zero";
assert range.v2() == length : "end of range [" + range.v2() + "] is not aligned with file length [" + length + ']';
} else {
final long length = fileInfo.partSize().getBytes();
assert range.v1() % length == 0L : "start of range [" + range.v1() + "] is not aligned with part start";
assert range.v2() % length == 0L || (range.v2() == fileInfo.length()) : "end of range ["
+ range.v2()
+ "] is not aligned with part end or with file length";
}
return true;
}
private int readCacheFile(FileChannel fc, long end, long position, byte[] buffer, int offset, long length) throws IOException { private int readCacheFile(FileChannel fc, long end, long position, byte[] buffer, int offset, long length) throws IOException {
assert assertFileChannelOpen(fc); assert assertFileChannelOpen(fc);
int bytesRead = Channels.readFromFileChannel(fc, position, buffer, offset, Math.toIntExact(Math.min(length, end - position))); int bytesRead = Channels.readFromFileChannel(fc, position, buffer, offset, Math.toIntExact(Math.min(length, end - position)));
@ -214,7 +298,8 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
stats, stats,
this.offset + offset, this.offset + offset,
length, length,
cacheFileReference cacheFileReference,
defaultRangeSize
); );
slice.isClone = true; slice.isClone = true;
return slice; return slice;
@ -231,6 +316,8 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
+ length() + length()
+ ", position=" + ", position="
+ getFilePointer() + getFilePointer()
+ ", rangeSize="
+ getDefaultRangeSize()
+ '}'; + '}';
} }

View File

@ -21,6 +21,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter; import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment; import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.NodeEnvironment;
@ -42,6 +43,8 @@ import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler; import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.ScalingExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction; import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction;
@ -68,6 +71,7 @@ import java.util.function.Supplier;
import static java.util.Collections.emptyList; import static java.util.Collections.emptyList;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SNAPSHOT_DIRECTORY_FACTORY_KEY; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SNAPSHOT_DIRECTORY_FACTORY_KEY;
/** /**
@ -100,6 +104,11 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng
true, true,
Setting.Property.IndexScope Setting.Property.IndexScope
); );
public static final Setting<Boolean> SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING = Setting.boolSetting(
"index.store.snapshot.cache.prewarm.enabled",
false,
Setting.Property.IndexScope
);
// The file extensions that are excluded from the cache // The file extensions that are excluded from the cache
public static final Setting<List<String>> SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING = Setting.listSetting( public static final Setting<List<String>> SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING = Setting.listSetting(
"index.store.snapshot.cache.excluded_file_types", "index.store.snapshot.cache.excluded_file_types",
@ -117,6 +126,7 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng
private volatile Supplier<RepositoriesService> repositoriesServiceSupplier; private volatile Supplier<RepositoriesService> repositoriesServiceSupplier;
private final SetOnce<CacheService> cacheService = new SetOnce<>(); private final SetOnce<CacheService> cacheService = new SetOnce<>();
private final SetOnce<ThreadPool> threadPool = new SetOnce<>();
private final Settings settings; private final Settings settings;
public SearchableSnapshots(final Settings settings) { public SearchableSnapshots(final Settings settings) {
@ -138,6 +148,7 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng
SNAPSHOT_SNAPSHOT_ID_SETTING, SNAPSHOT_SNAPSHOT_ID_SETTING,
SNAPSHOT_INDEX_ID_SETTING, SNAPSHOT_INDEX_ID_SETTING,
SNAPSHOT_CACHE_ENABLED_SETTING, SNAPSHOT_CACHE_ENABLED_SETTING,
SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING,
SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING, SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING,
SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING, SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING,
CacheService.SNAPSHOT_CACHE_SIZE_SETTING, CacheService.SNAPSHOT_CACHE_SIZE_SETTING,
@ -166,6 +177,7 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng
final CacheService cacheService = new CacheService(settings); final CacheService cacheService = new CacheService(settings);
this.cacheService.set(cacheService); this.cacheService.set(cacheService);
this.repositoriesServiceSupplier = repositoriesServiceSupplier; this.repositoriesServiceSupplier = repositoriesServiceSupplier;
this.threadPool.set(threadPool);
return org.elasticsearch.common.collect.List.of(cacheService); return org.elasticsearch.common.collect.List.of(cacheService);
} else { } else {
this.repositoriesServiceSupplier = () -> { this.repositoriesServiceSupplier = () -> {
@ -191,7 +203,9 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng
assert repositories != null; assert repositories != null;
final CacheService cache = cacheService.get(); final CacheService cache = cacheService.get();
assert cache != null; assert cache != null;
return SearchableSnapshotDirectory.create(repositories, cache, indexSettings, shardPath, System::nanoTime); final ThreadPool threadPool = this.threadPool.get();
assert threadPool != null;
return SearchableSnapshotDirectory.create(repositories, cache, indexSettings, shardPath, System::nanoTime, threadPool);
}); });
} else { } else {
return org.elasticsearch.common.collect.Map.of(); return org.elasticsearch.common.collect.Map.of();
@ -253,4 +267,21 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng
} }
} }
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
if (SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED) {
return org.elasticsearch.common.collect.List.of(executorBuilder());
} else {
return org.elasticsearch.common.collect.List.of();
}
}
public static ExecutorBuilder<?> executorBuilder() {
return new ScalingExecutorBuilder(
SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME,
0,
32,
TimeValue.timeValueSeconds(30L),
"xpack.searchable_snapshots.thread_pool"
);
}
} }

View File

@ -112,7 +112,7 @@ public class CacheService extends AbstractLifecycleComponent {
final Path path = cacheDir.resolve(uuid); final Path path = cacheDir.resolve(uuid);
assert Files.notExists(path) : "cache file already exists " + path; assert Files.notExists(path) : "cache file already exists " + path;
return new CacheFile(key.toString(), fileLength, path, getRangeSize()); return new CacheFile(key.toString(), fileLength, path);
}); });
} }

View File

@ -22,6 +22,8 @@ import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.F
import org.elasticsearch.index.store.cache.TestUtils; import org.elasticsearch.index.store.cache.TestUtils;
import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;
import java.io.IOException; import java.io.IOException;
@ -571,6 +573,7 @@ public class SearchableSnapshotDirectoryStatsTests extends ESIndexInputTestCase
final ShardId shardId = new ShardId("_name", "_uuid", 0); final ShardId shardId = new ShardId("_name", "_uuid", 0);
final AtomicLong fakeClock = new AtomicLong(); final AtomicLong fakeClock = new AtomicLong();
final LongSupplier statsCurrentTimeNanos = () -> fakeClock.addAndGet(FAKE_CLOCK_ADVANCE_NANOS); final LongSupplier statsCurrentTimeNanos = () -> fakeClock.addAndGet(FAKE_CLOCK_ADVANCE_NANOS);
final ThreadPool threadPool = new TestThreadPool(getTestClass().getSimpleName());
final Long seekingThreshold = randomBoolean() ? randomLongBetween(1L, fileContent.length) : null; final Long seekingThreshold = randomBoolean() ? randomLongBetween(1L, fileContent.length) : null;
@ -591,7 +594,8 @@ public class SearchableSnapshotDirectoryStatsTests extends ESIndexInputTestCase
indexSettings, indexSettings,
statsCurrentTimeNanos, statsCurrentTimeNanos,
cacheService, cacheService,
createTempDir() createTempDir(),
threadPool
) { ) {
@Override @Override
protected IndexInputStats createIndexInputStats(long fileLength) { protected IndexInputStats createIndexInputStats(long fileLength) {
@ -611,6 +615,8 @@ public class SearchableSnapshotDirectoryStatsTests extends ESIndexInputTestCase
assertThat("BlobContainer should be loaded", directory.blobContainer(), notNullValue()); assertThat("BlobContainer should be loaded", directory.blobContainer(), notNullValue());
test.apply(fileName, fileContent, directory); test.apply(fileName, fileContent, directory);
} finally {
terminate(threadPool);
} }
} }
} }

View File

@ -70,6 +70,7 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.IndexSettingsModule; import org.elasticsearch.test.IndexSettingsModule;
import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots;
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;
import org.hamcrest.Matcher; import org.hamcrest.Matcher;
@ -93,6 +94,7 @@ import java.util.Map;
import static java.util.Arrays.asList; import static java.util.Arrays.asList;
import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING;
import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThan;
@ -351,7 +353,7 @@ public class SearchableSnapshotDirectoryTests extends ESTestCase {
writer.commit(); writer.commit();
} }
final ThreadPool threadPool = new TestThreadPool(getClass().getSimpleName()); final ThreadPool threadPool = new TestThreadPool(getTestName(), SearchableSnapshots.executorBuilder());
releasables.add(() -> terminate(threadPool)); releasables.add(() -> terminate(threadPool));
final Store store = new Store(shardId, indexSettings, directory, new DummyShardLock(shardId)); final Store store = new Store(shardId, indexSettings, directory, new DummyShardLock(shardId));
@ -440,10 +442,14 @@ public class SearchableSnapshotDirectoryTests extends ESTestCase {
snapshotId, snapshotId,
indexId, indexId,
shardId, shardId,
Settings.builder().put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), randomBoolean()).build(), Settings.builder()
.put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), randomBoolean())
.put(SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.getKey(), randomBoolean())
.build(),
() -> 0L, () -> 0L,
cacheService, cacheService,
cacheDir cacheDir,
threadPool
) )
) { ) {
final boolean loaded = snapshotDirectory.loadSnapshot(); final boolean loaded = snapshotDirectory.loadSnapshot();
@ -514,6 +520,7 @@ public class SearchableSnapshotDirectoryTests extends ESTestCase {
final ShardId shardId = new ShardId(new Index("_name", "_id"), 0); final ShardId shardId = new ShardId(new Index("_name", "_id"), 0);
final Path cacheDir = createTempDir(); final Path cacheDir = createTempDir();
final ThreadPool threadPool = new TestThreadPool(getTestName(), SearchableSnapshots.executorBuilder());
try ( try (
SearchableSnapshotDirectory directory = new SearchableSnapshotDirectory( SearchableSnapshotDirectory directory = new SearchableSnapshotDirectory(
() -> blobContainer, () -> blobContainer,
@ -521,10 +528,14 @@ public class SearchableSnapshotDirectoryTests extends ESTestCase {
snapshotId, snapshotId,
indexId, indexId,
shardId, shardId,
Settings.builder().put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true).build(), Settings.builder()
.put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true)
.put(SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.getKey(), randomBoolean())
.build(),
() -> 0L, () -> 0L,
cacheService, cacheService,
cacheDir cacheDir,
threadPool
) )
) { ) {
@ -554,6 +565,8 @@ public class SearchableSnapshotDirectoryTests extends ESTestCase {
assertListOfFiles(cacheDir, equalTo(0), equalTo(0L)); assertListOfFiles(cacheDir, equalTo(0), equalTo(0L));
} }
} }
} finally {
terminate(threadPool);
} }
} }
} }

View File

@ -6,8 +6,8 @@
package org.elasticsearch.index.store.cache; package org.elasticsearch.index.store.cache;
import org.apache.lucene.util.SetOnce; import org.apache.lucene.util.SetOnce;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.index.store.cache.CacheFile.EvictionListener; import org.elasticsearch.index.store.cache.CacheFile.EvictionListener;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException; import java.io.IOException;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
@ -27,7 +27,7 @@ public class CacheFileTests extends ESTestCase {
public void testAcquireAndRelease() throws Exception { public void testAcquireAndRelease() throws Exception {
final Path file = createTempDir().resolve("file.cache"); final Path file = createTempDir().resolve("file.cache");
final CacheFile cacheFile = new CacheFile("test", randomLongBetween(1, 100), file, randomIntBetween(1, 100)); final CacheFile cacheFile = new CacheFile("test", randomLongBetween(1, 100), file);
assertThat("Cache file is not acquired: no channel exists", cacheFile.getChannel(), nullValue()); assertThat("Cache file is not acquired: no channel exists", cacheFile.getChannel(), nullValue());
assertThat("Cache file is not acquired: file does not exist", Files.exists(file), is(false)); assertThat("Cache file is not acquired: file does not exist", Files.exists(file), is(false));
@ -70,7 +70,7 @@ public class CacheFileTests extends ESTestCase {
public void testCacheFileNotAcquired() throws IOException { public void testCacheFileNotAcquired() throws IOException {
final Path file = createTempDir().resolve("file.cache"); final Path file = createTempDir().resolve("file.cache");
final CacheFile cacheFile = new CacheFile("test", randomLongBetween(1, 100), file, randomIntBetween(1, 100)); final CacheFile cacheFile = new CacheFile("test", randomLongBetween(1, 100), file);
assertThat(Files.exists(file), is(false)); assertThat(Files.exists(file), is(false));
assertThat(cacheFile.getChannel(), nullValue()); assertThat(cacheFile.getChannel(), nullValue());
@ -94,7 +94,7 @@ public class CacheFileTests extends ESTestCase {
public void testDeleteOnCloseAfterLastRelease() throws Exception { public void testDeleteOnCloseAfterLastRelease() throws Exception {
final Path file = createTempDir().resolve("file.cache"); final Path file = createTempDir().resolve("file.cache");
final CacheFile cacheFile = new CacheFile("test", randomLongBetween(1, 100), file, randomIntBetween(1, 100)); final CacheFile cacheFile = new CacheFile("test", randomLongBetween(1, 100), file);
final List<TestEvictionListener> acquiredListeners = new ArrayList<>(); final List<TestEvictionListener> acquiredListeners = new ArrayList<>();
for (int i = 0; i < randomIntBetween(1, 20); i++) { for (int i = 0; i < randomIntBetween(1, 20); i++) {

View File

@ -0,0 +1,321 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.index.store.cache;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.Directory;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
import org.elasticsearch.index.store.SearchableSnapshotDirectory;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.mockstore.BlobContainerWrapper;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.IndexSettingsModule;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots;
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static java.util.Collections.emptyMap;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
public class CachePreWarmingTests extends ESTestCase {
public void testCachePreWarming() throws Exception {
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(
"_index",
Settings.builder()
.put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()))
.put(IndexMetadata.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT)
.build()
);
final ShardId shardId = new ShardId(indexSettings.getIndex(), randomIntBetween(0, 10));
final List<Releasable> releasables = new ArrayList<>();
try (Directory directory = newDirectory()) {
final IndexWriterConfig indexWriterConfig = newIndexWriterConfig();
try (IndexWriter writer = new IndexWriter(directory, indexWriterConfig)) {
final int nbDocs = scaledRandomIntBetween(0, 1_000);
final List<String> words = org.elasticsearch.common.collect.List.of(
"the",
"quick",
"brown",
"fox",
"jumps",
"over",
"the",
"lazy",
"dog"
);
for (int i = 0; i < nbDocs; i++) {
final Document doc = new Document();
doc.add(new StringField("id", "" + i, Field.Store.YES));
String text = String.join(" ", randomSubsetOf(randomIntBetween(1, words.size()), words));
doc.add(new TextField("text", text, Field.Store.YES));
doc.add(new NumericDocValuesField("rank", i));
writer.addDocument(doc);
}
if (randomBoolean()) {
writer.flush();
}
if (randomBoolean()) {
writer.forceMerge(1, true);
}
final Map<String, String> userData = new HashMap<>(2);
userData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, "0");
userData.put(Translog.TRANSLOG_UUID_KEY, UUIDs.randomBase64UUID(random()));
writer.setLiveCommitData(userData.entrySet());
writer.commit();
}
final ThreadPool threadPool = new TestThreadPool(getTestName(), SearchableSnapshots.executorBuilder());
releasables.add(() -> terminate(threadPool));
final Store store = new Store(shardId, indexSettings, directory, new DummyShardLock(shardId));
store.incRef();
releasables.add(store::decRef);
try {
final SegmentInfos segmentInfos = Lucene.readSegmentInfos(store.directory());
final IndexCommit indexCommit = Lucene.getIndexCommit(segmentInfos, store.directory());
Path repositoryPath = createTempDir();
Settings.Builder repositorySettings = Settings.builder().put("location", repositoryPath);
boolean compress = randomBoolean();
if (compress) {
repositorySettings.put("compress", randomBoolean());
}
if (randomBoolean()) {
repositorySettings.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES);
}
final String repositoryName = randomAlphaOfLength(10);
final RepositoryMetadata repositoryMetadata = new RepositoryMetadata(
repositoryName,
FsRepository.TYPE,
repositorySettings.build()
);
final BlobStoreRepository repository = new FsRepository(
repositoryMetadata,
new Environment(
Settings.builder()
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toAbsolutePath())
.put(Environment.PATH_REPO_SETTING.getKey(), repositoryPath.toAbsolutePath())
.putList(Environment.PATH_DATA_SETTING.getKey(), tmpPaths())
.build(),
null
),
NamedXContentRegistry.EMPTY,
BlobStoreTestUtil.mockClusterService(repositoryMetadata)
) {
@Override
protected void assertSnapshotOrGenericThread() {
// eliminate thread name check as we create repo manually on test/main threads
}
};
repository.start();
releasables.add(repository::stop);
final SnapshotId snapshotId = new SnapshotId("_snapshot", UUIDs.randomBase64UUID(random()));
final IndexId indexId = new IndexId(indexSettings.getIndex().getName(), UUIDs.randomBase64UUID(random()));
final PlainActionFuture<String> future = PlainActionFuture.newFuture();
threadPool.generic().submit(() -> {
IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(null);
repository.snapshotShard(
store,
null,
snapshotId,
indexId,
indexCommit,
null,
snapshotStatus,
Version.CURRENT,
emptyMap(),
future
);
future.actionGet();
});
future.actionGet();
final Path cacheDir = createTempDir();
final CacheService cacheService = new CacheService(Settings.EMPTY);
releasables.add(cacheService);
cacheService.start();
final List<String> excludedFromCache = randomSubsetOf(Arrays.asList("fdt", "fdx", "nvd", "dvd", "tip", "cfs", "dim"));
final Settings restoredIndexSettings = Settings.builder()
.put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true)
.put(SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.getKey(), true)
.putList(SearchableSnapshots.SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING.getKey(), excludedFromCache)
.build();
final BlobContainer blobContainer = repository.shardContainer(indexId, shardId.id());
final BlobStoreIndexShardSnapshot snapshot = repository.loadShardSnapshot(blobContainer, snapshotId);
final List<FileInfo> expectedPrewarmedBlobs = snapshot.indexFiles()
.stream()
.filter(fileInfo -> fileInfo.metadata().hashEqualsContents() == false)
.filter(fileInfo -> excludedFromCache.contains(IndexFileNames.getExtension(fileInfo.physicalName())) == false)
.collect(Collectors.toList());
final FilterBlobContainer filterBlobContainer = new FilterBlobContainer(blobContainer);
try (
SearchableSnapshotDirectory snapshotDirectory = new SearchableSnapshotDirectory(
() -> filterBlobContainer,
() -> snapshot,
snapshotId,
indexId,
shardId,
restoredIndexSettings,
() -> 0L,
cacheService,
cacheDir,
threadPool
)
) {
assertThat(filterBlobContainer.totalFilesRead(), equalTo(0L));
assertThat(filterBlobContainer.totalBytesRead(), equalTo(0L));
final boolean loaded = snapshotDirectory.loadSnapshot();
assertThat("Failed to load snapshot", loaded, is(true));
final ExecutorService executor = threadPool.executor(SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME);
executor.shutdown();
executor.awaitTermination(30L, TimeUnit.SECONDS);
assertThat(
filterBlobContainer.totalFilesRead(),
equalTo(expectedPrewarmedBlobs.stream().mapToLong(FileInfo::numberOfParts).sum())
);
assertThat(
filterBlobContainer.totalBytesRead(),
equalTo(expectedPrewarmedBlobs.stream().mapToLong(FileInfo::length).sum())
);
for (FileInfo expectedPrewarmedBlob : expectedPrewarmedBlobs) {
for (int part = 0; part < expectedPrewarmedBlob.numberOfParts(); part++) {
String partName = expectedPrewarmedBlob.partName(part);
assertThat(filterBlobContainer.totalBytesRead(partName), equalTo(expectedPrewarmedBlob.partBytes(part)));
}
}
}
} finally {
Releasables.close(releasables);
}
}
}
private static class FilterBlobContainer extends BlobContainerWrapper {
private final Map<String, Long> files = new ConcurrentHashMap<>();
FilterBlobContainer(BlobContainer delegate) {
super(delegate);
}
public long totalFilesRead() {
return files.size();
}
public long totalBytesRead() {
return files.values().stream().mapToLong(bytesRead -> bytesRead).sum();
}
@Nullable
public Long totalBytesRead(String name) {
return files.getOrDefault(name, null);
}
@Override
public InputStream readBlob(String blobName, long position, long length) throws IOException {
return new FilterInputStream(super.readBlob(blobName, position, length)) {
long bytesRead = 0L;
@Override
public int read() throws IOException {
final int result = in.read();
if (result == -1) {
return result;
}
bytesRead += 1L;
return result;
}
@Override
public int read(byte[] b, int offset, int len) throws IOException {
final int result = in.read(b, offset, len);
if (result == -1) {
return result;
}
bytesRead += len;
return result;
}
@Override
public void close() throws IOException {
files.put(blobName, bytesRead);
super.close();
}
};
}
}
}

View File

@ -87,7 +87,8 @@ public class CachedBlobContainerIndexInputTests extends ESIndexInputTestCase {
Settings.builder().put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true).build(), Settings.builder().put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true).build(),
() -> 0L, () -> 0L,
cacheService, cacheService,
cacheDir cacheDir,
null
) )
) { ) {
final boolean loaded = directory.loadSnapshot(); final boolean loaded = directory.loadSnapshot();
@ -156,7 +157,8 @@ public class CachedBlobContainerIndexInputTests extends ESIndexInputTestCase {
Settings.EMPTY, Settings.EMPTY,
() -> 0L, () -> 0L,
cacheService, cacheService,
cacheDir cacheDir,
null
) )
) { ) {
final boolean loaded = searchableSnapshotDirectory.loadSnapshot(); final boolean loaded = searchableSnapshotDirectory.loadSnapshot();

View File

@ -126,6 +126,9 @@ public class SearchableSnapshotsIntegTests extends BaseSearchableSnapshotsIntegT
Settings.Builder indexSettingsBuilder = Settings.builder() Settings.Builder indexSettingsBuilder = Settings.builder()
.put(SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), cacheEnabled) .put(SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), cacheEnabled)
.put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), Boolean.FALSE.toString()); .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), Boolean.FALSE.toString());
if (cacheEnabled) {
indexSettingsBuilder.put(SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.getKey(), randomBoolean());
}
final List<String> nonCachedExtensions; final List<String> nonCachedExtensions;
if (randomBoolean()) { if (randomBoolean()) {
nonCachedExtensions = randomSubsetOf(Arrays.asList("fdt", "fdx", "nvd", "dvd", "tip", "cfs", "dim")); nonCachedExtensions = randomSubsetOf(Arrays.asList("fdt", "fdx", "nvd", "dvd", "tip", "cfs", "dim"));