Do not warm up searcher in engine constructor (#48605)

With this change, we won't warm up searchers until we externally refresh
an engine. We explicitly refresh before allowing reading from a shard
(i.e., move to post_recovery state) and during resetting. These
guarantees that we have warmed up the engine before exposing the
external searcher.

Another prerequisite for #47186.
This commit is contained in:
Nhat Nguyen 2019-10-30 14:08:35 -04:00
parent c9ead80c31
commit f8ef402027
4 changed files with 112 additions and 32 deletions

View File

@ -670,6 +670,7 @@ public abstract class Engine implements Closeable {
}
Releasable releasable = store::decRef;
try {
assert assertSearcherIsWarmedUp(source, scope);
ReferenceManager<ElasticsearchDirectoryReader> referenceManager = getReferenceManager(scope);
final ElasticsearchDirectoryReader acquire = referenceManager.acquire();
AtomicBoolean released = new AtomicBoolean(false);
@ -705,6 +706,10 @@ public abstract class Engine implements Closeable {
protected abstract ReferenceManager<ElasticsearchDirectoryReader> getReferenceManager(SearcherScope scope);
boolean assertSearcherIsWarmedUp(String source, SearcherScope scope) {
return true;
}
public enum SearcherScope {
EXTERNAL, INTERNAL
}

View File

@ -322,18 +322,13 @@ public class InternalEngine extends Engine {
private static final class ExternalReaderManager extends ReferenceManager<ElasticsearchDirectoryReader> {
private final BiConsumer<ElasticsearchDirectoryReader, ElasticsearchDirectoryReader> refreshListener;
private final ElasticsearchReaderManager internalReaderManager;
private boolean isWarmedUp; //guarded by refreshLock
ExternalReaderManager(ElasticsearchReaderManager internalReaderManager,
BiConsumer<ElasticsearchDirectoryReader, ElasticsearchDirectoryReader> refreshListener) throws IOException {
this.refreshListener = refreshListener;
this.internalReaderManager = internalReaderManager;
ElasticsearchDirectoryReader acquire = internalReaderManager.acquire();
try {
incrementAndNotify(acquire, null);
current = acquire;
} finally {
internalReaderManager.release(acquire);
}
this.current = internalReaderManager.acquire(); // steal the reference without warming up
}
@Override
@ -342,26 +337,25 @@ public class InternalEngine extends Engine {
// it's a save operation since we acquire the reader which incs it's reference but then down the road
// steal it by calling incRef on the "stolen" reader
internalReaderManager.maybeRefreshBlocking();
ElasticsearchDirectoryReader acquire = internalReaderManager.acquire();
try {
if (acquire == referenceToRefresh) {
// nothing has changed - both ref managers share the same instance so we can use reference equality
return null;
} else {
incrementAndNotify(acquire, referenceToRefresh);
return acquire;
final ElasticsearchDirectoryReader newReader = internalReaderManager.acquire();
if (isWarmedUp == false || newReader != referenceToRefresh) {
boolean success = false;
try {
refreshListener.accept(newReader, isWarmedUp ? referenceToRefresh : null);
isWarmedUp = true;
success = true;
} finally {
if (success == false) {
internalReaderManager.release(newReader);
}
}
} finally {
internalReaderManager.release(acquire);
}
}
private void incrementAndNotify(ElasticsearchDirectoryReader reader,
ElasticsearchDirectoryReader previousReader) throws IOException {
reader.incRef(); // steal the reference
try (Closeable c = reader::decRef) {
refreshListener.accept(reader, previousReader);
reader.incRef(); // double inc-ref if we were successful
// nothing has changed - both ref managers share the same instance so we can use reference equality
if (referenceToRefresh == newReader) {
internalReaderManager.release(newReader);
return null;
} else {
return newReader; // steal the reference
}
}
@ -376,7 +370,24 @@ public class InternalEngine extends Engine {
}
@Override
protected void decRef(ElasticsearchDirectoryReader reference) throws IOException { reference.decRef(); }
protected void decRef(ElasticsearchDirectoryReader reference) throws IOException {
reference.decRef();
}
}
@Override
final boolean assertSearcherIsWarmedUp(String source, SearcherScope scope) {
if (scope == SearcherScope.EXTERNAL) {
switch (source) {
// we can access segment_stats while a shard is still in the recovering state.
case "segments":
case "segments_stats":
break;
default:
assert externalReaderManager.isWarmedUp : "searcher was not warmed up yet for source[" + source + "]";
}
}
return true;
}
@Override

View File

@ -2702,6 +2702,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
final Sort indexSort = indexSortSupplier.get();
final Engine.Warmer warmer = reader -> {
assert Thread.holdsLock(mutex) == false : "warming engine under mutex";
assert reader != null;
if (this.warmer != null) {
this.warmer.warm(reader);
}
@ -3413,6 +3414,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
// TODO: add a dedicate recovery stats for the reset translog
});
newEngineReference.get().recoverFromTranslog(translogRunner, globalCheckpoint);
newEngineReference.get().refresh("reset_engine");
synchronized (mutex) {
verifyNotClosed();
IOUtils.close(currentEngineReference.getAndSet(newEngineReference.get()));

View File

@ -204,6 +204,7 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.isIn;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
@ -215,6 +216,7 @@ import static org.mockito.Mockito.when;
public class InternalEngineTests extends EngineTestCase {
public void testVersionMapAfterAutoIDDocument() throws IOException {
engine.refresh("warm_up");
ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField("test"),
new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
Engine.Index operation = randomBoolean() ?
@ -926,6 +928,7 @@ public class InternalEngineTests extends EngineTestCase {
}
public void testSimpleOperations() throws Exception {
engine.refresh("warm_up");
Engine.Searcher searchResult = engine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
searchResult.close();
@ -1103,6 +1106,7 @@ public class InternalEngineTests extends EngineTestCase {
}
public void testSearchResultRelease() throws Exception {
engine.refresh("warm_up");
Engine.Searcher searchResult = engine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
searchResult.close();
@ -2175,7 +2179,7 @@ public class InternalEngineTests extends EngineTestCase {
final int opsOnPrimary = assertOpsOnPrimary(primaryOps, finalReplicaVersion, deletedOnReplica, replicaEngine);
final long currentSeqNo = getSequenceID(replicaEngine,
new Engine.Get(false, false, "type", lastReplicaOp.uid().text(), lastReplicaOp.uid())).v1();
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
try (Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
final TotalHitCountCollector collector = new TotalHitCountCollector();
searcher.search(new MatchAllDocsQuery(), collector);
if (collector.getTotalHits() > 0) {
@ -2740,7 +2744,7 @@ public class InternalEngineTests extends EngineTestCase {
}
public void testExtractShardId() {
try (Engine.Searcher test = this.engine.acquireSearcher("test")) {
try (Engine.Searcher test = this.engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
ShardId shardId = ShardUtils.extractShardId(test.getDirectoryReader());
assertNotNull(shardId);
assertEquals(shardId, engine.config().getShardId());
@ -3015,7 +3019,7 @@ public class InternalEngineTests extends EngineTestCase {
engine.close();
try (InternalEngine engine = new InternalEngine(config)) {
engine.skipTranslogRecovery();
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
try (Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10));
assertThat(topDocs.totalHits.value, equalTo(0L));
}
@ -3058,6 +3062,7 @@ public class InternalEngineTests extends EngineTestCase {
// we need to reuse the engine config unless the parser.mappingModified won't work
engine = new InternalEngine(copy(engine.config(), inSyncGlobalCheckpointSupplier));
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
engine.refresh("warm_up");
assertVisibleCount(engine, numDocs, false);
assertEquals(numDocs, translogHandler.appliedOperations());
@ -3071,6 +3076,7 @@ public class InternalEngineTests extends EngineTestCase {
engine.close();
translogHandler = createTranslogHandler(engine.engineConfig.getIndexSettings());
engine = createEngine(store, primaryTranslogDir, inSyncGlobalCheckpointSupplier);
engine.refresh("warm_up");
assertVisibleCount(engine, numDocs, false);
assertEquals(0, translogHandler.appliedOperations());
@ -3100,6 +3106,7 @@ public class InternalEngineTests extends EngineTestCase {
engine.close();
translogHandler = createTranslogHandler(engine.engineConfig.getIndexSettings());
engine = createEngine(store, primaryTranslogDir, inSyncGlobalCheckpointSupplier);
engine.refresh("warm_up");
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), numDocs + 1);
assertThat(topDocs.totalHits.value, equalTo(numDocs + 1L));
@ -4491,7 +4498,7 @@ public class InternalEngineTests extends EngineTestCase {
* second is the primary term.
*/
private Tuple<Long, Long> getSequenceID(Engine engine, Engine.Get get) throws EngineException {
try (Engine.Searcher searcher = engine.acquireSearcher("get")) {
try (Engine.Searcher searcher = engine.acquireSearcher("get", Engine.SearcherScope.INTERNAL)) {
final long primaryTerm;
final long seqNo;
DocIdAndSeqNo docIdAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.getIndexReader(), get.uid());
@ -4673,7 +4680,7 @@ public class InternalEngineTests extends EngineTestCase {
InternalEngine engine =
// disable merges to make sure that the reader doesn't change unexpectedly during the test
createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE)) {
engine.refresh("warm_up");
try (Engine.Searcher getSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
Engine.Searcher searchSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)) {
assertSameReader(getSearcher, searchSearcher);
@ -5536,7 +5543,7 @@ public class InternalEngineTests extends EngineTestCase {
public void testAcquireSearcherOnClosingEngine() throws Exception {
engine.close();
expectThrows(AlreadyClosedException.class, () -> engine.acquireSearcher("test"));
expectThrows(AlreadyClosedException.class, () -> engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL));
}
public void testNoOpOnClosingEngine() throws Exception {
@ -6195,4 +6202,59 @@ public class InternalEngineTests extends EngineTestCase {
}
}
}
public void testNotWarmUpSearcherInEngineCtor() throws Exception {
try (Store store = createStore()) {
List<ElasticsearchDirectoryReader> warmedUpReaders = new ArrayList<>();
Engine.Warmer warmer = reader -> {
assertNotNull(reader);
assertThat(reader, not(in(warmedUpReaders)));
warmedUpReaders.add(reader);
};
EngineConfig config = engine.config();
final TranslogConfig translogConfig = new TranslogConfig(config.getTranslogConfig().getShardId(),
createTempDir(), config.getTranslogConfig().getIndexSettings(), config.getTranslogConfig().getBigArrays());
EngineConfig configWithWarmer = new EngineConfig(config.getShardId(), config.getAllocationId(), config.getThreadPool(),
config.getIndexSettings(), warmer, store, config.getMergePolicy(), config.getAnalyzer(),
config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), config.getQueryCache(),
config.getQueryCachingPolicy(), translogConfig, config.getFlushMergesAfter(),
config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(),
config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(),
config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier());
try (InternalEngine engine = createEngine(configWithWarmer)) {
assertThat(warmedUpReaders, empty());
assertThat(expectThrows(Throwable.class, () -> engine.acquireSearcher("test")).getMessage(),
equalTo("searcher was not warmed up yet for source[test]"));
int times = randomIntBetween(1, 10);
for (int i = 0; i < times; i++) {
engine.refresh("test");
}
assertThat(warmedUpReaders, hasSize(1));
try (Engine.Searcher internalSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
try (Engine.Searcher externalSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)) {
assertSame(internalSearcher.getDirectoryReader(), externalSearcher.getDirectoryReader());
assertSame(warmedUpReaders.get(0), externalSearcher.getDirectoryReader());
}
}
index(engine, randomInt());
if (randomBoolean()) {
engine.refresh("test", Engine.SearcherScope.INTERNAL, true);
assertThat(warmedUpReaders, hasSize(1));
try (Engine.Searcher internalSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
try (Engine.Searcher externalSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)) {
assertNotSame(internalSearcher.getDirectoryReader(), externalSearcher.getDirectoryReader());
}
}
}
engine.refresh("test");
assertThat(warmedUpReaders, hasSize(2));
try (Engine.Searcher internalSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
try (Engine.Searcher externalSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)) {
assertSame(internalSearcher.getDirectoryReader(), externalSearcher.getDirectoryReader());
assertSame(warmedUpReaders.get(1), externalSearcher.getDirectoryReader());
}
}
}
}
}
}