Allow searching of snapshot taken while indexing (#55511)

Today a read-only engine requires a complete history of operations, in the
sense that its local checkpoint must equal its maximum sequence number. This is
a valid check for read-only engines that were obtained by closing an index
since closing an index waits for all in-flight operations to complete. However
a snapshot may not have this property if it was taken while indexing was
ongoing, but that's ok.

This commit weakens the check for a complete history to exclude the case of a
searchable snapshot.

Relates #50999
This commit is contained in:
David Turner 2020-04-21 12:10:59 +01:00
parent 0f7917b94b
commit be60d50452
16 changed files with 167 additions and 60 deletions

View File

@ -54,7 +54,7 @@ public final class NoOpEngine extends ReadOnlyEngine {
private final DocsStats docsStats;
public NoOpEngine(EngineConfig config) {
super(config, null, null, true, Function.identity());
super(config, null, null, true, Function.identity(), true);
this.segmentsStats = new SegmentsStats();
Directory directory = store.directory();
try (DirectoryReader reader = openDirectory(directory, config.getIndexSettings().isSoftDeleteEnabled())) {

View File

@ -60,7 +60,7 @@ import java.util.stream.Stream;
* Note: this engine can be opened side-by-side with a read-write engine but will not reflect any changes made to the read-write
* engine.
*
* @see #ReadOnlyEngine(EngineConfig, SeqNoStats, TranslogStats, boolean, Function)
* @see #ReadOnlyEngine(EngineConfig, SeqNoStats, TranslogStats, boolean, Function, boolean)
*/
public class ReadOnlyEngine extends Engine {
@ -78,6 +78,7 @@ public class ReadOnlyEngine extends Engine {
private final RamAccountingRefreshListener refreshListener;
private final SafeCommitInfo safeCommitInfo;
private final CompletionStatsCache completionStatsCache;
private final boolean requireCompleteHistory;
protected volatile TranslogStats translogStats;
@ -92,11 +93,13 @@ public class ReadOnlyEngine extends Engine {
* @param obtainLock if <code>true</code> this engine will try to obtain the {@link IndexWriter#WRITE_LOCK_NAME} lock. Otherwise
* the lock won't be obtained
* @param readerWrapperFunction allows to wrap the index-reader for this engine.
* @param requireCompleteHistory indicates whether this engine permits an incomplete history (i.e. LCP &lt; MSN)
*/
public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats translogStats, boolean obtainLock,
Function<DirectoryReader, DirectoryReader> readerWrapperFunction) {
Function<DirectoryReader, DirectoryReader> readerWrapperFunction, boolean requireCompleteHistory) {
super(config);
this.refreshListener = new RamAccountingRefreshListener(engineConfig.getCircuitBreakerService());
this.requireCompleteHistory = requireCompleteHistory;
try {
Store store = config.getStore();
store.incRef();
@ -137,6 +140,9 @@ public class ReadOnlyEngine extends Engine {
}
protected void ensureMaxSeqNoEqualsToGlobalCheckpoint(final SeqNoStats seqNoStats) {
if (requireCompleteHistory == false) {
return;
}
// Before 8.0 the global checkpoint is not known and up to date when the engine is created after
// peer recovery, so we only check the max seq no / global checkpoint coherency when the global
// checkpoint is different from the unassigned sequence number value.

View File

@ -3390,7 +3390,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
// we must create both new read-only engine and new read-write engine under engineMutex to ensure snapshotStoreMetadata,
// acquireXXXCommit and close works.
final Engine readOnlyEngine =
new ReadOnlyEngine(newEngineConfig(replicationTracker), seqNoStats, translogStats, false, Function.identity()) {
new ReadOnlyEngine(newEngineConfig(replicationTracker), seqNoStats, translogStats, false, Function.identity(), true) {
@Override
public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) {
synchronized (engineMutex) {

View File

@ -70,7 +70,7 @@ public class ReadOnlyEngineTests extends EngineTestCase {
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getPersistedLocalCheckpoint()));
engine.flush();
readOnlyEngine = new ReadOnlyEngine(engine.engineConfig, engine.getSeqNoStats(globalCheckpoint.get()),
engine.getTranslogStats(), false, Function.identity());
engine.getTranslogStats(), false, Function.identity(), true);
lastSeqNoStats = engine.getSeqNoStats(globalCheckpoint.get());
lastDocIds = getDocIds(engine, true);
assertThat(readOnlyEngine.getPersistedLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint()));
@ -138,7 +138,7 @@ public class ReadOnlyEngineTests extends EngineTestCase {
globalCheckpoint.set(engine.getPersistedLocalCheckpoint());
engine.syncTranslog();
engine.flushAndClose();
readOnlyEngine = new ReadOnlyEngine(engine.engineConfig, null , null, true, Function.identity());
readOnlyEngine = new ReadOnlyEngine(engine.engineConfig, null , null, true, Function.identity(), true);
Engine.CommitId flush = readOnlyEngine.flush(randomBoolean(), true);
assertEquals(flush, readOnlyEngine.flush(randomBoolean(), true));
} finally {
@ -166,7 +166,7 @@ public class ReadOnlyEngineTests extends EngineTestCase {
engine.flushAndClose();
IllegalStateException exception = expectThrows(IllegalStateException.class,
() -> new ReadOnlyEngine(config, null, null, true, Function.identity()) {
() -> new ReadOnlyEngine(config, null, null, true, Function.identity(), true) {
@Override
protected boolean assertMaxSeqNoEqualsToGlobalCheckpoint(final long maxSeqNo, final long globalCheckpoint) {
// we don't want the assertion to trip in this test
@ -185,7 +185,7 @@ public class ReadOnlyEngineTests extends EngineTestCase {
try (Store store = createStore()) {
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
store.createEmpty(Version.CURRENT.luceneVersion);
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , new TranslogStats(), true, Function.identity())) {
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , new TranslogStats(), true, Function.identity(), true)) {
Class<? extends Throwable> expectedException = LuceneTestCase.TEST_ASSERTS_ENABLED ? AssertionError.class :
UnsupportedOperationException.class;
expectThrows(expectedException, () -> readOnlyEngine.index(null));
@ -206,7 +206,7 @@ public class ReadOnlyEngineTests extends EngineTestCase {
try (Store store = createStore()) {
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
store.createEmpty(Version.CURRENT.luceneVersion);
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , new TranslogStats(), true, Function.identity())) {
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , new TranslogStats(), true, Function.identity(), true)) {
globalCheckpoint.set(randomNonNegativeLong());
try {
readOnlyEngine.verifyEngineBeforeIndexClosing();
@ -236,7 +236,7 @@ public class ReadOnlyEngineTests extends EngineTestCase {
engine.syncTranslog();
engine.flushAndClose();
}
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , null, true, Function.identity())) {
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , null, true, Function.identity(), true)) {
final TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), config.getIndexSettings());
readOnlyEngine.recoverFromTranslog(translogHandler, randomNonNegativeLong());
@ -278,7 +278,7 @@ public class ReadOnlyEngineTests extends EngineTestCase {
engine.flush(true, true);
}
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null, null, true, Function.identity())) {
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null, null, true, Function.identity(), true)) {
assertThat(readOnlyEngine.getTranslogStats().estimatedNumberOfOperations(), equalTo(softDeletesEnabled ? 0 : numDocs));
assertThat(readOnlyEngine.getTranslogStats().getUncommittedOperations(), equalTo(0));
assertThat(readOnlyEngine.getTranslogStats().getTranslogSizeInBytes(), greaterThan(0L));

View File

@ -4136,7 +4136,7 @@ public class IndexShardTests extends IndexShardTestCase {
ShardRouting readonlyShardRouting = newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), true,
ShardRoutingState.INITIALIZING, RecoverySource.ExistingStoreRecoverySource.INSTANCE);
final IndexShard readonlyShard = reinitShard(shard, readonlyShardRouting, shard.indexSettings.getIndexMetadata(),
engineConfig -> new ReadOnlyEngine(engineConfig, null, null, true, Function.identity()) {
engineConfig -> new ReadOnlyEngine(engineConfig, null, null, true, Function.identity(), true) {
@Override
protected void ensureMaxSeqNoEqualsToGlobalCheckpoint(SeqNoStats seqNoStats) {
// just like a following shard, we need to skip this check for now.

View File

@ -204,7 +204,7 @@ public final class SourceOnlySnapshotRepository extends FilterRepository {
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
}, true);
}
/**

View File

@ -0,0 +1,37 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.searchablesnapshots;
import org.elasticsearch.Build;
import org.elasticsearch.common.settings.Settings;
import static org.elasticsearch.index.IndexModule.INDEX_STORE_TYPE_SETTING;
public class SearchableSnapshotsConstants {
public static final boolean SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED;
static {
final String property = System.getProperty("es.searchable_snapshots_feature_enabled");
if ("true".equals(property)) {
SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED = true;
} else if ("false".equals(property)) {
SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED = false;
} else if (property == null) {
SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED = Build.CURRENT.isSnapshot();
} else {
throw new IllegalArgumentException(
"expected es.searchable_snapshots_feature_enabled to be unset or [true|false] but was [" + property + "]"
);
}
}
public static final String SNAPSHOT_DIRECTORY_FACTORY_KEY = "snapshot";
public static boolean isSearchableSnapshotStore(Settings indexSettings) {
return SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED
&& SNAPSHOT_DIRECTORY_FACTORY_KEY.equals(INDEX_STORE_TYPE_SETTING.get(indexSettings));
}
}

View File

@ -75,8 +75,8 @@ public final class FrozenEngine extends ReadOnlyEngine {
private volatile ElasticsearchDirectoryReader lastOpenedReader;
private final ElasticsearchDirectoryReader canMatchReader;
public FrozenEngine(EngineConfig config) {
super(config, null, null, true, Function.identity());
public FrozenEngine(EngineConfig config, boolean requireCompleteHistory) {
super(config, null, null, true, Function.identity(), requireCompleteHistory);
boolean success = false;
Directory directory = store.directory();

View File

@ -37,6 +37,8 @@ import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.isSearchableSnapshotStore;
public class FrozenIndices extends Plugin implements ActionPlugin, EnginePlugin {
private boolean transportClientMode;
@ -48,7 +50,8 @@ public class FrozenIndices extends Plugin implements ActionPlugin, EnginePlugin
@Override
public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) {
if (indexSettings.getValue(FrozenEngine.INDEX_FROZEN)) {
return Optional.of(FrozenEngine::new);
final boolean requireCompleteHistory = isSearchableSnapshotStore(indexSettings.getSettings()) == false;
return Optional.of(config -> new FrozenEngine(config, requireCompleteHistory));
} else {
return Optional.empty();
}

View File

@ -48,7 +48,7 @@ public class FrozenEngineTests extends EngineTestCase {
int numDocs = Math.min(10, addDocuments(globalCheckpoint, engine));
engine.flushAndClose();
listener.reset();
try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) {
try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig, true)) {
assertFalse(frozenEngine.isReaderOpen());
Engine.Searcher searcher = frozenEngine.acquireSearcher("test");
assertEquals(config.getShardId(), ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(searcher
@ -82,7 +82,7 @@ public class FrozenEngineTests extends EngineTestCase {
int numDocs = Math.min(10, addDocuments(globalCheckpoint, engine));
engine.flushAndClose();
listener.reset();
try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) {
try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig, true)) {
assertFalse(frozenEngine.isReaderOpen());
Engine.Searcher searcher1 = frozenEngine.acquireSearcher("test");
assertTrue(frozenEngine.isReaderOpen());
@ -118,7 +118,7 @@ public class FrozenEngineTests extends EngineTestCase {
addDocuments(globalCheckpoint, engine);
engine.flushAndClose();
listener.reset();
try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) {
try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig, true)) {
Engine.Searcher searcher = frozenEngine.acquireSearcher("test");
SegmentsStats segmentsStats = frozenEngine.segmentsStats(randomBoolean(), false);
assertEquals(frozenEngine.segments(randomBoolean()).size(), segmentsStats.getCount());
@ -156,7 +156,7 @@ public class FrozenEngineTests extends EngineTestCase {
engine.refresh("test"); // pull the reader to account for RAM in the breaker.
}
final long expectedUse;
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null, null, true, i -> i)) {
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null, null, true, i -> i, true)) {
expectedUse = breaker.getUsed();
DocsStats docsStats = readOnlyEngine.docStats();
assertEquals(docs, docsStats.getCount());
@ -164,7 +164,7 @@ public class FrozenEngineTests extends EngineTestCase {
assertTrue(expectedUse > 0);
assertEquals(0, breaker.getUsed());
listener.reset();
try (FrozenEngine frozenEngine = new FrozenEngine(config)) {
try (FrozenEngine frozenEngine = new FrozenEngine(config, true)) {
Engine.Searcher searcher = frozenEngine.acquireSearcher("test");
assertEquals(expectedUse, breaker.getUsed());
FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).release();
@ -209,7 +209,7 @@ public class FrozenEngineTests extends EngineTestCase {
int numDocsAdded = addDocuments(globalCheckpoint, engine);
engine.flushAndClose();
int numIters = randomIntBetween(100, 1000);
try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) {
try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig, true)) {
int numThreads = randomIntBetween(2, 4);
Thread[] threads = new Thread[numThreads];
CyclicBarrier barrier = new CyclicBarrier(numThreads);
@ -305,7 +305,7 @@ public class FrozenEngineTests extends EngineTestCase {
addDocuments(globalCheckpoint, engine);
engine.flushAndClose();
listener.reset();
try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) {
try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig, true)) {
DirectoryReader reader;
try (Engine.Searcher searcher = frozenEngine.acquireSearcher("can_match")) {
assertNotNull(ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(searcher.getDirectoryReader()));
@ -350,7 +350,7 @@ public class FrozenEngineTests extends EngineTestCase {
totalDocs = searcher.search(new MatchAllDocsQuery(), Integer.MAX_VALUE).scoreDocs.length;
}
}
try (FrozenEngine frozenEngine = new FrozenEngine(config)) {
try (FrozenEngine frozenEngine = new FrozenEngine(config, true)) {
try (Engine.Searcher searcher = frozenEngine.acquireSearcher("test")) {
TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), Integer.MAX_VALUE);
assertThat(topDocs.scoreDocs.length, equalTo(totalDocs));

View File

@ -30,12 +30,12 @@ public class FrozenIndexShardTests extends IndexShardTestCase {
final ShardRouting shardRouting = indexShard.routingEntry();
IndexShard frozenShard = reinitShard(indexShard, ShardRoutingHelper.initWithSameId(shardRouting,
shardRouting.primary() ? RecoverySource.ExistingStoreRecoverySource.INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE
), indexShard.indexSettings().getIndexMetadata(), FrozenEngine::new);
), indexShard.indexSettings().getIndexMetadata(), config -> new FrozenEngine(config, true));
recoverShardFromStore(frozenShard);
assertThat(frozenShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(frozenShard.seqNoStats().getMaxSeqNo()));
assertDocCount(frozenShard, 3);
IndexShard replica = newShard(false, Settings.EMPTY, FrozenEngine::new);
IndexShard replica = newShard(false, Settings.EMPTY, config -> new FrozenEngine(config, true));
recoverReplica(replica, frozenShard, true);
assertDocCount(replica, 3);
closeShards(frozenShard, replica);

View File

@ -19,7 +19,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.nio.file.Path;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.isSearchableSnapshotStore;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.isSearchableSnapshotStore;
public class SearchableSnapshotIndexEventListener implements IndexEventListener {

View File

@ -6,7 +6,6 @@
package org.elasticsearch.xpack.searchablesnapshots;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Build;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.Client;
@ -68,30 +67,14 @@ import java.util.function.Function;
import java.util.function.Supplier;
import static java.util.Collections.emptyList;
import static org.elasticsearch.index.IndexModule.INDEX_STORE_TYPE_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SNAPSHOT_DIRECTORY_FACTORY_KEY;
/**
* Plugin for Searchable Snapshots feature
*/
public class SearchableSnapshots extends Plugin implements IndexStorePlugin, EnginePlugin, ActionPlugin, ClusterPlugin {
private static final boolean SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED;
static {
final String property = System.getProperty("es.searchable_snapshots_feature_enabled");
if ("true".equals(property)) {
SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED = true;
} else if ("false".equals(property)) {
SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED = false;
} else if (property == null) {
SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED = Build.CURRENT.isSnapshot();
} else {
throw new IllegalArgumentException(
"expected es.searchable_snapshots_feature_enabled to be unset or [true|false] but was [" + property + "]"
);
}
}
public static final Setting<String> SNAPSHOT_REPOSITORY_SETTING = Setting.simpleString(
"index.store.snapshot.repository_name",
Setting.Property.IndexScope,
@ -132,8 +115,6 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng
Setting.Property.NodeScope
);
public static final String SNAPSHOT_DIRECTORY_FACTORY_KEY = "snapshot";
private volatile Supplier<RepositoriesService> repositoriesServiceSupplier;
private final SetOnce<CacheService> cacheService = new SetOnce<>();
private final Settings settings;
@ -197,7 +178,7 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng
@Override
public void onIndexModule(IndexModule indexModule) {
if (SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED && isSearchableSnapshotStore(indexModule.getSettings())) {
if (SearchableSnapshotsConstants.isSearchableSnapshotStore(indexModule.getSettings())) {
indexModule.addIndexEventListener(new SearchableSnapshotIndexEventListener());
}
}
@ -219,10 +200,11 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng
@Override
public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) {
if (SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED
&& isSearchableSnapshotStore(indexSettings.getSettings())
if (SearchableSnapshotsConstants.isSearchableSnapshotStore(indexSettings.getSettings())
&& indexSettings.getSettings().getAsBoolean("index.frozen", false) == false) {
return Optional.of(engineConfig -> new ReadOnlyEngine(engineConfig, null, new TranslogStats(), false, Function.identity()));
return Optional.of(
engineConfig -> new ReadOnlyEngine(engineConfig, null, new TranslogStats(), false, Function.identity(), false)
);
}
return Optional.empty();
}
@ -271,7 +253,4 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng
}
}
static boolean isSearchableSnapshotStore(Settings indexSettings) {
return SNAPSHOT_DIRECTORY_FACTORY_KEY.equals(INDEX_STORE_TYPE_SETTING.get(indexSettings));
}
}

View File

@ -26,15 +26,14 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots;
import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import static org.elasticsearch.index.IndexModule.INDEX_STORE_TYPE_SETTING;
import static org.elasticsearch.index.store.SearchableSnapshotDirectory.unwrapDirectory;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_DIRECTORY_FACTORY_KEY;
public abstract class AbstractTransportSearchableSnapshotsAction<
Request extends BroadcastRequest<Request>,
@ -94,7 +93,7 @@ public abstract class AbstractTransportSearchableSnapshotsAction<
IndexMetadata indexMetaData = state.metadata().index(concreteIndex);
if (indexMetaData != null) {
Settings indexSettings = indexMetaData.getSettings();
if (INDEX_STORE_TYPE_SETTING.get(indexSettings).equals(SNAPSHOT_DIRECTORY_FACTORY_KEY)) {
if (SearchableSnapshotsConstants.isSearchableSnapshotStore(indexSettings)) {
searchableSnapshotIndices.add(concreteIndex);
}
}

View File

@ -36,6 +36,7 @@ import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotA
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest;
import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotAllocator;
import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots;
import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants;
import java.io.IOException;
import java.util.Map;
@ -112,7 +113,7 @@ public class TransportMountSearchableSnapshotAction extends TransportMasterNodeA
.put(SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING.getKey(), snapshotId.getName())
.put(SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING.getKey(), snapshotId.getUUID())
.put(SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING.getKey(), indexId.getId())
.put(INDEX_STORE_TYPE_SETTING.getKey(), SearchableSnapshots.SNAPSHOT_DIRECTORY_FACTORY_KEY)
.put(INDEX_STORE_TYPE_SETTING.getKey(), SearchableSnapshotsConstants.SNAPSHOT_DIRECTORY_FACTORY_KEY)
.put(IndexMetadata.SETTING_BLOCKS_WRITE, true)
.put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING.getKey(), SearchableSnapshotAllocator.ALLOCATOR_NAME)
.build();

View File

@ -43,13 +43,15 @@ import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.stream.StreamSupport;
import static org.elasticsearch.index.IndexSettings.INDEX_SOFT_DELETES_SETTING;
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_DIRECTORY_FACTORY_KEY;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SNAPSHOT_DIRECTORY_FACTORY_KEY;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
@ -207,6 +209,86 @@ public class SearchableSnapshotsIntegTests extends BaseSearchableSnapshotsIntegT
assertSearchableSnapshotStats(restoredIndexName, cacheEnabled, nonCachedExtensions);
}
public void testCanMountSnapshotTakenWhileConcurrentlyIndexing() throws Exception {
final String fsRepoName = randomAlphaOfLength(10);
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
final String restoredIndexName = randomBoolean() ? indexName : randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
final String snapshotName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
final Path repo = randomRepoPath();
assertAcked(
client().admin()
.cluster()
.preparePutRepository(fsRepoName)
.setType("fs")
.setSettings(Settings.builder().put("location", repo).put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES))
);
assertAcked(prepareCreate(indexName, Settings.builder().put(INDEX_SOFT_DELETES_SETTING.getKey(), true)));
final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
final Thread indexingThead = new Thread(() -> {
final List<IndexRequestBuilder> indexRequestBuilders = new ArrayList<>();
for (int i = between(10, 10_000); i >= 0; i--) {
indexRequestBuilders.add(client().prepareIndex(indexName, "_doc").setSource("foo", randomBoolean() ? "bar" : "baz"));
}
try {
cyclicBarrier.await();
indexRandom(true, true, indexRequestBuilders);
} catch (InterruptedException | BrokenBarrierException e) {
throw new AssertionError(e);
}
refresh(indexName);
assertThat(
client().admin().indices().prepareForceMerge(indexName).setOnlyExpungeDeletes(true).setFlush(true).get().getFailedShards(),
equalTo(0)
);
});
final Thread snapshotThread = new Thread(() -> {
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new AssertionError(e);
}
CreateSnapshotResponse createSnapshotResponse = client().admin()
.cluster()
.prepareCreateSnapshot(fsRepoName, snapshotName)
.setWaitForCompletion(true)
.get();
final SnapshotInfo snapshotInfo = createSnapshotResponse.getSnapshotInfo();
assertThat(snapshotInfo.successfulShards(), greaterThan(0));
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
});
indexingThead.start();
snapshotThread.start();
indexingThead.join();
snapshotThread.join();
assertAcked(client().admin().indices().prepareDelete(indexName));
logger.info("--> restoring index [{}]", restoredIndexName);
Settings.Builder indexSettingsBuilder = Settings.builder()
.put(SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true)
.put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), Boolean.FALSE.toString());
final MountSearchableSnapshotRequest req = new MountSearchableSnapshotRequest(
restoredIndexName,
fsRepoName,
snapshotName,
indexName,
indexSettingsBuilder.build(),
Strings.EMPTY_ARRAY,
true
);
final RestoreSnapshotResponse restoreSnapshotResponse = client().execute(MountSearchableSnapshotAction.INSTANCE, req).get();
assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0));
ensureGreen(restoredIndexName);
}
private void assertRecovered(String indexName, TotalHits originalAllHits, TotalHits originalBarHits) throws Exception {
final Thread[] threads = new Thread[between(1, 5)];
final AtomicArray<TotalHits> allHits = new AtomicArray<>(threads.length);