diff --git a/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java b/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java index c6e3f0342bc..fc6783969a6 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java @@ -54,7 +54,7 @@ public final class NoOpEngine extends ReadOnlyEngine { private final DocsStats docsStats; public NoOpEngine(EngineConfig config) { - super(config, null, null, true, Function.identity()); + super(config, null, null, true, Function.identity(), true); this.segmentsStats = new SegmentsStats(); Directory directory = store.directory(); try (DirectoryReader reader = openDirectory(directory, config.getIndexSettings().isSoftDeleteEnabled())) { diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 9c8858a91ff..7af36616084 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -60,7 +60,7 @@ import java.util.stream.Stream; * Note: this engine can be opened side-by-side with a read-write engine but will not reflect any changes made to the read-write * engine. * - * @see #ReadOnlyEngine(EngineConfig, SeqNoStats, TranslogStats, boolean, Function) + * @see #ReadOnlyEngine(EngineConfig, SeqNoStats, TranslogStats, boolean, Function, boolean) */ public class ReadOnlyEngine extends Engine { @@ -78,6 +78,7 @@ public class ReadOnlyEngine extends Engine { private final RamAccountingRefreshListener refreshListener; private final SafeCommitInfo safeCommitInfo; private final CompletionStatsCache completionStatsCache; + private final boolean requireCompleteHistory; protected volatile TranslogStats translogStats; @@ -92,11 +93,13 @@ public class ReadOnlyEngine extends Engine { * @param obtainLock if true this engine will try to obtain the {@link IndexWriter#WRITE_LOCK_NAME} lock. Otherwise * the lock won't be obtained * @param readerWrapperFunction allows to wrap the index-reader for this engine. + * @param requireCompleteHistory indicates whether this engine permits an incomplete history (i.e. LCP < MSN) */ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats translogStats, boolean obtainLock, - Function readerWrapperFunction) { + Function readerWrapperFunction, boolean requireCompleteHistory) { super(config); this.refreshListener = new RamAccountingRefreshListener(engineConfig.getCircuitBreakerService()); + this.requireCompleteHistory = requireCompleteHistory; try { Store store = config.getStore(); store.incRef(); @@ -137,6 +140,9 @@ public class ReadOnlyEngine extends Engine { } protected void ensureMaxSeqNoEqualsToGlobalCheckpoint(final SeqNoStats seqNoStats) { + if (requireCompleteHistory == false) { + return; + } // Before 8.0 the global checkpoint is not known and up to date when the engine is created after // peer recovery, so we only check the max seq no / global checkpoint coherency when the global // checkpoint is different from the unassigned sequence number value. diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index faddeae60df..969b9a7e63d 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -3390,7 +3390,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl // we must create both new read-only engine and new read-write engine under engineMutex to ensure snapshotStoreMetadata, // acquireXXXCommit and close works. final Engine readOnlyEngine = - new ReadOnlyEngine(newEngineConfig(replicationTracker), seqNoStats, translogStats, false, Function.identity()) { + new ReadOnlyEngine(newEngineConfig(replicationTracker), seqNoStats, translogStats, false, Function.identity(), true) { @Override public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) { synchronized (engineMutex) { diff --git a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java index c01aca80825..334eccedef2 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java @@ -70,7 +70,7 @@ public class ReadOnlyEngineTests extends EngineTestCase { globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getPersistedLocalCheckpoint())); engine.flush(); readOnlyEngine = new ReadOnlyEngine(engine.engineConfig, engine.getSeqNoStats(globalCheckpoint.get()), - engine.getTranslogStats(), false, Function.identity()); + engine.getTranslogStats(), false, Function.identity(), true); lastSeqNoStats = engine.getSeqNoStats(globalCheckpoint.get()); lastDocIds = getDocIds(engine, true); assertThat(readOnlyEngine.getPersistedLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint())); @@ -138,7 +138,7 @@ public class ReadOnlyEngineTests extends EngineTestCase { globalCheckpoint.set(engine.getPersistedLocalCheckpoint()); engine.syncTranslog(); engine.flushAndClose(); - readOnlyEngine = new ReadOnlyEngine(engine.engineConfig, null , null, true, Function.identity()); + readOnlyEngine = new ReadOnlyEngine(engine.engineConfig, null , null, true, Function.identity(), true); Engine.CommitId flush = readOnlyEngine.flush(randomBoolean(), true); assertEquals(flush, readOnlyEngine.flush(randomBoolean(), true)); } finally { @@ -166,7 +166,7 @@ public class ReadOnlyEngineTests extends EngineTestCase { engine.flushAndClose(); IllegalStateException exception = expectThrows(IllegalStateException.class, - () -> new ReadOnlyEngine(config, null, null, true, Function.identity()) { + () -> new ReadOnlyEngine(config, null, null, true, Function.identity(), true) { @Override protected boolean assertMaxSeqNoEqualsToGlobalCheckpoint(final long maxSeqNo, final long globalCheckpoint) { // we don't want the assertion to trip in this test @@ -185,7 +185,7 @@ public class ReadOnlyEngineTests extends EngineTestCase { try (Store store = createStore()) { EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); store.createEmpty(Version.CURRENT.luceneVersion); - try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , new TranslogStats(), true, Function.identity())) { + try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , new TranslogStats(), true, Function.identity(), true)) { Class expectedException = LuceneTestCase.TEST_ASSERTS_ENABLED ? AssertionError.class : UnsupportedOperationException.class; expectThrows(expectedException, () -> readOnlyEngine.index(null)); @@ -206,7 +206,7 @@ public class ReadOnlyEngineTests extends EngineTestCase { try (Store store = createStore()) { EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); store.createEmpty(Version.CURRENT.luceneVersion); - try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , new TranslogStats(), true, Function.identity())) { + try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , new TranslogStats(), true, Function.identity(), true)) { globalCheckpoint.set(randomNonNegativeLong()); try { readOnlyEngine.verifyEngineBeforeIndexClosing(); @@ -236,7 +236,7 @@ public class ReadOnlyEngineTests extends EngineTestCase { engine.syncTranslog(); engine.flushAndClose(); } - try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , null, true, Function.identity())) { + try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , null, true, Function.identity(), true)) { final TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), config.getIndexSettings()); readOnlyEngine.recoverFromTranslog(translogHandler, randomNonNegativeLong()); @@ -278,7 +278,7 @@ public class ReadOnlyEngineTests extends EngineTestCase { engine.flush(true, true); } - try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null, null, true, Function.identity())) { + try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null, null, true, Function.identity(), true)) { assertThat(readOnlyEngine.getTranslogStats().estimatedNumberOfOperations(), equalTo(softDeletesEnabled ? 0 : numDocs)); assertThat(readOnlyEngine.getTranslogStats().getUncommittedOperations(), equalTo(0)); assertThat(readOnlyEngine.getTranslogStats().getTranslogSizeInBytes(), greaterThan(0L)); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 4a99960fbf7..4dcf0734f89 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -4136,7 +4136,7 @@ public class IndexShardTests extends IndexShardTestCase { ShardRouting readonlyShardRouting = newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), true, ShardRoutingState.INITIALIZING, RecoverySource.ExistingStoreRecoverySource.INSTANCE); final IndexShard readonlyShard = reinitShard(shard, readonlyShardRouting, shard.indexSettings.getIndexMetadata(), - engineConfig -> new ReadOnlyEngine(engineConfig, null, null, true, Function.identity()) { + engineConfig -> new ReadOnlyEngine(engineConfig, null, null, true, Function.identity(), true) { @Override protected void ensureMaxSeqNoEqualsToGlobalCheckpoint(SeqNoStats seqNoStats) { // just like a following shard, we need to skip this check for now. diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java index f814a02a6e6..1cccc00f5c7 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java @@ -204,7 +204,7 @@ public final class SourceOnlySnapshotRepository extends FilterRepository { } catch (IOException e) { throw new UncheckedIOException(e); } - }); + }, true); } /** diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsConstants.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsConstants.java new file mode 100644 index 00000000000..e4504aa1389 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsConstants.java @@ -0,0 +1,37 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.searchablesnapshots; + +import org.elasticsearch.Build; +import org.elasticsearch.common.settings.Settings; + +import static org.elasticsearch.index.IndexModule.INDEX_STORE_TYPE_SETTING; + +public class SearchableSnapshotsConstants { + public static final boolean SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED; + + static { + final String property = System.getProperty("es.searchable_snapshots_feature_enabled"); + if ("true".equals(property)) { + SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED = true; + } else if ("false".equals(property)) { + SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED = false; + } else if (property == null) { + SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED = Build.CURRENT.isSnapshot(); + } else { + throw new IllegalArgumentException( + "expected es.searchable_snapshots_feature_enabled to be unset or [true|false] but was [" + property + "]" + ); + } + } + + public static final String SNAPSHOT_DIRECTORY_FACTORY_KEY = "snapshot"; + + public static boolean isSearchableSnapshotStore(Settings indexSettings) { + return SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED + && SNAPSHOT_DIRECTORY_FACTORY_KEY.equals(INDEX_STORE_TYPE_SETTING.get(indexSettings)); + } +} diff --git a/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java b/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java index c5d2849dd9d..fe2c9694aaa 100644 --- a/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java +++ b/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java @@ -75,8 +75,8 @@ public final class FrozenEngine extends ReadOnlyEngine { private volatile ElasticsearchDirectoryReader lastOpenedReader; private final ElasticsearchDirectoryReader canMatchReader; - public FrozenEngine(EngineConfig config) { - super(config, null, null, true, Function.identity()); + public FrozenEngine(EngineConfig config, boolean requireCompleteHistory) { + super(config, null, null, true, Function.identity(), requireCompleteHistory); boolean success = false; Directory directory = store.directory(); diff --git a/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/xpack/frozen/FrozenIndices.java b/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/xpack/frozen/FrozenIndices.java index f8c0e0845b8..fb9c0202c46 100644 --- a/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/xpack/frozen/FrozenIndices.java +++ b/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/xpack/frozen/FrozenIndices.java @@ -37,6 +37,8 @@ import java.util.List; import java.util.Optional; import java.util.function.Supplier; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.isSearchableSnapshotStore; + public class FrozenIndices extends Plugin implements ActionPlugin, EnginePlugin { private boolean transportClientMode; @@ -48,7 +50,8 @@ public class FrozenIndices extends Plugin implements ActionPlugin, EnginePlugin @Override public Optional getEngineFactory(IndexSettings indexSettings) { if (indexSettings.getValue(FrozenEngine.INDEX_FROZEN)) { - return Optional.of(FrozenEngine::new); + final boolean requireCompleteHistory = isSearchableSnapshotStore(indexSettings.getSettings()) == false; + return Optional.of(config -> new FrozenEngine(config, requireCompleteHistory)); } else { return Optional.empty(); } diff --git a/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java b/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java index 8e3c1295857..90489834477 100644 --- a/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java +++ b/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java @@ -48,7 +48,7 @@ public class FrozenEngineTests extends EngineTestCase { int numDocs = Math.min(10, addDocuments(globalCheckpoint, engine)); engine.flushAndClose(); listener.reset(); - try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) { + try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig, true)) { assertFalse(frozenEngine.isReaderOpen()); Engine.Searcher searcher = frozenEngine.acquireSearcher("test"); assertEquals(config.getShardId(), ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(searcher @@ -82,7 +82,7 @@ public class FrozenEngineTests extends EngineTestCase { int numDocs = Math.min(10, addDocuments(globalCheckpoint, engine)); engine.flushAndClose(); listener.reset(); - try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) { + try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig, true)) { assertFalse(frozenEngine.isReaderOpen()); Engine.Searcher searcher1 = frozenEngine.acquireSearcher("test"); assertTrue(frozenEngine.isReaderOpen()); @@ -118,7 +118,7 @@ public class FrozenEngineTests extends EngineTestCase { addDocuments(globalCheckpoint, engine); engine.flushAndClose(); listener.reset(); - try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) { + try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig, true)) { Engine.Searcher searcher = frozenEngine.acquireSearcher("test"); SegmentsStats segmentsStats = frozenEngine.segmentsStats(randomBoolean(), false); assertEquals(frozenEngine.segments(randomBoolean()).size(), segmentsStats.getCount()); @@ -156,7 +156,7 @@ public class FrozenEngineTests extends EngineTestCase { engine.refresh("test"); // pull the reader to account for RAM in the breaker. } final long expectedUse; - try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null, null, true, i -> i)) { + try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null, null, true, i -> i, true)) { expectedUse = breaker.getUsed(); DocsStats docsStats = readOnlyEngine.docStats(); assertEquals(docs, docsStats.getCount()); @@ -164,7 +164,7 @@ public class FrozenEngineTests extends EngineTestCase { assertTrue(expectedUse > 0); assertEquals(0, breaker.getUsed()); listener.reset(); - try (FrozenEngine frozenEngine = new FrozenEngine(config)) { + try (FrozenEngine frozenEngine = new FrozenEngine(config, true)) { Engine.Searcher searcher = frozenEngine.acquireSearcher("test"); assertEquals(expectedUse, breaker.getUsed()); FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).release(); @@ -209,7 +209,7 @@ public class FrozenEngineTests extends EngineTestCase { int numDocsAdded = addDocuments(globalCheckpoint, engine); engine.flushAndClose(); int numIters = randomIntBetween(100, 1000); - try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) { + try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig, true)) { int numThreads = randomIntBetween(2, 4); Thread[] threads = new Thread[numThreads]; CyclicBarrier barrier = new CyclicBarrier(numThreads); @@ -305,7 +305,7 @@ public class FrozenEngineTests extends EngineTestCase { addDocuments(globalCheckpoint, engine); engine.flushAndClose(); listener.reset(); - try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) { + try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig, true)) { DirectoryReader reader; try (Engine.Searcher searcher = frozenEngine.acquireSearcher("can_match")) { assertNotNull(ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(searcher.getDirectoryReader())); @@ -350,7 +350,7 @@ public class FrozenEngineTests extends EngineTestCase { totalDocs = searcher.search(new MatchAllDocsQuery(), Integer.MAX_VALUE).scoreDocs.length; } } - try (FrozenEngine frozenEngine = new FrozenEngine(config)) { + try (FrozenEngine frozenEngine = new FrozenEngine(config, true)) { try (Engine.Searcher searcher = frozenEngine.acquireSearcher("test")) { TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), Integer.MAX_VALUE); assertThat(topDocs.scoreDocs.length, equalTo(totalDocs)); diff --git a/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexShardTests.java b/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexShardTests.java index 70c16db6fcf..a61fcd8e537 100644 --- a/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexShardTests.java +++ b/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexShardTests.java @@ -30,12 +30,12 @@ public class FrozenIndexShardTests extends IndexShardTestCase { final ShardRouting shardRouting = indexShard.routingEntry(); IndexShard frozenShard = reinitShard(indexShard, ShardRoutingHelper.initWithSameId(shardRouting, shardRouting.primary() ? RecoverySource.ExistingStoreRecoverySource.INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE - ), indexShard.indexSettings().getIndexMetadata(), FrozenEngine::new); + ), indexShard.indexSettings().getIndexMetadata(), config -> new FrozenEngine(config, true)); recoverShardFromStore(frozenShard); assertThat(frozenShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(frozenShard.seqNoStats().getMaxSeqNo())); assertDocCount(frozenShard, 3); - IndexShard replica = newShard(false, Settings.EMPTY, FrozenEngine::new); + IndexShard replica = newShard(false, Settings.EMPTY, config -> new FrozenEngine(config, true)); recoverReplica(replica, frozenShard, true); assertDocCount(replica, 3); closeShards(frozenShard, replica); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java index 887f942f235..71183bfba38 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java @@ -19,7 +19,7 @@ import org.elasticsearch.threadpool.ThreadPool; import java.nio.file.Path; -import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.isSearchableSnapshotStore; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.isSearchableSnapshotStore; public class SearchableSnapshotIndexEventListener implements IndexEventListener { diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java index e23a8e7876a..81b44d993a5 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.searchablesnapshots; import org.apache.lucene.util.SetOnce; -import org.elasticsearch.Build; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.client.Client; @@ -68,30 +67,14 @@ import java.util.function.Function; import java.util.function.Supplier; import static java.util.Collections.emptyList; -import static org.elasticsearch.index.IndexModule.INDEX_STORE_TYPE_SETTING; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SNAPSHOT_DIRECTORY_FACTORY_KEY; /** * Plugin for Searchable Snapshots feature */ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, EnginePlugin, ActionPlugin, ClusterPlugin { - private static final boolean SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED; - - static { - final String property = System.getProperty("es.searchable_snapshots_feature_enabled"); - if ("true".equals(property)) { - SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED = true; - } else if ("false".equals(property)) { - SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED = false; - } else if (property == null) { - SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED = Build.CURRENT.isSnapshot(); - } else { - throw new IllegalArgumentException( - "expected es.searchable_snapshots_feature_enabled to be unset or [true|false] but was [" + property + "]" - ); - } - } - public static final Setting SNAPSHOT_REPOSITORY_SETTING = Setting.simpleString( "index.store.snapshot.repository_name", Setting.Property.IndexScope, @@ -132,8 +115,6 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng Setting.Property.NodeScope ); - public static final String SNAPSHOT_DIRECTORY_FACTORY_KEY = "snapshot"; - private volatile Supplier repositoriesServiceSupplier; private final SetOnce cacheService = new SetOnce<>(); private final Settings settings; @@ -197,7 +178,7 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng @Override public void onIndexModule(IndexModule indexModule) { - if (SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED && isSearchableSnapshotStore(indexModule.getSettings())) { + if (SearchableSnapshotsConstants.isSearchableSnapshotStore(indexModule.getSettings())) { indexModule.addIndexEventListener(new SearchableSnapshotIndexEventListener()); } } @@ -219,10 +200,11 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng @Override public Optional getEngineFactory(IndexSettings indexSettings) { - if (SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED - && isSearchableSnapshotStore(indexSettings.getSettings()) + if (SearchableSnapshotsConstants.isSearchableSnapshotStore(indexSettings.getSettings()) && indexSettings.getSettings().getAsBoolean("index.frozen", false) == false) { - return Optional.of(engineConfig -> new ReadOnlyEngine(engineConfig, null, new TranslogStats(), false, Function.identity())); + return Optional.of( + engineConfig -> new ReadOnlyEngine(engineConfig, null, new TranslogStats(), false, Function.identity(), false) + ); } return Optional.empty(); } @@ -271,7 +253,4 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng } } - static boolean isSearchableSnapshotStore(Settings indexSettings) { - return SNAPSHOT_DIRECTORY_FACTORY_KEY.equals(INDEX_STORE_TYPE_SETTING.get(indexSettings)); - } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/AbstractTransportSearchableSnapshotsAction.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/AbstractTransportSearchableSnapshotsAction.java index 4f53193c6e4..fde972606d1 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/AbstractTransportSearchableSnapshotsAction.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/AbstractTransportSearchableSnapshotsAction.java @@ -26,15 +26,14 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots; +import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Objects; -import static org.elasticsearch.index.IndexModule.INDEX_STORE_TYPE_SETTING; import static org.elasticsearch.index.store.SearchableSnapshotDirectory.unwrapDirectory; -import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_DIRECTORY_FACTORY_KEY; public abstract class AbstractTransportSearchableSnapshotsAction< Request extends BroadcastRequest, @@ -94,7 +93,7 @@ public abstract class AbstractTransportSearchableSnapshotsAction< IndexMetadata indexMetaData = state.metadata().index(concreteIndex); if (indexMetaData != null) { Settings indexSettings = indexMetaData.getSettings(); - if (INDEX_STORE_TYPE_SETTING.get(indexSettings).equals(SNAPSHOT_DIRECTORY_FACTORY_KEY)) { + if (SearchableSnapshotsConstants.isSearchableSnapshotStore(indexSettings)) { searchableSnapshotIndices.add(concreteIndex); } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportMountSearchableSnapshotAction.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportMountSearchableSnapshotAction.java index 2ed2258e3de..ab5146e6a04 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportMountSearchableSnapshotAction.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportMountSearchableSnapshotAction.java @@ -36,6 +36,7 @@ import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotA import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest; import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotAllocator; import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots; +import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants; import java.io.IOException; import java.util.Map; @@ -112,7 +113,7 @@ public class TransportMountSearchableSnapshotAction extends TransportMasterNodeA .put(SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING.getKey(), snapshotId.getName()) .put(SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING.getKey(), snapshotId.getUUID()) .put(SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING.getKey(), indexId.getId()) - .put(INDEX_STORE_TYPE_SETTING.getKey(), SearchableSnapshots.SNAPSHOT_DIRECTORY_FACTORY_KEY) + .put(INDEX_STORE_TYPE_SETTING.getKey(), SearchableSnapshotsConstants.SNAPSHOT_DIRECTORY_FACTORY_KEY) .put(IndexMetadata.SETTING_BLOCKS_WRITE, true) .put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING.getKey(), SearchableSnapshotAllocator.ALLOCATOR_NAME) .build(); diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java index 3f3a41d4946..8ea9cb531e5 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java @@ -43,13 +43,15 @@ import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Set; +import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.stream.StreamSupport; import static org.elasticsearch.index.IndexSettings.INDEX_SOFT_DELETES_SETTING; import static org.elasticsearch.index.query.QueryBuilders.matchQuery; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_DIRECTORY_FACTORY_KEY; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SNAPSHOT_DIRECTORY_FACTORY_KEY; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasSize; @@ -207,6 +209,86 @@ public class SearchableSnapshotsIntegTests extends BaseSearchableSnapshotsIntegT assertSearchableSnapshotStats(restoredIndexName, cacheEnabled, nonCachedExtensions); } + public void testCanMountSnapshotTakenWhileConcurrentlyIndexing() throws Exception { + final String fsRepoName = randomAlphaOfLength(10); + final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + final String restoredIndexName = randomBoolean() ? indexName : randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + final String snapshotName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + + final Path repo = randomRepoPath(); + assertAcked( + client().admin() + .cluster() + .preparePutRepository(fsRepoName) + .setType("fs") + .setSettings(Settings.builder().put("location", repo).put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)) + ); + + assertAcked(prepareCreate(indexName, Settings.builder().put(INDEX_SOFT_DELETES_SETTING.getKey(), true))); + + final CyclicBarrier cyclicBarrier = new CyclicBarrier(2); + + final Thread indexingThead = new Thread(() -> { + final List indexRequestBuilders = new ArrayList<>(); + for (int i = between(10, 10_000); i >= 0; i--) { + indexRequestBuilders.add(client().prepareIndex(indexName, "_doc").setSource("foo", randomBoolean() ? "bar" : "baz")); + } + try { + cyclicBarrier.await(); + indexRandom(true, true, indexRequestBuilders); + } catch (InterruptedException | BrokenBarrierException e) { + throw new AssertionError(e); + } + refresh(indexName); + assertThat( + client().admin().indices().prepareForceMerge(indexName).setOnlyExpungeDeletes(true).setFlush(true).get().getFailedShards(), + equalTo(0) + ); + }); + + final Thread snapshotThread = new Thread(() -> { + try { + cyclicBarrier.await(); + } catch (InterruptedException | BrokenBarrierException e) { + throw new AssertionError(e); + } + CreateSnapshotResponse createSnapshotResponse = client().admin() + .cluster() + .prepareCreateSnapshot(fsRepoName, snapshotName) + .setWaitForCompletion(true) + .get(); + final SnapshotInfo snapshotInfo = createSnapshotResponse.getSnapshotInfo(); + assertThat(snapshotInfo.successfulShards(), greaterThan(0)); + assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards())); + }); + + indexingThead.start(); + snapshotThread.start(); + indexingThead.join(); + snapshotThread.join(); + + assertAcked(client().admin().indices().prepareDelete(indexName)); + + logger.info("--> restoring index [{}]", restoredIndexName); + + Settings.Builder indexSettingsBuilder = Settings.builder() + .put(SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true) + .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), Boolean.FALSE.toString()); + final MountSearchableSnapshotRequest req = new MountSearchableSnapshotRequest( + restoredIndexName, + fsRepoName, + snapshotName, + indexName, + indexSettingsBuilder.build(), + Strings.EMPTY_ARRAY, + true + ); + + final RestoreSnapshotResponse restoreSnapshotResponse = client().execute(MountSearchableSnapshotAction.INSTANCE, req).get(); + assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0)); + ensureGreen(restoredIndexName); + } + private void assertRecovered(String indexName, TotalHits originalAllHits, TotalHits originalBarHits) throws Exception { final Thread[] threads = new Thread[between(1, 5)]; final AtomicArray allHits = new AtomicArray<>(threads.length);