diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 42805a19b34..99410d9f624 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -36,7 +36,7 @@ import org.apache.lucene.index.SegmentReader; import org.apache.lucene.index.SnapshotDeletionPolicy; import org.apache.lucene.index.Term; import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.SearcherManager; +import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; @@ -170,7 +170,7 @@ public abstract class Engine implements Closeable { return IndexWriter.SOURCE_MERGE.equals(source); } - protected Searcher newSearcher(String source, IndexSearcher searcher, SearcherManager manager) { + protected Searcher newSearcher(String source, IndexSearcher searcher, ReferenceManager manager) { return new EngineSearcher(source, searcher, manager, store, logger); } @@ -531,7 +531,7 @@ public abstract class Engine implements Closeable { * the searcher is acquired. */ store.incRef(); try { - final SearcherManager manager = getSearcherManager(source, scope); // can never be null + final ReferenceManager manager = getSearcherManager(source, scope); // can never be null /* This might throw NPE but that's fine we will run ensureOpen() * in the catch block and throw the right exception */ final IndexSearcher searcher = manager.acquire(); @@ -585,7 +585,7 @@ public abstract class Engine implements Closeable { /** * Read the last segments info from the commit pointed to by the searcher manager */ - protected static SegmentInfos readLastCommittedSegmentInfos(final SearcherManager sm, final Store store) throws IOException { + protected static SegmentInfos readLastCommittedSegmentInfos(final ReferenceManager sm, final Store store) throws IOException { IndexSearcher searcher = sm.acquire(); try { IndexCommit latestCommit = ((DirectoryReader) searcher.getIndexReader()).getIndexCommit(); @@ -787,13 +787,19 @@ public abstract class Engine implements Closeable { public final boolean refreshNeeded() { if (store.tryIncRef()) { /* - we need to inc the store here since searcherManager.isSearcherCurrent() - acquires a searcher internally and that might keep a file open on the + we need to inc the store here since we acquire a searcher and that might keep a file open on the store. this violates the assumption that all files are closed when the store is closed so we need to make sure we increment it here */ try { - return getSearcherManager("refresh_needed", SearcherScope.EXTERNAL).isSearcherCurrent() == false; + ReferenceManager manager = getSearcherManager("refresh_needed", SearcherScope.EXTERNAL); + final IndexSearcher searcher = manager.acquire(); + try { + final IndexReader r = searcher.getIndexReader(); + return ((DirectoryReader) r).isCurrent() == false; + } finally { + manager.release(searcher); + } } catch (IOException e) { logger.error("failed to access searcher manager", e); failEngine("failed to access searcher manager", e); @@ -1331,7 +1337,7 @@ public abstract class Engine implements Closeable { } } - protected abstract SearcherManager getSearcherManager(String source, SearcherScope scope); + protected abstract ReferenceManager getSearcherManager(String source, SearcherScope scope); /** * Method to close the engine while the write lock is held. diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineSearcher.java b/core/src/main/java/org/elasticsearch/index/engine/EngineSearcher.java index a53ac1dd415..c72ec543e71 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/EngineSearcher.java +++ b/core/src/main/java/org/elasticsearch/index/engine/EngineSearcher.java @@ -21,6 +21,7 @@ package org.elasticsearch.index.engine; import org.apache.logging.log4j.Logger; import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.SearcherManager; import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.index.store.Store; @@ -32,12 +33,12 @@ import java.util.concurrent.atomic.AtomicBoolean; * Searcher for an Engine */ public class EngineSearcher extends Engine.Searcher { - private final SearcherManager manager; + private final ReferenceManager manager; private final AtomicBoolean released = new AtomicBoolean(false); private final Store store; private final Logger logger; - public EngineSearcher(String source, IndexSearcher searcher, SearcherManager manager, Store store, Logger logger) { + public EngineSearcher(String source, IndexSearcher searcher, ReferenceManager manager, Store store, Logger logger) { super(source, searcher); this.manager = manager; this.store = store; diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index cb07cf5e696..6449c979de4 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -48,6 +48,7 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lucene.LoggerInfoStream; @@ -57,7 +58,6 @@ import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver; import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo; import org.elasticsearch.common.metrics.CounterMetric; -import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.KeyedLock; import org.elasticsearch.common.util.concurrent.ReleasableLock; @@ -108,7 +108,7 @@ public class InternalEngine extends Engine { private final IndexWriter indexWriter; - private final SearcherManager externalSearcherManager; + private final ExternalSearcherManager externalSearcherManager; private final SearcherManager internalSearcherManager; private final Lock flushLock = new ReentrantLock(); @@ -172,7 +172,7 @@ public class InternalEngine extends Engine { store.incRef(); IndexWriter writer = null; Translog translog = null; - SearcherManager externalSearcherManager = null; + ExternalSearcherManager externalSearcherManager = null; SearcherManager internalSearcherManager = null; EngineMergeScheduler scheduler = null; boolean success = false; @@ -224,8 +224,8 @@ public class InternalEngine extends Engine { throw e; } } - internalSearcherManager = createSearcherManager(new SearcherFactory(), false); - externalSearcherManager = createSearcherManager(new SearchFactory(logger, isClosed, engineConfig), true); + externalSearcherManager = createSearcherManager(new SearchFactory(logger, isClosed, engineConfig)); + internalSearcherManager = externalSearcherManager.internalSearcherManager; this.internalSearcherManager = internalSearcherManager; this.externalSearcherManager = externalSearcherManager; internalSearcherManager.addListener(versionMap); @@ -238,7 +238,7 @@ public class InternalEngine extends Engine { success = true; } finally { if (success == false) { - IOUtils.closeWhileHandlingException(writer, translog, externalSearcherManager, internalSearcherManager, scheduler); + IOUtils.closeWhileHandlingException(writer, translog, internalSearcherManager, externalSearcherManager, scheduler); if (isClosed.get() == false) { // failure we need to dec the store reference store.decRef(); @@ -248,6 +248,75 @@ public class InternalEngine extends Engine { logger.trace("created new InternalEngine"); } + /** + * This reference manager delegates all it's refresh calls to another (internal) SearcherManager + * The main purpose for this is that if we have external refreshes happening we don't issue extra + * refreshes to clear version map memory etc. this can cause excessive segment creation if heavy indexing + * is happening and the refresh interval is low (ie. 1 sec) + * + * This also prevents segment starvation where an internal reader holds on to old segments literally forever + * since no indexing is happening and refreshes are only happening to the external reader manager, while with + * this specialized implementation an external refresh will immediately be reflected on the internal reader + * and old segments can be released in the same way previous version did this (as a side-effect of _refresh) + */ + @SuppressForbidden(reason = "reference counting is required here") + private static final class ExternalSearcherManager extends ReferenceManager { + private final SearcherFactory searcherFactory; + private final SearcherManager internalSearcherManager; + + ExternalSearcherManager(SearcherManager internalSearcherManager, SearcherFactory searcherFactory) throws IOException { + IndexSearcher acquire = internalSearcherManager.acquire(); + try { + IndexReader indexReader = acquire.getIndexReader(); + assert indexReader instanceof ElasticsearchDirectoryReader: + "searcher's IndexReader should be an ElasticsearchDirectoryReader, but got " + indexReader; + indexReader.incRef(); // steal the reader - getSearcher will decrement if it fails + current = SearcherManager.getSearcher(searcherFactory, indexReader, null); + } finally { + internalSearcherManager.release(acquire); + } + this.searcherFactory = searcherFactory; + this.internalSearcherManager = internalSearcherManager; + } + + @Override + protected IndexSearcher refreshIfNeeded(IndexSearcher referenceToRefresh) throws IOException { + // we simply run a blocking refresh on the internal reference manager and then steal it's reader + // it's a save operation since we acquire the reader which incs it's reference but then down the road + // steal it by calling incRef on the "stolen" reader + internalSearcherManager.maybeRefreshBlocking(); + IndexSearcher acquire = internalSearcherManager.acquire(); + final IndexReader previousReader = referenceToRefresh.getIndexReader(); + assert previousReader instanceof ElasticsearchDirectoryReader: + "searcher's IndexReader should be an ElasticsearchDirectoryReader, but got " + previousReader; + try { + final IndexReader newReader = acquire.getIndexReader(); + if (newReader == previousReader) { + // nothing has changed - both ref managers share the same instance so we can use reference equality + return null; + } else { + newReader.incRef(); // steal the reader - getSearcher will decrement if it fails + return SearcherManager.getSearcher(searcherFactory, newReader, previousReader); + } + } finally { + internalSearcherManager.release(acquire); + } + } + + @Override + protected boolean tryIncRef(IndexSearcher reference) { + return reference.getIndexReader().tryIncRef(); + } + + @Override + protected int getRefCount(IndexSearcher reference) { + return reference.getIndexReader().getRefCount(); + } + + @Override + protected void decRef(IndexSearcher reference) throws IOException { reference.getIndexReader().decRef(); } + } + @Override public void restoreLocalCheckpointFromTranslog() throws IOException { try (ReleasableLock ignored = writeLock.acquire()) { @@ -456,18 +525,18 @@ public class InternalEngine extends Engine { return uuid; } - private SearcherManager createSearcherManager(SearcherFactory searcherFactory, boolean readSegmentsInfo) throws EngineException { + private ExternalSearcherManager createSearcherManager(SearchFactory externalSearcherFactory) throws EngineException { boolean success = false; - SearcherManager searcherManager = null; + SearcherManager internalSearcherManager = null; try { try { final DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(indexWriter), shardId); - searcherManager = new SearcherManager(directoryReader, searcherFactory); - if (readSegmentsInfo) { - lastCommittedSegmentInfos = readLastCommittedSegmentInfos(searcherManager, store); - } + internalSearcherManager = new SearcherManager(directoryReader, new SearcherFactory()); + lastCommittedSegmentInfos = readLastCommittedSegmentInfos(internalSearcherManager, store); + ExternalSearcherManager externalSearcherManager = new ExternalSearcherManager(internalSearcherManager, + externalSearcherFactory); success = true; - return searcherManager; + return externalSearcherManager; } catch (IOException e) { maybeFailEngine("start", e); try { @@ -479,7 +548,7 @@ public class InternalEngine extends Engine { } } finally { if (success == false) { // release everything we created on a failure - IOUtils.closeWhileHandlingException(searcherManager, indexWriter); + IOUtils.closeWhileHandlingException(internalSearcherManager, indexWriter); } } } @@ -1229,24 +1298,24 @@ public class InternalEngine extends Engine { } final void refresh(String source, SearcherScope scope) throws EngineException { - long bytes = 0; // we obtain a read lock here, since we don't want a flush to happen while we are refreshing // since it flushes the index as well (though, in terms of concurrency, we are allowed to do it) + // both refresh types will result in an internal refresh but only the external will also + // pass the new reader reference to the external reader manager. + + // this will also cause version map ram to be freed hence we always account for it. + final long bytes = indexWriter.ramBytesUsed() + versionMap.ramBytesUsedForRefresh(); + writingBytes.addAndGet(bytes); try (ReleasableLock lock = readLock.acquire()) { ensureOpen(); - bytes = indexWriter.ramBytesUsed(); switch (scope) { case EXTERNAL: // even though we maintain 2 managers we really do the heavy-lifting only once. // the second refresh will only do the extra work we have to do for warming caches etc. - writingBytes.addAndGet(bytes); externalSearcherManager.maybeRefreshBlocking(); // the break here is intentional we never refresh both internal / external together break; case INTERNAL: - final long versionMapBytes = versionMap.ramBytesUsedForRefresh(); - bytes += versionMapBytes; - writingBytes.addAndGet(bytes); internalSearcherManager.maybeRefreshBlocking(); break; default: @@ -1709,7 +1778,7 @@ public class InternalEngine extends Engine { } @Override - protected SearcherManager getSearcherManager(String source, SearcherScope scope) { + protected ReferenceManager getSearcherManager(String source, SearcherScope scope) { switch (scope) { case INTERNAL: return internalSearcherManager; diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index e196c6b4d0b..eb0d4b8afa2 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -3882,7 +3882,7 @@ public class InternalEngineTests extends EngineTestCase { List rightLeaves = ElasticsearchDirectoryReader.unwrap(right.getDirectoryReader()).leaves(); assertEquals(rightLeaves.size(), leftLeaves.size()); for (int i = 0; i < leftLeaves.size(); i++) { - assertSame(leftLeaves.get(i).reader(), rightLeaves.get(0).reader()); + assertSame(leftLeaves.get(i).reader(), rightLeaves.get(i).reader()); } } @@ -3891,7 +3891,7 @@ public class InternalEngineTests extends EngineTestCase { List rightLeaves = ElasticsearchDirectoryReader.unwrap(right.getDirectoryReader()).leaves(); if (rightLeaves.size() == leftLeaves.size()) { for (int i = 0; i < leftLeaves.size(); i++) { - if (leftLeaves.get(i).reader() != rightLeaves.get(0).reader()) { + if (leftLeaves.get(i).reader() != rightLeaves.get(i).reader()) { return; // all is well } } @@ -3919,7 +3919,6 @@ public class InternalEngineTests extends EngineTestCase { assertEquals(0, searchSearcher.reader().numDocs()); assertNotSameReader(getSearcher, searchSearcher); } - engine.refresh("test", Engine.SearcherScope.EXTERNAL); try (Searcher getSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); @@ -3928,6 +3927,36 @@ public class InternalEngineTests extends EngineTestCase { assertEquals(10, searchSearcher.reader().numDocs()); assertSameReader(getSearcher, searchSearcher); } + + // now ensure external refreshes are reflected on the internal reader + final String docId = Integer.toString(10); + final ParsedDocument doc = + testParsedDocument(docId, null, testDocumentWithTextField(), SOURCE, null); + Engine.Index primaryResponse = indexForDoc(doc); + engine.index(primaryResponse); + + engine.refresh("test", Engine.SearcherScope.EXTERNAL); + + try (Searcher getSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); + Searcher searchSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)){ + assertEquals(11, getSearcher.reader().numDocs()); + assertEquals(11, searchSearcher.reader().numDocs()); + assertSameReader(getSearcher, searchSearcher); + } + + try (Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)){ + engine.refresh("test", Engine.SearcherScope.INTERNAL); + try (Searcher nextSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)){ + assertSame(searcher.searcher(), nextSearcher.searcher()); + } + } + + try (Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)){ + engine.refresh("test", Engine.SearcherScope.EXTERNAL); + try (Searcher nextSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)){ + assertSame(searcher.searcher(), nextSearcher.searcher()); + } + } } public void testSeqNoGenerator() throws IOException { diff --git a/test/framework/src/main/java/org/elasticsearch/test/engine/MockEngineSupport.java b/test/framework/src/main/java/org/elasticsearch/test/engine/MockEngineSupport.java index 4c5bd0d3267..144b2be1b02 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/engine/MockEngineSupport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/engine/MockEngineSupport.java @@ -26,6 +26,7 @@ import org.apache.lucene.search.AssertingIndexSearcher; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.QueryCache; import org.apache.lucene.search.QueryCachingPolicy; +import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.SearcherManager; import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.ElasticsearchException; @@ -133,7 +134,8 @@ public final class MockEngineSupport { } } - public AssertingIndexSearcher newSearcher(String source, IndexSearcher searcher, SearcherManager manager) throws EngineException { + public AssertingIndexSearcher newSearcher(String source, IndexSearcher searcher, + ReferenceManager manager) throws EngineException { IndexReader reader = searcher.getIndexReader(); IndexReader wrappedReader = reader; assert reader != null; @@ -182,7 +184,8 @@ public final class MockEngineSupport { } - public Engine.Searcher wrapSearcher(String source, Engine.Searcher engineSearcher, IndexSearcher searcher, SearcherManager manager) { + public Engine.Searcher wrapSearcher(String source, Engine.Searcher engineSearcher, IndexSearcher searcher, + ReferenceManager manager) { final AssertingIndexSearcher assertingIndexSearcher = newSearcher(source, searcher, manager); assertingIndexSearcher.setSimilarity(searcher.getSimilarity(true)); // pass the original searcher to the super.newSearcher() method to make sure this is the searcher that will diff --git a/test/framework/src/main/java/org/elasticsearch/test/engine/MockInternalEngine.java b/test/framework/src/main/java/org/elasticsearch/test/engine/MockInternalEngine.java index fe8c4daec8d..92c7b4d9fc0 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/engine/MockInternalEngine.java +++ b/test/framework/src/main/java/org/elasticsearch/test/engine/MockInternalEngine.java @@ -20,6 +20,7 @@ package org.elasticsearch.test.engine; import org.apache.lucene.index.FilterDirectoryReader; import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.SearcherManager; import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineException; @@ -78,7 +79,7 @@ final class MockInternalEngine extends InternalEngine { } @Override - protected Searcher newSearcher(String source, IndexSearcher searcher, SearcherManager manager) throws EngineException { + protected Searcher newSearcher(String source, IndexSearcher searcher, ReferenceManager manager) throws EngineException { final Searcher engineSearcher = super.newSearcher(source, searcher, manager); return support().wrapSearcher(source, engineSearcher, searcher, manager); }