From 21eb9bdf6a1903510512ba43dccace70edcb75c8 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 12 Oct 2017 17:19:43 +0200 Subject: [PATCH] Use separate searchers for "search visibility" vs "move indexing buffer to disk" (#26972) Today, when ES detects it's using too much heap vs the configured indexing buffer (default 10% of JVM heap) it opens a new searcher to force Lucene to move the bytes to disk, clear version map, etc. But this has the unexpected side effect of making newly indexed/deleted documents visible to future searches, which is not nice for users who are trying to prevent that, e.g. #3593. This is also an indirect spinoff from #26802 where we potentially pay a big price on rebuilding caches etc. when updates / realtime-get is used. We are refreshing the internal reader for realtime gets which causes for instance global ords to be rebuilt. I think we can gain quite a bit if we'd use a reader that is only used for GETs and not for searches etc. That way we can also solve problems of searchers being refreshed unexpectedly aside from replica recovery / relocation. 
Closes #15768 Closes #26912 --- .../termvectors/TermVectorsResponse.java | 6 +- .../elasticsearch/index/engine/Engine.java | 36 ++++-- .../index/engine/InternalEngine.java | 114 ++++++++++-------- .../index/engine/LiveVersionMap.java | 22 ---- .../elasticsearch/index/shard/IndexShard.java | 19 ++- .../index/engine/InternalEngineTests.java | 114 ++++++++++++++++-- .../index/shard/IndexShardTests.java | 2 +- .../index/shard/RefreshListenersTests.java | 1 - 8 files changed, 220 insertions(+), 94 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/termvectors/TermVectorsResponse.java b/core/src/main/java/org/elasticsearch/action/termvectors/TermVectorsResponse.java index 7532ade3fa3..21a77c2e0f2 100644 --- a/core/src/main/java/org/elasticsearch/action/termvectors/TermVectorsResponse.java +++ b/core/src/main/java/org/elasticsearch/action/termvectors/TermVectorsResponse.java @@ -305,9 +305,9 @@ public class TermVectorsResponse extends ActionResponse implements ToXContentObj long sumDocFreq = curTerms.getSumDocFreq(); int docCount = curTerms.getDocCount(); long sumTotalTermFrequencies = curTerms.getSumTotalTermFreq(); - if (docCount > 0) { - assert ((sumDocFreq > 0)) : "docCount >= 0 but sumDocFreq ain't!"; - assert ((sumTotalTermFrequencies > 0)) : "docCount >= 0 but sumTotalTermFrequencies ain't!"; + if (docCount >= 0) { + assert ((sumDocFreq >= 0)) : "docCount >= 0 but sumDocFreq ain't!"; + assert ((sumTotalTermFrequencies >= 0)) : "docCount >= 0 but sumTotalTermFrequencies ain't!"; builder.startObject(FieldStrings.FIELD_STATISTICS); builder.field(FieldStrings.SUM_DOC_FREQ, sumDocFreq); builder.field(FieldStrings.DOC_COUNT, docCount); diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index a755044c113..c01dd986e25 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -90,7 
+90,7 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.function.Function; +import java.util.function.BiFunction; public abstract class Engine implements Closeable { @@ -465,8 +465,9 @@ public abstract class Engine implements Closeable { PENDING_OPERATIONS } - protected final GetResult getFromSearcher(Get get, Function searcherFactory) throws EngineException { - final Searcher searcher = searcherFactory.apply("get"); + protected final GetResult getFromSearcher(Get get, BiFunction searcherFactory, + SearcherScope scope) throws EngineException { + final Searcher searcher = searcherFactory.apply("get", scope); final DocIdAndVersion docIdAndVersion; try { docIdAndVersion = VersionsAndSeqNoResolver.loadDocIdAndVersion(searcher.reader(), get.uid()); @@ -494,23 +495,40 @@ public abstract class Engine implements Closeable { } } - public abstract GetResult get(Get get, Function searcherFactory) throws EngineException; + public abstract GetResult get(Get get, BiFunction searcherFactory) throws EngineException; + /** * Returns a new searcher instance. The consumer of this * API is responsible for releasing the returned searcher in a * safe manner, preferably in a try/finally block. * + * @param source the source API or routing that triggers this searcher acquire + * * @see Searcher#close() */ public final Searcher acquireSearcher(String source) throws EngineException { + return acquireSearcher(source, SearcherScope.EXTERNAL); + } + + /** + * Returns a new searcher instance. The consumer of this + * API is responsible for releasing the returned searcher in a + * safe manner, preferably in a try/finally block. + * + * @param source the source API or routing that triggers this searcher acquire + * @param scope the scope of this searcher ie. 
if the searcher will be used for get or search purposes + * + * @see Searcher#close() + */ + public final Searcher acquireSearcher(String source, SearcherScope scope) throws EngineException { boolean success = false; /* Acquire order here is store -> manager since we need * to make sure that the store is not closed before * the searcher is acquired. */ store.incRef(); try { - final SearcherManager manager = getSearcherManager(); // can never be null + final SearcherManager manager = getSearcherManager(source, scope); // can never be null /* This might throw NPE but that's fine we will run ensureOpen() * in the catch block and throw the right exception */ final IndexSearcher searcher = manager.acquire(); @@ -536,6 +554,10 @@ public abstract class Engine implements Closeable { } } + public enum SearcherScope { + EXTERNAL, INTERNAL + } + /** returns the translog for this engine */ public abstract Translog getTranslog(); @@ -768,7 +790,7 @@ public abstract class Engine implements Closeable { the store is closed so we need to make sure we increment it here */ try { - return getSearcherManager().isSearcherCurrent() == false; + return getSearcherManager("refresh_needed", SearcherScope.EXTERNAL).isSearcherCurrent() == false; } catch (IOException e) { logger.error("failed to access searcher manager", e); failEngine("failed to access searcher manager", e); @@ -1306,7 +1328,7 @@ public abstract class Engine implements Closeable { } } - protected abstract SearcherManager getSearcherManager(); + protected abstract SearcherManager getSearcherManager(String source, SearcherScope scope); /** * Method to close the engine while the write lock is held. 
diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 3655a2096dd..177fd3239ea 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -93,7 +93,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import java.util.function.Function; +import java.util.function.BiFunction; import java.util.function.LongSupplier; public class InternalEngine extends Engine { @@ -108,20 +108,18 @@ public class InternalEngine extends Engine { private final IndexWriter indexWriter; - private final SearcherFactory searcherFactory; - private final SearcherManager searcherManager; + private final SearcherManager externalSearcherManager; + private final SearcherManager internalSearcherManager; private final Lock flushLock = new ReentrantLock(); private final ReentrantLock optimizeLock = new ReentrantLock(); // A uid (in the form of BytesRef) to the version map // we use the hashed variant since we iterate over it and check removal and additions on existing keys - private final LiveVersionMap versionMap; + private final LiveVersionMap versionMap = new LiveVersionMap(); private final KeyedLock keyedLock = new KeyedLock<>(); - private final AtomicBoolean versionMapRefreshPending = new AtomicBoolean(); - private volatile SegmentInfos lastCommittedSegmentInfos; private final IndexThrottle throttle; @@ -153,7 +151,6 @@ public class InternalEngine extends Engine { maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE); } this.uidField = engineConfig.getIndexSettings().isSingleType() ? 
IdFieldMapper.NAME : UidFieldMapper.NAME; - this.versionMap = new LiveVersionMap(); final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy( engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(), engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis() @@ -163,7 +160,8 @@ public class InternalEngine extends Engine { store.incRef(); IndexWriter writer = null; Translog translog = null; - SearcherManager manager = null; + SearcherManager externalSearcherManager = null; + SearcherManager internalSearcherManager = null; EngineMergeScheduler scheduler = null; boolean success = false; try { @@ -171,7 +169,6 @@ public class InternalEngine extends Engine { mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings()); throttle = new IndexThrottle(); - this.searcherFactory = new SearchFactory(logger, isClosed, engineConfig); try { final SeqNoStats seqNoStats; switch (openMode) { @@ -215,20 +212,21 @@ public class InternalEngine extends Engine { throw e; } } - manager = createSearcherManager(); - this.searcherManager = manager; - this.versionMap.setManager(searcherManager); + internalSearcherManager = createSearcherManager(new SearcherFactory(), false); + externalSearcherManager = createSearcherManager(new SearchFactory(logger, isClosed, engineConfig), true); + this.internalSearcherManager = internalSearcherManager; + this.externalSearcherManager = externalSearcherManager; + internalSearcherManager.addListener(versionMap); assert pendingTranslogRecovery.get() == false : "translog recovery can't be pending before we set it"; // don't allow commits until we are done with recovering pendingTranslogRecovery.set(openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG); for (ReferenceManager.RefreshListener listener: engineConfig.getRefreshListeners()) { - searcherManager.addListener(listener); + this.externalSearcherManager.addListener(listener); } success = true; } finally 
{ if (success == false) { - IOUtils.closeWhileHandlingException(writer, translog, manager, scheduler); - versionMap.clear(); + IOUtils.closeWhileHandlingException(writer, translog, externalSearcherManager, internalSearcherManager, scheduler); if (isClosed.get() == false) { // failure we need to dec the store reference store.decRef(); @@ -345,6 +343,7 @@ public class InternalEngine extends Engine { logger.trace("flushing post recovery from translog. ops recovered [{}]. committed translog id [{}]. current id [{}]", opsRecovered, translogGeneration == null ? null : translogGeneration.translogFileGeneration, translog.currentFileGeneration()); flush(true, true); + refresh("translog_recovery"); } else if (translog.isCurrent(translogGeneration) == false) { commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID)); refreshLastCommittedSegmentInfos(); @@ -441,14 +440,16 @@ public class InternalEngine extends Engine { return uuid; } - private SearcherManager createSearcherManager() throws EngineException { + private SearcherManager createSearcherManager(SearcherFactory searcherFactory, boolean readSegmentsInfo) throws EngineException { boolean success = false; SearcherManager searcherManager = null; try { try { final DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(indexWriter), shardId); searcherManager = new SearcherManager(directoryReader, searcherFactory); - lastCommittedSegmentInfos = readLastCommittedSegmentInfos(searcherManager, store); + if (readSegmentsInfo) { + lastCommittedSegmentInfos = readLastCommittedSegmentInfos(searcherManager, store); + } success = true; return searcherManager; } catch (IOException e) { @@ -468,10 +469,11 @@ public class InternalEngine extends Engine { } @Override - public GetResult get(Get get, Function searcherFactory) throws EngineException { + public GetResult get(Get get, BiFunction searcherFactory) throws EngineException { assert 
Objects.equals(get.uid().field(), uidField) : get.uid().field(); try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); + SearcherScope scope; if (get.realtime()) { VersionValue versionValue = versionMap.getUnderLock(get.uid()); if (versionValue != null) { @@ -482,12 +484,16 @@ public class InternalEngine extends Engine { throw new VersionConflictEngineException(shardId, get.type(), get.id(), get.versionType().explainConflictForReads(versionValue.version, get.version())); } - refresh("realtime_get"); + refresh("realtime_get", SearcherScope.INTERNAL); } + scope = SearcherScope.INTERNAL; + } else { + // we expose what has been externally expose in a point in time snapshot via an explicit refresh + scope = SearcherScope.EXTERNAL; } // no version, get the version from the index, we know that we refresh on flush - return getFromSearcher(get, searcherFactory); + return getFromSearcher(get, searcherFactory, scope); } } @@ -1187,17 +1193,34 @@ public class InternalEngine extends Engine { @Override public void refresh(String source) throws EngineException { + refresh(source, SearcherScope.EXTERNAL); + } + + final void refresh(String source, SearcherScope scope) throws EngineException { // we obtain a read lock here, since we don't want a flush to happen while we are refreshing // since it flushes the index as well (though, in terms of concurrency, we are allowed to do it) try (ReleasableLock lock = readLock.acquire()) { ensureOpen(); - searcherManager.maybeRefreshBlocking(); + switch (scope) { + case EXTERNAL: + // even though we maintain 2 managers we really do the heavy-lifting only once. + // the second refresh will only do the extra work we have to do for warming caches etc. 
+ externalSearcherManager.maybeRefreshBlocking(); + // the break here is intentional we never refresh both internal / external together + break; + case INTERNAL: + internalSearcherManager.maybeRefreshBlocking(); + break; + + default: + throw new IllegalArgumentException("unknown scope: " + scope); + } } catch (AlreadyClosedException e) { failOnTragicEvent(e); throw e; } catch (Exception e) { try { - failEngine("refresh failed", e); + failEngine("refresh failed source[" + source + "]", e); } catch (Exception inner) { e.addSuppressed(inner); } @@ -1208,36 +1231,20 @@ public class InternalEngine extends Engine { // We check for pruning in each delete request, but we also prune here e.g. in case a delete burst comes in and then no more deletes // for a long time: maybePruneDeletedTombstones(); - versionMapRefreshPending.set(false); mergeScheduler.refreshConfig(); } @Override public void writeIndexingBuffer() throws EngineException { - // we obtain a read lock here, since we don't want a flush to happen while we are writing // since it flushes the index as well (though, in terms of concurrency, we are allowed to do it) try (ReleasableLock lock = readLock.acquire()) { ensureOpen(); - - // TODO: it's not great that we secretly tie searcher visibility to "freeing up heap" here... really we should keep two - // searcher managers, one for searching which is only refreshed by the schedule the user requested (refresh_interval, or invoking - // refresh API), and another for version map interactions. See #15768. 
final long versionMapBytes = versionMap.ramBytesUsedForRefresh(); final long indexingBufferBytes = indexWriter.ramBytesUsed(); - - final boolean useRefresh = versionMapRefreshPending.get() || (indexingBufferBytes / 4 < versionMapBytes); - if (useRefresh) { - // The version map is using > 25% of the indexing buffer, so we do a refresh so the version map also clears - logger.debug("use refresh to write indexing buffer (heap size=[{}]), to also clear version map (heap size=[{}])", - new ByteSizeValue(indexingBufferBytes), new ByteSizeValue(versionMapBytes)); - refresh("write indexing buffer"); - } else { - // Most of our heap is used by the indexing buffer, so we do a cheaper (just writes segments, doesn't open a new searcher) IW.flush: - logger.debug("use IndexWriter.flush to write indexing buffer (heap size=[{}]) since version map is small (heap size=[{}])", - new ByteSizeValue(indexingBufferBytes), new ByteSizeValue(versionMapBytes)); - indexWriter.flush(); - } + logger.debug("use refresh to write indexing buffer (heap size=[{}]), to also clear version map (heap size=[{}])", + new ByteSizeValue(indexingBufferBytes), new ByteSizeValue(versionMapBytes)); + refresh("write indexing buffer", SearcherScope.INTERNAL); } catch (AlreadyClosedException e) { failOnTragicEvent(e); throw e; @@ -1302,10 +1309,11 @@ public class InternalEngine extends Engine { maybeFailEngine("renew sync commit", ex); throw new EngineException(shardId, "failed to renew sync commit", ex); } - if (renewed) { // refresh outside of the write lock - refresh("renew sync commit"); + if (renewed) { + // refresh outside of the write lock + // we have to refresh internal searcher here to ensure we release unreferenced segments. 
+ refresh("renew sync commit", SearcherScope.INTERNAL); } - return renewed; } @@ -1347,7 +1355,7 @@ public class InternalEngine extends Engine { commitIndexWriter(indexWriter, translog, null); logger.trace("finished commit for flush"); // we need to refresh in order to clear older version values - refresh("version_table_flush"); + refresh("version_table_flush", SearcherScope.INTERNAL); translog.trimUnreferencedReaders(); } catch (Exception e) { throw new FlushFailedEngineException(shardId, e); @@ -1651,8 +1659,9 @@ public class InternalEngine extends Engine { assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread() : "Either the write lock must be held or the engine must be currently be failing itself"; try { this.versionMap.clear(); + internalSearcherManager.removeListener(versionMap); try { - IOUtils.close(searcherManager); + IOUtils.close(externalSearcherManager, internalSearcherManager); } catch (Exception e) { logger.warn("Failed to close SearcherManager", e); } @@ -1684,8 +1693,15 @@ public class InternalEngine extends Engine { } @Override - protected SearcherManager getSearcherManager() { - return searcherManager; + protected SearcherManager getSearcherManager(String source, SearcherScope scope) { + switch (scope) { + case INTERNAL: + return internalSearcherManager; + case EXTERNAL: + return externalSearcherManager; + default: + throw new IllegalStateException("unknown scope: " + scope); + } } private Releasable acquireLock(BytesRef uid) { @@ -1698,7 +1714,7 @@ public class InternalEngine extends Engine { private long loadCurrentVersionFromIndex(Term uid) throws IOException { assert incrementIndexVersionLookup(); - try (Searcher searcher = acquireSearcher("load_version")) { + try (Searcher searcher = acquireSearcher("load_version", SearcherScope.INTERNAL)) { return VersionsAndSeqNoResolver.loadVersion(searcher.reader(), uid); } } diff --git a/core/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java 
b/core/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java index 9ee4bd43c21..7396c3143c6 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java +++ b/core/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java @@ -59,8 +59,6 @@ class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable { private volatile Maps maps = new Maps(); - private ReferenceManager mgr; - /** Bytes consumed for each BytesRef UID: * In this base value, we account for the {@link BytesRef} object itself as * well as the header of the byte[] array it holds, and some lost bytes due @@ -98,21 +96,6 @@ class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable { /** Tracks bytes used by tombstones (deletes) */ final AtomicLong ramBytesUsedTombstones = new AtomicLong(); - /** Sync'd because we replace old mgr. */ - synchronized void setManager(ReferenceManager newMgr) { - if (mgr != null) { - mgr.removeListener(this); - } - mgr = newMgr; - - // In case InternalEngine closes & opens a new IndexWriter/SearcherManager, all deletes are made visible, so we clear old and - // current here. This is safe because caller holds writeLock here (so no concurrent adds/deletes can be happeninge): - maps = new Maps(); - - // So we are notified when reopen starts and finishes - mgr.addListener(this); - } - @Override public void beforeRefresh() throws IOException { // Start sending all updates after this point to the new @@ -249,11 +232,6 @@ class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable { // and this will lead to an assert trip. 
Presumably it's fine if our ramBytesUsedTombstones is non-zero after clear since the index // is being closed: //ramBytesUsedTombstones.set(0); - - if (mgr != null) { - mgr.removeListener(this); - mgr = null; - } } @Override diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 9f0f6b84b45..16c9ab3964a 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1005,6 +1005,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } final long time = System.nanoTime(); final Engine.CommitId commitId = engine.flush(force, waitIfOngoing); + engine.refresh("flush"); // TODO this is technically wrong we should remove this in 7.0 flushMetric.inc(System.nanoTime() - time); return commitId; } @@ -1032,8 +1033,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl if (logger.isTraceEnabled()) { logger.trace("force merge with {}", forceMerge); } - getEngine().forceMerge(forceMerge.flush(), forceMerge.maxNumSegments(), + Engine engine = getEngine(); + engine.forceMerge(forceMerge.flush(), forceMerge.maxNumSegments(), forceMerge.onlyExpungeDeletes(), false, false); + if (forceMerge.flush()) { + engine.refresh("force_merge"); // TODO this is technically wrong we should remove this in 7.0 + } } /** @@ -1046,9 +1051,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } org.apache.lucene.util.Version previousVersion = minimumCompatibleVersion(); // we just want to upgrade the segments, not actually forge merge to a single segment - getEngine().forceMerge(true, // we need to flush at the end to make sure the upgrade is durable + final Engine engine = getEngine(); + engine.forceMerge(true, // we need to flush at the end to make sure the upgrade is durable Integer.MAX_VALUE, // we just want to upgrade 
the segments, not actually optimize to a single segment false, true, upgrade.upgradeOnlyAncientSegments()); + engine.refresh("upgrade"); // TODO this is technically wrong we should remove this in 7.0 + org.apache.lucene.util.Version version = minimumCompatibleVersion(); if (logger.isTraceEnabled()) { logger.trace("upgraded segments for {} from version {} to version {}", shardId, previousVersion, version); @@ -1127,11 +1135,14 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl // fail the engine. This will cause this shard to also be removed from the node's index service. getEngine().failEngine(reason, e); } - public Engine.Searcher acquireSearcher(String source) { + return acquireSearcher(source, Engine.SearcherScope.EXTERNAL); + } + + private Engine.Searcher acquireSearcher(String source, Engine.SearcherScope scope) { readAllowed(); final Engine engine = getEngine(); - final Engine.Searcher searcher = engine.acquireSearcher(source); + final Engine.Searcher searcher = engine.acquireSearcher(source, scope); boolean success = false; try { final Engine.Searcher wrappedSearcher = searcherWrapper == null ? 
searcher : searcherWrapper.wrap(searcher); diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 5971cd38774..c15474af866 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -86,6 +86,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver; import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo; @@ -942,7 +943,7 @@ public class InternalEngineTests extends ESTestCase { engine.index(indexForDoc(doc)); final AtomicReference latestGetResult = new AtomicReference<>(); - final Function searcherFactory = engine::acquireSearcher; + final BiFunction searcherFactory = engine::acquireSearcher; latestGetResult.set(engine.get(newGet(true, doc), searcherFactory)); final AtomicBoolean flushFinished = new AtomicBoolean(false); final CyclicBarrier barrier = new CyclicBarrier(2); @@ -977,7 +978,7 @@ public class InternalEngineTests extends ESTestCase { MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0)); searchResult.close(); - final Function searcherFactory = engine::acquireSearcher; + final BiFunction searcherFactory = engine::acquireSearcher; // create a document Document document = testDocumentWithTextField(); @@ -1002,6 +1003,12 @@ public class InternalEngineTests extends ESTestCase { assertThat(getResult.docIdAndVersion(), notNullValue()); getResult.release(); + // but not real time is not yet visible + getResult = 
engine.get(newGet(false, doc), searcherFactory); + assertThat(getResult.exists(), equalTo(false)); + getResult.release(); + + // refresh and it should be there engine.refresh("test"); @@ -1237,6 +1244,7 @@ public class InternalEngineTests extends ESTestCase { assertTrue(engine.tryRenewSyncCommit()); assertEquals(1, engine.segments(false).size()); } else { + engine.refresh("test"); assertBusy(() -> assertEquals(1, engine.segments(false).size())); } assertEquals(store.readLastCommittedSegmentsInfo().getUserData().get(Engine.SYNC_COMMIT_ID), syncId); @@ -1311,6 +1319,38 @@ public class InternalEngineTests extends ESTestCase { assertThat(indexResult.getVersion(), equalTo(1L)); } + /** + * simulates what an upsert / update API does + */ + public void testVersionedUpdate() throws IOException { + final BiFunction searcherFactory = engine::acquireSearcher; + + ParsedDocument doc = testParsedDocument("1", null, testDocument(), B_1, null); + Engine.Index create = new Engine.Index(newUid(doc), doc, Versions.MATCH_DELETED); + Engine.IndexResult indexResult = engine.index(create); + assertThat(indexResult.getVersion(), equalTo(1L)); + try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), create.uid()), searcherFactory)) { + assertEquals(1, get.version()); + } + + Engine.Index update_1 = new Engine.Index(newUid(doc), doc, 1); + Engine.IndexResult update_1_result = engine.index(update_1); + assertThat(update_1_result.getVersion(), equalTo(2L)); + + try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), create.uid()), searcherFactory)) { + assertEquals(2, get.version()); + } + + Engine.Index update_2 = new Engine.Index(newUid(doc), doc, 2); + Engine.IndexResult update_2_result = engine.index(update_2); + assertThat(update_2_result.getVersion(), equalTo(3L)); + + try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), create.uid()), searcherFactory)) { + assertEquals(3, get.version()); + } + + } + 
public void testVersioningNewIndex() throws IOException { ParsedDocument doc = testParsedDocument("1", null, testDocument(), B_1, null); Engine.Index index = indexForDoc(doc); @@ -1337,12 +1377,14 @@ public class InternalEngineTests extends ESTestCase { assertEquals(numDocs, test.reader().numDocs()); } engine.forceMerge(true, 1, false, false, false); + engine.refresh("test"); assertEquals(engine.segments(true).size(), 1); ParsedDocument doc = testParsedDocument(Integer.toString(0), null, testDocument(), B_1, null); Engine.Index index = indexForDoc(doc); engine.delete(new Engine.Delete(index.type(), index.id(), index.uid())); engine.forceMerge(true, 10, true, false, false); //expunge deletes + engine.refresh("test"); assertEquals(engine.segments(true).size(), 1); try (Engine.Searcher test = engine.acquireSearcher("test")) { @@ -1354,7 +1396,7 @@ public class InternalEngineTests extends ESTestCase { index = indexForDoc(doc); engine.delete(new Engine.Delete(index.type(), index.id(), index.uid())); engine.forceMerge(true, 10, false, false, false); //expunge deletes - + engine.refresh("test"); assertEquals(engine.segments(true).size(), 1); try (Engine.Searcher test = engine.acquireSearcher("test")) { assertEquals(numDocs - 2, test.reader().numDocs()); @@ -1561,6 +1603,7 @@ public class InternalEngineTests extends ESTestCase { } if (randomBoolean()) { engine.flush(); + engine.refresh("test"); } firstOp = false; } @@ -1716,11 +1759,12 @@ public class InternalEngineTests extends ESTestCase { } if (randomBoolean()) { engine.flush(); + engine.refresh("test"); } if (rarely()) { // simulate GC deletes - engine.refresh("gc_simulation"); + engine.refresh("gc_simulation", Engine.SearcherScope.INTERNAL); engine.clearDeletedTombstones(); if (docDeleted) { lastOpVersion = Versions.NOT_FOUND; @@ -1805,6 +1849,7 @@ public class InternalEngineTests extends ESTestCase { } if (randomBoolean()) { engine.flush(); + engine.refresh("test"); } } @@ -1884,7 +1929,7 @@ public class 
InternalEngineTests extends ESTestCase { ParsedDocument doc = testParsedDocument("1", null, testDocument(), bytesArray(""), null); final Term uidTerm = newUid(doc); engine.index(indexForDoc(doc)); - final Function searcherFactory = engine::acquireSearcher; + final BiFunction searcherFactory = engine::acquireSearcher; for (int i = 0; i < thread.length; i++) { thread[i] = new Thread(() -> { startGun.countDown(); @@ -2314,7 +2359,7 @@ public class InternalEngineTests extends ESTestCase { Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), null))) { engine.config().setEnableGcDeletes(false); - final Function searcherFactory = engine::acquireSearcher; + final BiFunction searcherFactory = engine::acquireSearcher; // Add document Document document = testDocument(); @@ -2644,6 +2689,7 @@ public class InternalEngineTests extends ESTestCase { assertThat(indexResult.getVersion(), equalTo(1L)); if (flush) { engine.flush(); + engine.refresh("test"); } doc = testParsedDocument(Integer.toString(randomId), null, testDocument(), new BytesArray("{}"), null); @@ -3847,7 +3893,7 @@ public class InternalEngineTests extends ESTestCase { document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE)); final ParsedDocument doc = testParsedDocument("1", null, document, B_1, null); final Term uid = newUid(doc); - final Function searcherFactory = engine::acquireSearcher; + final BiFunction searcherFactory = engine::acquireSearcher; for (int i = 0; i < numberOfOperations; i++) { if (randomBoolean()) { final Engine.Index index = new Engine.Index( @@ -4203,4 +4249,58 @@ public class InternalEngineTests extends ESTestCase { IOUtils.close(recoveringEngine); } } + + + public void assertSameReader(Searcher left, Searcher right) { + List leftLeaves = ElasticsearchDirectoryReader.unwrap(left.getDirectoryReader()).leaves(); + List rightLeaves = 
ElasticsearchDirectoryReader.unwrap(right.getDirectoryReader()).leaves(); + assertEquals(rightLeaves.size(), leftLeaves.size()); + for (int i = 0; i < leftLeaves.size(); i++) { + assertSame(leftLeaves.get(i).reader(), rightLeaves.get(0).reader()); + } + } + + public void assertNotSameReader(Searcher left, Searcher right) { + List leftLeaves = ElasticsearchDirectoryReader.unwrap(left.getDirectoryReader()).leaves(); + List rightLeaves = ElasticsearchDirectoryReader.unwrap(right.getDirectoryReader()).leaves(); + if (rightLeaves.size() == leftLeaves.size()) { + for (int i = 0; i < leftLeaves.size(); i++) { + if (leftLeaves.get(i).reader() != rightLeaves.get(0).reader()) { + return; // all is well + } + } + fail("readers are same"); + } + } + + public void testRefreshScopedSearcher() throws IOException { + try (Searcher getSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); + Searcher searchSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)){ + assertSameReader(getSearcher, searchSearcher); + } + for (int i = 0; i < 10; i++) { + final String docId = Integer.toString(i); + final ParsedDocument doc = + testParsedDocument(docId, null, testDocumentWithTextField(), SOURCE, null); + Engine.Index primaryResponse = indexForDoc(doc); + engine.index(primaryResponse); + } + assertTrue(engine.refreshNeeded()); + engine.refresh("test", Engine.SearcherScope.INTERNAL); + try (Searcher getSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); + Searcher searchSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)){ + assertEquals(10, getSearcher.reader().numDocs()); + assertEquals(0, searchSearcher.reader().numDocs()); + assertNotSameReader(getSearcher, searchSearcher); + } + + engine.refresh("test", Engine.SearcherScope.EXTERNAL); + + try (Searcher getSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); + Searcher searchSearcher = engine.acquireSearcher("test", 
Engine.SearcherScope.EXTERNAL)){ + assertEquals(10, getSearcher.reader().numDocs()); + assertEquals(10, searchSearcher.reader().numDocs()); + assertSameReader(getSearcher, searchSearcher); + } + } } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index a01f0230f22..fc8c597e3e6 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -1160,7 +1160,7 @@ public class IndexShardTests extends IndexShardTestCase { indexDoc(shard, "test", "test"); try (Engine.GetResult ignored = shard.get(new Engine.Get(true, "test", "test", new Term(IdFieldMapper.NAME, Uid.encodeId("test"))))) { - assertThat(shard.refreshStats().getTotal(), equalTo(refreshCount + 1)); + assertThat(shard.refreshStats().getTotal(), equalTo(refreshCount)); } closeShards(shard); } diff --git a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index 1f24d0b079d..da90f8023d2 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -270,7 +270,6 @@ public class RefreshListenersTests extends ESTestCase { * Uses a bunch of threads to index, wait for refresh, and non-realtime get documents to validate that they are visible after waiting * regardless of what crazy sequence of events causes the refresh listener to fire. */ - @TestLogging("_root:debug,org.elasticsearch.index.engine.Engine.DW:trace") public void testLotsOfThreads() throws Exception { int threadCount = between(3, 10); maxListeners = between(1, threadCount * 2);