Use a dedicated thread pool for searchable snapshot cache prewarming (#59313) (#59590)

Since #58728, write operations on searchable snapshot directory cache files
have been executed asynchronously, using a dedicated thread pool. The thread
pool used is searchable_snapshots, which was originally created to execute
prewarming tasks.

Reusing the same thread pool was not a good idea, as it can lead to deadlock
situations. One such situation arose in a test failure where the thread pool
was full of prewarming tasks, all waiting for a cache file to become accessible,
while the cache file was being evicted by the cache service. Such an eviction
can only be processed once all read/write operations on the cache file have
completed. In this case the deadlock occurred because the cache file was
actively being read by a concurrent search, which had also won the privilege to
write the range of bytes to the cache, and that write could never complete
because the prewarming tasks were making no progress and filling up the thread
pool.
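For illustration only, here is a minimal, self-contained JDK sketch of that failure mode, with a plain fixed-size executor standing in for the old shared searchable_snapshots pool (all names hypothetical): every thread is held by a task that blocks on work which can only run on the same pool, so nothing ever progresses.

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    public class SharedPoolDeadlock {
        public static void main(String[] args) throws Exception {
            // A small bounded pool standing in for the shared searchable_snapshots pool.
            ExecutorService pool = Executors.newFixedThreadPool(2);

            // Each "prewarming" task occupies a thread and blocks on a "cache write"
            // that can only execute on the same pool. Once every thread is occupied
            // by a blocked task, the queued writes can never run: a deadlock.
            for (int i = 0; i < 2; i++) {
                pool.submit(() -> {
                    Future<?> cacheWrite = pool.submit(() -> { /* write bytes to the cache file */ });
                    try {
                        cacheWrite.get(); // waits forever: no free thread is left to run the write
                    } catch (InterruptedException | ExecutionException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }

            pool.shutdown();
            System.out.println("drained: " + pool.awaitTermination(2, TimeUnit.SECONDS)); // prints "drained: false"
            pool.shutdownNow(); // interrupt the stuck tasks so the JVM can exit
        }
    }

Splitting the blocking callers and the writes across two distinct pools, as this commit does, removes the cycle entirely.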

This commit renames the searchable_snapshots thread pool to
searchable_snapshots_cache_fetch_async. Assertions are added to verify that
cache writes are executed using this thread pool and that reads on cached
index inputs are executed using a different thread pool, in order to avoid
potential deadlock situations.

This commit also adds a searchable_snapshots_cache_prewarming thread pool
that is used to execute prewarming tasks. It also converts the existing cache
prewarming test into a more complete integration test that creates multiple
searchable snapshot indices concurrently with randomized thread pool sizes,
and verifies that all files have been correctly prewarmed.
Tanguy Leroux 2020-07-15 11:45:52 +02:00 committed by GitHub
parent a699c89133
commit 604f22db79
10 changed files with 478 additions and 372 deletions

View File

@@ -35,5 +35,9 @@ public class SearchableSnapshotsConstants {
&& SNAPSHOT_DIRECTORY_FACTORY_KEY.equals(INDEX_STORE_TYPE_SETTING.get(indexSettings));
}
public static final String SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME = "searchable_snapshots";
public static final String CACHE_FETCH_ASYNC_THREAD_POOL_NAME = "searchable_snapshots_cache_fetch_async";
public static final String CACHE_FETCH_ASYNC_THREAD_POOL_SETTING = "xpack.searchable_snapshots.cache_fetch_async_thread_pool";
public static final String CACHE_PREWARMING_THREAD_POOL_NAME = "searchable_snapshots_cache_prewarming";
public static final String CACHE_PREWARMING_THREAD_POOL_SETTING = "xpack.searchable_snapshots.cache_prewarming_thread_pool";
}
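For illustration, the two *_THREAD_POOL_SETTING prefixes above are the keys under which each pool is tuned. A sketch of hypothetical node settings using them (values arbitrary; the ".max" and ".keep_alive" suffixes are the scaling-executor knobs exercised by the integration test at the end of this commit):

    import org.elasticsearch.common.settings.Settings;

    class PrewarmingPoolSettingsExample {
        // Hypothetical values; only the setting prefixes come from the constants above.
        static Settings examplePoolSettings() {
            return Settings.builder()
                .put("xpack.searchable_snapshots.cache_fetch_async_thread_pool.max", 16)
                .put("xpack.searchable_snapshots.cache_fetch_async_thread_pool.keep_alive", "30s")
                .put("xpack.searchable_snapshots.cache_prewarming_thread_pool.max", 16)
                .put("xpack.searchable_snapshots.cache_prewarming_thread_pool.keep_alive", "30s")
                .build();
        }
    }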

View File

@@ -136,8 +136,11 @@ public abstract class BaseSearchableSnapshotIndexInput extends BufferedIndexInput {
|| threadName.contains('[' + ThreadPool.Names.SEARCH + ']')
|| threadName.contains('[' + ThreadPool.Names.SEARCH_THROTTLED + ']')
// Cache prewarming runs on a dedicated thread pool.
|| threadName.contains('[' + SearchableSnapshotsConstants.SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME + ']')
// Cache asynchronous fetching runs on a dedicated thread pool.
|| threadName.contains('[' + SearchableSnapshotsConstants.CACHE_FETCH_ASYNC_THREAD_POOL_NAME + ']')
// Cache prewarming also runs on a dedicated thread pool.
|| threadName.contains('[' + SearchableSnapshotsConstants.CACHE_PREWARMING_THREAD_POOL_NAME + ']')
// Unit tests access the blob store on the main test thread; simplest just to permit this rather than have them override this
// method somehow.
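These checks rely on Elasticsearch's convention of embedding the owning pool name in brackets in each worker thread's name, roughly elasticsearch[nodeName][poolName][T#n]. A tiny sketch of the matching logic (the thread name below is illustrative):

    public class ThreadNameCheckExample {
        public static void main(String[] args) {
            // Illustrative name following the bracketed-pool-name convention assumed here.
            String threadName = "elasticsearch[node_0][searchable_snapshots_cache_prewarming][T#1]";
            boolean onPrewarmingPool = threadName.contains("[searchable_snapshots_cache_prewarming]");
            System.out.println(onPrewarmingPool); // true
        }
    }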

View File

@@ -81,7 +81,7 @@ import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SN
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.CACHE_FETCH_ASYNC_THREAD_POOL_NAME;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SNAPSHOT_DIRECTORY_FACTORY_KEY;
/**
@@ -321,11 +321,11 @@ public class SearchableSnapshotDirectory extends BaseDirectory {
}
public Executor cacheFetchAsyncExecutor() {
return threadPool.executor(SearchableSnapshotsConstants.SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME);
return threadPool.executor(SearchableSnapshotsConstants.CACHE_FETCH_ASYNC_THREAD_POOL_NAME);
}
public Executor prewarmExecutor() {
return threadPool.executor(ThreadPool.Names.SAME);
return threadPool.executor(SearchableSnapshotsConstants.CACHE_PREWARMING_THREAD_POOL_NAME);
}
@Override
@@ -377,7 +377,7 @@ public class SearchableSnapshotDirectory extends BaseDirectory {
private void prewarmCache() {
if (prewarmCache) {
final BlockingQueue<Tuple<ActionListener<Void>, CheckedRunnable<Exception>>> queue = new LinkedBlockingQueue<>();
final Executor executor = threadPool.executor(SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME);
final Executor executor = prewarmExecutor();
for (BlobStoreIndexShardSnapshot.FileInfo file : snapshot().indexFiles()) {
if (file.metadata().hashEqualsContents() || isExcludedFromCache(file.physicalName())) {
@@ -422,7 +422,7 @@ public class SearchableSnapshotDirectory extends BaseDirectory {
logger.debug("{} warming shard cache for [{}] files", shardId, queue.size());
// Start at most as many workers as fit into the searchable snapshot pool at once
final int workers = Math.min(threadPool.info(SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME).getMax(), queue.size());
final int workers = Math.min(threadPool.info(CACHE_FETCH_ASYNC_THREAD_POOL_NAME).getMax(), queue.size());
for (int i = 0; i < workers; ++i) {
prewarmNext(executor, queue);
}
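The hunk above caps concurrency by starting at most as many workers as the pool allows over a single shared queue. A generic sketch of that drain pattern, with hypothetical names (the real code re-submits itself via prewarmNext rather than looping):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.Executor;

    class QueueDrainSketch {
        // Start at most min(poolMax, queued tasks) workers; each worker keeps pulling
        // from the shared queue until it is empty, bounding concurrency without
        // ever submitting more tasks than the pool can run.
        static void drain(Executor executor, int poolMax, BlockingQueue<Runnable> queue) {
            final int workers = Math.min(poolMax, queue.size());
            for (int i = 0; i < workers; i++) {
                executor.execute(() -> {
                    Runnable task;
                    while ((task = queue.poll()) != null) {
                        task.run();
                    }
                });
            }
        }
    }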

View File

@@ -132,6 +132,7 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexInput {
@Override
protected void readInternal(ByteBuffer b) throws IOException {
ensureContext(ctx -> ctx != CACHE_WARMING_CONTEXT);
assert assertCurrentThreadIsNotCacheFetchAsync();
final long position = getFilePointer() + this.offset;
final int length = b.remaining();
@@ -240,13 +241,7 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexInput {
final long readStart = rangeStart + totalBytesRead;
final Tuple<Long, Long> rangeToWrite = Tuple.tuple(readStart, readStart + bytesRead);
// Prewarming doesn't need to read the cached data after it has been written to the cache, so the range to read is empty. In
// the case where the range is actively being (or about to be) written to the cache by a concurrent search, the sync fetching
// returns immediately and the next range can be prewarmed. If the range is not available in cache then the range
// will be written by this prewarming task, which blocks until it is fully written to disk.
final Tuple<Long, Long> rangeToRead = Tuple.tuple(readStart, readStart);
cacheFile.fetchAsync(rangeToWrite, rangeToRead, (channel) -> 0, (channel, start, end, progressUpdater) -> {
cacheFile.fetchAsync(rangeToWrite, rangeToWrite, (channel) -> bytesRead, (channel, start, end, progressUpdater) -> {
final ByteBuffer byteBuffer = ByteBuffer.wrap(
copyBuffer,
Math.toIntExact(start - readStart),
@@ -262,7 +257,7 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexInput {
);
totalBytesWritten.addAndGet(writtenBytes);
progressUpdater.accept(start + writtenBytes);
}, directory.prewarmExecutor());
}, directory.cacheFetchAsyncExecutor()).get();
totalBytesRead += bytesRead;
remainingBytes -= bytesRead;
}
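The rewritten call now blocks (.get()) on a write executed by the cache fetch async pool while the caller occupies a prewarming thread. A minimal sketch of why this cross-pool handoff cannot deadlock, with plain JDK executors standing in for the two Elasticsearch pools (names hypothetical):

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class TwoPoolHandoff {
        public static void main(String[] args) throws Exception {
            ExecutorService prewarming = Executors.newFixedThreadPool(2); // stands in for ..._cache_prewarming
            ExecutorService fetchAsync = Executors.newFixedThreadPool(2); // stands in for ..._cache_fetch_async

            // A prewarming task may safely block on the write: the write runs on a
            // different pool, so a saturated prewarming pool can no longer starve it.
            Future<?> prewarmTask = prewarming.submit(() -> {
                Future<?> write = fetchAsync.submit(() -> { /* write the range to the cache file */ });
                try {
                    write.get();
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            });
            prewarmTask.get();

            prewarming.shutdown();
            fetchAsync.shutdown();
        }
    }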
@@ -279,7 +274,7 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexInput {
@SuppressForbidden(reason = "Use positional writes on purpose")
private static int positionalWrite(FileChannel fc, long start, ByteBuffer byteBuffer) throws IOException {
assert assertSearchableSnapshotsThread();
assert assertCurrentThreadMayWriteCacheFile();
return fc.write(byteBuffer, start);
}
@@ -354,7 +349,7 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexInput {
private void writeCacheFile(final FileChannel fc, final long start, final long end, final Consumer<Long> progressUpdater)
throws IOException {
assert assertFileChannelOpen(fc);
assert assertSearchableSnapshotsThread();
assert assertCurrentThreadMayWriteCacheFile();
final long length = end - start;
final byte[] copyBuffer = new byte[Math.toIntExact(Math.min(COPY_BUFFER_SIZE, length))];
logger.trace(() -> new ParameterizedMessage("writing range [{}-{}] to cache file [{}]", start, end, cacheFileReference));
@@ -546,11 +541,23 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexInput {
return true;
}
private static boolean assertSearchableSnapshotsThread() {
private static boolean isCacheFetchAsyncThread(final String threadName) {
return threadName.contains('[' + SearchableSnapshotsConstants.CACHE_FETCH_ASYNC_THREAD_POOL_NAME + ']');
}
private static boolean assertCurrentThreadMayWriteCacheFile() {
final String threadName = Thread.currentThread().getName();
assert threadName.contains(
'[' + SearchableSnapshotsConstants.SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME + ']'
) : "expected the current thread [" + threadName + "] to belong to the searchable snapshots thread pool";
assert isCacheFetchAsyncThread(threadName) : "expected the current thread ["
+ threadName
+ "] to belong to the cache fetch async thread pool";
return true;
}
private static boolean assertCurrentThreadIsNotCacheFetchAsync() {
final String threadName = Thread.currentThread().getName();
assert false == isCacheFetchAsyncThread(threadName) : "expected the current thread ["
+ threadName
+ "] to not belong to the cache fetch async thread pool";
return true;
}
}
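Both helpers follow the common Elasticsearch idiom of returning true so they can be invoked inside an assert statement. A reduced sketch of the idiom (helper name hypothetical):

    public class AssertHelperIdiom {
        // Returns true so it can be called as "assert onPool(...)": with -ea the inner
        // assert fires with a descriptive message; without -ea the whole call site is
        // elided by the JVM and costs nothing at runtime.
        static boolean onPool(String poolName) {
            final String threadName = Thread.currentThread().getName();
            assert threadName.contains('[' + poolName + ']')
                : "expected thread [" + threadName + "] to belong to pool [" + poolName + "]";
            return true;
        }

        public static void main(String[] args) {
            Thread.currentThread().setName("node[some_pool][T#1]");
            assert onPool("some_pool"); // only checked when run with java -ea
            System.out.println("ok");
        }
    }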

View File

@@ -73,8 +73,11 @@ import java.util.function.Function;
import java.util.function.Supplier;
import static java.util.Collections.emptyList;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.CACHE_FETCH_ASYNC_THREAD_POOL_NAME;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.CACHE_FETCH_ASYNC_THREAD_POOL_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.CACHE_PREWARMING_THREAD_POOL_NAME;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.CACHE_PREWARMING_THREAD_POOL_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SNAPSHOT_DIRECTORY_FACTORY_KEY;
/**
@@ -296,19 +299,27 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, EnginePlugin
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
if (SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED) {
return org.elasticsearch.common.collect.List.of(executorBuilder());
return org.elasticsearch.common.collect.List.of(executorBuilders());
} else {
return org.elasticsearch.common.collect.List.of();
}
}
public static ExecutorBuilder<?> executorBuilder() {
return new ScalingExecutorBuilder(
SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME,
0,
32,
TimeValue.timeValueSeconds(30L),
"xpack.searchable_snapshots.thread_pool"
);
public static ScalingExecutorBuilder[] executorBuilders() {
return new ScalingExecutorBuilder[] {
new ScalingExecutorBuilder(
CACHE_FETCH_ASYNC_THREAD_POOL_NAME,
0,
32,
TimeValue.timeValueSeconds(30L),
CACHE_FETCH_ASYNC_THREAD_POOL_SETTING
),
new ScalingExecutorBuilder(
CACHE_PREWARMING_THREAD_POOL_NAME,
0,
32,
TimeValue.timeValueSeconds(30L),
CACHE_PREWARMING_THREAD_POOL_SETTING
) };
}
}
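A scaling executor with core 0, max 32 and a 30 second keep-alive grows on demand and reclaims idle threads. Roughly, and only roughly, the plain JDK analogue looks like this sketch (Elasticsearch's actual builder uses a custom queue so that tasks wait once the maximum is reached, whereas a SynchronousQueue would reject them):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    class ScalingPoolSketch {
        // Approximation of a scaling pool: 0 core threads, up to 32 on demand,
        // idle threads reclaimed after 30 seconds. Not the ES implementation.
        static ExecutorService scalingPool() {
            return new ThreadPoolExecutor(0, 32, 30L, TimeUnit.SECONDS, new SynchronousQueue<>());
        }
    }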

View File

@@ -589,7 +589,7 @@ public class SearchableSnapshotDirectoryStatsTests extends ESIndexInputTestCase {
final ShardId shardId = new ShardId("_name", "_uuid", 0);
final AtomicLong fakeClock = new AtomicLong();
final LongSupplier statsCurrentTimeNanos = () -> fakeClock.addAndGet(FAKE_CLOCK_ADVANCE_NANOS);
final ThreadPool threadPool = new TestThreadPool(getTestClass().getSimpleName(), SearchableSnapshots.executorBuilder());
final ThreadPool threadPool = new TestThreadPool(getTestClass().getSimpleName(), SearchableSnapshots.executorBuilders());
final Long seekingThreshold = randomBoolean() ? randomLongBetween(1L, fileContent.length) : null;

View File

@@ -471,7 +471,7 @@ public class SearchableSnapshotDirectoryTests extends ESTestCase {
writer.commit();
}
final ThreadPool threadPool = new TestThreadPool(getTestName(), SearchableSnapshots.executorBuilder());
final ThreadPool threadPool = new TestThreadPool(getTestName(), SearchableSnapshots.executorBuilders());
releasables.add(() -> terminate(threadPool));
final Store store = new Store(shardId, indexSettings, directory, new DummyShardLock(shardId));
@@ -639,7 +639,7 @@ public class SearchableSnapshotDirectoryTests extends ESTestCase {
final ShardId shardId = new ShardId(new Index("_name", "_id"), 0);
final Path cacheDir = createTempDir();
final ThreadPool threadPool = new TestThreadPool(getTestName(), SearchableSnapshots.executorBuilder());
final ThreadPool threadPool = new TestThreadPool(getTestName(), SearchableSnapshots.executorBuilders());
try (
SearchableSnapshotDirectory directory = new SearchableSnapshotDirectory(
() -> blobContainer,

View File

@@ -1,335 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.index.store.cache;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.Directory;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.support.FilterBlobContainer;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
import org.elasticsearch.index.store.SearchableSnapshotDirectory;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.IndexSettingsModule;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots;
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static java.util.Collections.emptyMap;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
public class CachePreWarmingTests extends ESTestCase {
public void testCachePreWarming() throws Exception {
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(
"_index",
Settings.builder()
.put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()))
.put(IndexMetadata.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT)
.build()
);
final ShardId shardId = new ShardId(indexSettings.getIndex(), randomIntBetween(0, 10));
final List<Releasable> releasables = new ArrayList<>();
try (Directory directory = newDirectory()) {
final IndexWriterConfig indexWriterConfig = newIndexWriterConfig();
try (IndexWriter writer = new IndexWriter(directory, indexWriterConfig)) {
final int nbDocs = scaledRandomIntBetween(0, 1_000);
final List<String> words = org.elasticsearch.common.collect.List.of(
"the",
"quick",
"brown",
"fox",
"jumps",
"over",
"the",
"lazy",
"dog"
);
for (int i = 0; i < nbDocs; i++) {
final Document doc = new Document();
doc.add(new StringField("id", "" + i, Field.Store.YES));
String text = String.join(" ", randomSubsetOf(randomIntBetween(1, words.size()), words));
doc.add(new TextField("text", text, Field.Store.YES));
doc.add(new NumericDocValuesField("rank", i));
writer.addDocument(doc);
}
if (randomBoolean()) {
writer.flush();
}
if (randomBoolean()) {
writer.forceMerge(1, true);
}
final Map<String, String> userData = new HashMap<>(2);
userData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, "0");
userData.put(Translog.TRANSLOG_UUID_KEY, UUIDs.randomBase64UUID(random()));
writer.setLiveCommitData(userData.entrySet());
writer.commit();
}
final ThreadPool threadPool = new TestThreadPool(getTestName(), SearchableSnapshots.executorBuilder());
releasables.add(() -> terminate(threadPool));
final Store store = new Store(shardId, indexSettings, directory, new DummyShardLock(shardId));
store.incRef();
releasables.add(store::decRef);
try {
final SegmentInfos segmentInfos = Lucene.readSegmentInfos(store.directory());
final IndexCommit indexCommit = Lucene.getIndexCommit(segmentInfos, store.directory());
Path repositoryPath = createTempDir();
Settings.Builder repositorySettings = Settings.builder().put("location", repositoryPath);
boolean compress = randomBoolean();
if (compress) {
repositorySettings.put("compress", randomBoolean());
}
if (randomBoolean()) {
repositorySettings.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES);
}
final String repositoryName = randomAlphaOfLength(10);
final RepositoryMetadata repositoryMetadata = new RepositoryMetadata(
repositoryName,
FsRepository.TYPE,
repositorySettings.build()
);
final BlobStoreRepository repository = new FsRepository(
repositoryMetadata,
new Environment(
Settings.builder()
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toAbsolutePath())
.put(Environment.PATH_REPO_SETTING.getKey(), repositoryPath.toAbsolutePath())
.putList(Environment.PATH_DATA_SETTING.getKey(), tmpPaths())
.build(),
null
),
NamedXContentRegistry.EMPTY,
BlobStoreTestUtil.mockClusterService(repositoryMetadata),
new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))
) {
@Override
protected void assertSnapshotOrGenericThread() {
// eliminate thread name check as we create repo manually on test/main threads
}
};
repository.start();
releasables.add(repository::stop);
final SnapshotId snapshotId = new SnapshotId("_snapshot", UUIDs.randomBase64UUID(random()));
final IndexId indexId = new IndexId(indexSettings.getIndex().getName(), UUIDs.randomBase64UUID(random()));
final PlainActionFuture<String> future = PlainActionFuture.newFuture();
threadPool.generic().submit(() -> {
IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(null);
repository.snapshotShard(
store,
null,
snapshotId,
indexId,
indexCommit,
null,
snapshotStatus,
Version.CURRENT,
emptyMap(),
future
);
future.actionGet();
});
future.actionGet();
final Path cacheDir = createTempDir();
final CacheService cacheService = TestUtils.createDefaultCacheService();
releasables.add(cacheService);
cacheService.start();
final List<String> excludedFromCache = randomSubsetOf(Arrays.asList("fdt", "fdx", "nvd", "dvd", "tip", "cfs", "dim"));
final Settings restoredIndexSettings = Settings.builder()
.put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true)
.put(SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.getKey(), true)
.putList(SearchableSnapshots.SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING.getKey(), excludedFromCache)
.build();
final BlobContainer blobContainer = repository.shardContainer(indexId, shardId.id());
final BlobStoreIndexShardSnapshot snapshot = repository.loadShardSnapshot(blobContainer, snapshotId);
final List<FileInfo> expectedPrewarmedBlobs = snapshot.indexFiles()
.stream()
.filter(fileInfo -> fileInfo.metadata().hashEqualsContents() == false)
.filter(fileInfo -> excludedFromCache.contains(IndexFileNames.getExtension(fileInfo.physicalName())) == false)
.collect(Collectors.toList());
final TrackingFilesBlobContainer filterBlobContainer = new TrackingFilesBlobContainer(blobContainer);
try (
SearchableSnapshotDirectory snapshotDirectory = new SearchableSnapshotDirectory(
() -> filterBlobContainer,
() -> snapshot,
snapshotId,
indexId,
shardId,
restoredIndexSettings,
() -> 0L,
cacheService,
cacheDir,
threadPool
)
) {
assertThat(filterBlobContainer.totalFilesRead(), equalTo(0L));
assertThat(filterBlobContainer.totalBytesRead(), equalTo(0L));
final boolean loaded = snapshotDirectory.loadSnapshot();
assertThat("Failed to load snapshot", loaded, is(true));
final ExecutorService executor = threadPool.executor(SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME);
executor.shutdown();
executor.awaitTermination(30L, TimeUnit.SECONDS);
assertThat(
filterBlobContainer.totalFilesRead(),
equalTo(expectedPrewarmedBlobs.stream().mapToLong(FileInfo::numberOfParts).sum())
);
assertThat(
filterBlobContainer.totalBytesRead(),
equalTo(expectedPrewarmedBlobs.stream().mapToLong(FileInfo::length).sum())
);
for (FileInfo expectedPrewarmedBlob : expectedPrewarmedBlobs) {
for (int part = 0; part < expectedPrewarmedBlob.numberOfParts(); part++) {
String partName = expectedPrewarmedBlob.partName(part);
assertThat(filterBlobContainer.totalBytesRead(partName), equalTo(expectedPrewarmedBlob.partBytes(part)));
}
}
}
} finally {
Releasables.close(releasables);
}
}
}
private static class TrackingFilesBlobContainer extends FilterBlobContainer {
private final ConcurrentHashMap<String, Long> files;
TrackingFilesBlobContainer(BlobContainer delegate) {
this(delegate, new ConcurrentHashMap<>());
}
TrackingFilesBlobContainer(BlobContainer delegate, ConcurrentHashMap<String, Long> files) {
super(delegate);
this.files = Objects.requireNonNull(files);
}
public long totalFilesRead() {
return files.size();
}
public long totalBytesRead() {
return files.values().stream().mapToLong(bytesRead -> bytesRead).sum();
}
@Nullable
public Long totalBytesRead(String name) {
return files.getOrDefault(name, null);
}
@Override
public InputStream readBlob(String blobName, long position, long length) throws IOException {
return new FilterInputStream(super.readBlob(blobName, position, length)) {
long bytesRead = 0L;
@Override
public int read() throws IOException {
final int result = in.read();
if (result == -1) {
return result;
}
bytesRead += 1L;
return result;
}
@Override
public int read(byte[] b, int offset, int len) throws IOException {
final int result = in.read(b, offset, len);
if (result == -1) {
return result;
}
bytesRead += len;
return result;
}
@Override
public void close() throws IOException {
files.put(blobName, bytesRead);
super.close();
}
};
}
@Override
protected BlobContainer wrapChild(BlobContainer child) {
return new TrackingFilesBlobContainer(child, this.files);
}
}
}

View File

@@ -47,7 +47,7 @@ import static org.hamcrest.Matchers.notNullValue;
public class CachedBlobContainerIndexInputTests extends ESIndexInputTestCase {
public void testRandomReads() throws IOException {
final ThreadPool threadPool = new TestThreadPool(getTestName(), SearchableSnapshots.executorBuilder());
final ThreadPool threadPool = new TestThreadPool(getTestName(), SearchableSnapshots.executorBuilders());
try (CacheService cacheService = createCacheService(random())) {
cacheService.start();
@@ -157,7 +157,7 @@ public class CachedBlobContainerIndexInputTests extends ESIndexInputTestCase {
);
final BlobContainer blobContainer = singleBlobContainer(blobName, input);
final ThreadPool threadPool = new TestThreadPool(getTestName(), SearchableSnapshots.executorBuilder());
final ThreadPool threadPool = new TestThreadPool(getTestName(), SearchableSnapshots.executorBuilders());
final Path cacheDir = createTempDir();
try (
SearchableSnapshotDirectory searchableSnapshotDirectory = new SearchableSnapshotDirectory(

View File

@@ -0,0 +1,416 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.searchablesnapshots;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.support.FilterBlobContainer;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.license.LicenseService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction;
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING;
import static org.hamcrest.Matchers.emptyCollectionOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
@LuceneTestCase.SuppressFileSystems(value = "ExtrasFS") // we don't want extra dirs to be added to the shards snapshot_cache directories
public class SearchableSnapshotsPrewarmingIntegTests extends ESSingleNodeTestCase {
private static final int MAX_NUMBER_OF_INDICES = 10;
@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return org.elasticsearch.common.collect.List.of(
SearchableSnapshots.class,
LocalStateCompositeXPackPlugin.class,
TrackingRepositoryPlugin.class
);
}
@Override
protected Settings nodeSettings() {
return Settings.builder()
.put(super.nodeSettings())
.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial")
.put(SearchableSnapshotsConstants.CACHE_FETCH_ASYNC_THREAD_POOL_SETTING + ".max", randomIntBetween(1, 32))
.put(SearchableSnapshotsConstants.CACHE_FETCH_ASYNC_THREAD_POOL_SETTING + ".keep_alive", "1s")
.put(SearchableSnapshotsConstants.CACHE_PREWARMING_THREAD_POOL_SETTING + ".max", randomIntBetween(1, 32))
.put(SearchableSnapshotsConstants.CACHE_PREWARMING_THREAD_POOL_SETTING + ".keep_alive", "1s")
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), MAX_NUMBER_OF_INDICES)
.build();
}
public void testConcurrentPrewarming() throws Exception {
final int nbIndices = randomIntBetween(1, MAX_NUMBER_OF_INDICES);
logger.debug("--> creating [{}] indices", nbIndices);
final Map<String, Integer> shardsPerIndex = new HashMap<>();
for (int index = 0; index < nbIndices; index++) {
final String indexName = "index-" + index;
final int nbShards = randomIntBetween(1, 5);
createIndex(
indexName,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, nbShards)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build()
);
shardsPerIndex.put(indexName, nbShards);
}
logger.debug("--> indexing documents");
final Map<String, Long> docsPerIndex = new HashMap<>();
for (int index = 0; index < nbIndices; index++) {
final String indexName = "index-" + index;
final long nbDocs = scaledRandomIntBetween(0, 500);
logger.debug("--> indexing [{}] documents in {}", nbDocs, indexName);
if (nbDocs > 0) {
final BulkRequestBuilder bulkRequest = client().prepareBulk();
for (int i = 0; i < nbDocs; i++) {
bulkRequest.add(client().prepareIndex(indexName, "_doc").setSource("foo", randomBoolean() ? "bar" : "baz"));
}
final BulkResponse bulkResponse = bulkRequest.get();
assertThat(bulkResponse.status(), is(RestStatus.OK));
assertThat(bulkResponse.hasFailures(), is(false));
}
docsPerIndex.put(indexName, nbDocs);
}
final Path repositoryPath = node().getEnvironment().resolveRepoFile(randomAlphaOfLength(10));
final Settings.Builder repositorySettings = Settings.builder().put("location", repositoryPath);
if (randomBoolean()) {
repositorySettings.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES);
}
logger.debug("--> registering repository");
assertAcked(
client().admin().cluster().preparePutRepository("repository").setType(FsRepository.TYPE).setSettings(repositorySettings.build())
);
logger.debug("--> snapshotting indices");
final CreateSnapshotResponse createSnapshotResponse = client().admin()
.cluster()
.prepareCreateSnapshot("repository", "snapshot")
.setIncludeGlobalState(false)
.setIndices("index-*")
.setWaitForCompletion(true)
.get();
final int totalShards = shardsPerIndex.values().stream().mapToInt(i -> i).sum();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(totalShards));
assertThat(createSnapshotResponse.getSnapshotInfo().failedShards(), equalTo(0));
ensureGreen("index-*");
logger.debug("--> deleting indices");
assertAcked(client().admin().indices().prepareDelete("index-*"));
logger.debug("--> deleting repository");
assertAcked(client().admin().cluster().prepareDeleteRepository("repository"));
logger.debug("--> registering tracking repository");
assertAcked(
client().admin()
.cluster()
.preparePutRepository("repository")
.setType("tracking")
.setVerify(false)
.setSettings(repositorySettings.build())
);
TrackingRepositoryPlugin tracker = null;
for (RepositoryPlugin plugin : getInstanceFromNode(PluginsService.class).filterPlugins(RepositoryPlugin.class)) {
if (plugin instanceof TrackingRepositoryPlugin) {
tracker = ((TrackingRepositoryPlugin) plugin);
}
}
assertThat(tracker, notNullValue());
assertThat(tracker.totalFilesRead(), equalTo(0L));
assertThat(tracker.totalBytesRead(), equalTo(0L));
final Map<String, List<String>> exclusionsPerIndex = new HashMap<>();
for (int index = 0; index < nbIndices; index++) {
exclusionsPerIndex.put("index-" + index, randomSubsetOf(Arrays.asList("fdt", "fdx", "nvd", "dvd", "tip", "cfs", "dim")));
}
logger.debug("--> mounting indices");
final Thread[] threads = new Thread[nbIndices];
final AtomicArray<Throwable> throwables = new AtomicArray<>(nbIndices);
final CountDownLatch latch = new CountDownLatch(1);
for (int i = 0; i < threads.length; i++) {
int threadId = i;
final String indexName = "index-" + threadId;
final Thread thread = new Thread(() -> {
try {
latch.await();
final Settings restoredIndexSettings = Settings.builder()
.put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true)
.put(SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.getKey(), true)
.putList(SearchableSnapshots.SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING.getKey(), exclusionsPerIndex.get(indexName))
.build();
logger.info("--> mounting snapshot as index [{}] with settings {}", indexName, restoredIndexSettings);
final RestoreSnapshotResponse restoreSnapshotResponse = client().execute(
MountSearchableSnapshotAction.INSTANCE,
new MountSearchableSnapshotRequest(
indexName,
"repository",
"snapshot",
indexName,
restoredIndexSettings,
Strings.EMPTY_ARRAY,
true
)
).get();
ensureGreen(indexName);
assertThat(restoreSnapshotResponse.status(), is(RestStatus.OK));
assertThat(restoreSnapshotResponse.getRestoreInfo().successfulShards(), equalTo(shardsPerIndex.get(indexName)));
assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0));
assertHitCount(client().prepareSearch(indexName).setSize(0).setTrackTotalHits(true).get(), docsPerIndex.get(indexName));
final GetSettingsResponse getSettingsResponse = client().admin().indices().prepareGetSettings(indexName).get();
assertThat(getSettingsResponse.getSetting(indexName, SNAPSHOT_CACHE_ENABLED_SETTING.getKey()), equalTo("true"));
assertThat(getSettingsResponse.getSetting(indexName, SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.getKey()), equalTo("true"));
} catch (Throwable t) {
logger.error(() -> new ParameterizedMessage("Failed to mount snapshot for index [{}]", indexName), t);
throwables.setOnce(threadId, t);
}
});
threads[threadId] = thread;
thread.start();
}
latch.countDown();
for (Thread thread : threads) {
thread.join();
}
assertThat("Failed to mount snapshot as indices", throwables.asList(), emptyCollectionOf(Throwable.class));
logger.debug("--> waiting for background cache prewarming to");
assertBusy(() -> {
final ThreadPool threadPool = getInstanceFromNode(ThreadPool.class);
assertThat(threadPool.info(SearchableSnapshotsConstants.CACHE_FETCH_ASYNC_THREAD_POOL_NAME).getQueueSize(), nullValue());
assertThat(threadPool.info(SearchableSnapshotsConstants.CACHE_PREWARMING_THREAD_POOL_NAME).getQueueSize(), nullValue());
});
logger.debug("--> loading snapshot metadata");
final Repository repository = getInstanceFromNode(RepositoriesService.class).repository("repository");
assertThat(repository, instanceOf(BlobStoreRepository.class));
final BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository;
assertThat("Repository blobs tracking disabled", tracker.enabled.compareAndSet(true, false), is(true));
final RepositoryData repositoryData = ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository);
final SnapshotId snapshotId = createSnapshotResponse.getSnapshotInfo().snapshotId();
logger.debug("--> checking prewarmed files");
for (int index = 0; index < nbIndices; index++) {
final String indexName = "index-" + index;
final IndexId indexId = repositoryData.resolveIndexId(indexName);
assertThat("Index id not found in snapshot for index " + indexName, indexId, notNullValue());
for (int shard = 0; shard < shardsPerIndex.get(indexName); shard++) {
logger.debug("--> loading shard snapshot metadata for index [{}][{}][{}]", indexName, indexId, shard);
final BlobStoreIndexShardSnapshot snapshot = blobStoreRepository.loadShardSnapshot(
blobStoreRepository.shardContainer(indexId, shard),
snapshotId
);
final List<BlobStoreIndexShardSnapshot.FileInfo> expectedPrewarmedBlobs = snapshot.indexFiles()
.stream()
.filter(file -> file.metadata().hashEqualsContents() == false)
.filter(file -> exclusionsPerIndex.get(indexName).contains(IndexFileNames.getExtension(file.physicalName())) == false)
.collect(Collectors.toList());
for (BlobStoreIndexShardSnapshot.FileInfo expectedPrewarmedBlob : expectedPrewarmedBlobs) {
for (int part = 0; part < expectedPrewarmedBlob.numberOfParts(); part++) {
final String blobName = expectedPrewarmedBlob.partName(part);
long actualBytesRead = tracker.totalBytesRead(blobName);
long expectedBytesRead = expectedPrewarmedBlob.partBytes(part);
assertThat("Blob [" + blobName + "] not fully warmed", actualBytesRead, greaterThanOrEqualTo(expectedBytesRead));
}
}
}
}
}
/**
* A plugin that allows tracking read operations on blobs
*/
public static class TrackingRepositoryPlugin extends Plugin implements RepositoryPlugin {
private final ConcurrentHashMap<String, Long> files = new ConcurrentHashMap<>();
private final AtomicBoolean enabled = new AtomicBoolean(true);
long totalFilesRead() {
return files.size();
}
long totalBytesRead() {
return files.values().stream().mapToLong(bytesRead -> bytesRead).sum();
}
@Nullable
Long totalBytesRead(String name) {
return files.getOrDefault(name, null);
}
@Override
public Map<String, Repository.Factory> getRepositories(
Environment env,
NamedXContentRegistry namedXContentRegistry,
ClusterService clusterService,
RecoverySettings recoverySettings
) {
return Collections.singletonMap(
"tracking",
(metadata) -> new FsRepository(metadata, env, namedXContentRegistry, clusterService, recoverySettings) {
@Override
protected void assertSnapshotOrGenericThread() {
if (enabled.get()) {
super.assertSnapshotOrGenericThread();
}
}
@Override
protected BlobStore createBlobStore() throws Exception {
final BlobStore delegate = super.createBlobStore();
if (enabled.get()) {
return new BlobStore() {
@Override
public BlobContainer blobContainer(BlobPath path) {
return new TrackingFilesBlobContainer(delegate.blobContainer(path));
}
@Override
public void close() throws IOException {
delegate.close();
}
};
}
return delegate;
}
}
);
}
class TrackingFilesBlobContainer extends FilterBlobContainer {
TrackingFilesBlobContainer(BlobContainer delegate) {
super(delegate);
}
@Override
public InputStream readBlob(String blobName, long position, long length) throws IOException {
return new FilterInputStream(super.readBlob(blobName, position, length)) {
long bytesRead = 0L;
@Override
public int read() throws IOException {
final int result = in.read();
if (result == -1) {
return result;
}
bytesRead += 1L;
return result;
}
@Override
public int read(byte[] b, int offset, int len) throws IOException {
final int result = in.read(b, offset, len);
if (result == -1) {
return result;
}
bytesRead += len;
return result;
}
@Override
public void close() throws IOException {
files.put(blobName, bytesRead);
super.close();
}
};
}
@Override
protected BlobContainer wrapChild(BlobContainer child) {
return new TrackingFilesBlobContainer(child);
}
}
}
}