diff --git a/pom.xml b/pom.xml index fe866cbbcf5..492426d6ace 100644 --- a/pom.xml +++ b/pom.xml @@ -982,7 +982,9 @@ org/elasticsearch/plugins/PluginManager.class org/elasticsearch/bootstrap/Bootstrap.class org/elasticsearch/Version.class - org/elasticsearch/index/percolator/stats/ShardPercolateService$RamEstimator.class + org/apache/lucene/search/XReferenceManager.class + org/apache/lucene/search/XSearcherManager.class + org/elasticsearch/index/percolator/stats/ShardPercolateService$RamEstimator.class org/elasticsearch/common/util/UnsafeUtils.class diff --git a/src/main/java/org/apache/lucene/search/XReferenceManager.java b/src/main/java/org/apache/lucene/search/XReferenceManager.java new file mode 100644 index 00000000000..07fb066fa8c --- /dev/null +++ b/src/main/java/org/apache/lucene/search/XReferenceManager.java @@ -0,0 +1,326 @@ +package org.apache.lucene.search; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.util.Version; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Utility class to safely share instances of a certain type across multiple + * threads, while periodically refreshing them. This class ensures each + * reference is closed only once all threads have finished using it. It is + * recommended to consult the documentation of {@link org.apache.lucene.search.XReferenceManager} + * implementations for their {@link #maybeRefresh()} semantics. + * + * @param + * the concrete type that will be {@link #acquire() acquired} and + * {@link #release(Object) released}. + * + * @lucene.experimental + */ +public abstract class XReferenceManager implements Closeable { + static { + assert Version.LUCENE_46 == org.elasticsearch.Version.CURRENT.luceneVersion : "Remove this once we are on LUCENE_47 - see LUCENE-5436"; + } + + private static final String REFERENCE_MANAGER_IS_CLOSED_MSG = "this ReferenceManager is closed"; + + protected volatile G current; + + private final Lock refreshLock = new ReentrantLock(); + + private final List refreshListeners = new CopyOnWriteArrayList(); + + private void ensureOpen() { + if (current == null) { + throw new AlreadyClosedException(REFERENCE_MANAGER_IS_CLOSED_MSG); + } + } + + private synchronized void swapReference(G newReference) throws IOException { + ensureOpen(); + final G oldReference = current; + current = newReference; + release(oldReference); + } + + /** + * Decrement reference counting on the given reference. + * @throws java.io.IOException if reference decrement on the given resource failed. + * */ + protected abstract void decRef(G reference) throws IOException; + + /** + * Refresh the given reference if needed. Returns {@code null} if no refresh + * was needed, otherwise a new refreshed reference. + * @throws org.apache.lucene.store.AlreadyClosedException if the reference manager has been {@link #close() closed}. + * @throws java.io.IOException if the refresh operation failed + */ + protected abstract G refreshIfNeeded(G referenceToRefresh) throws IOException; + + /** + * Try to increment reference counting on the given reference. Return true if + * the operation was successful. + * @throws org.apache.lucene.store.AlreadyClosedException if the reference manager has been {@link #close() closed}. + */ + protected abstract boolean tryIncRef(G reference) throws IOException; + + /** + * Obtain the current reference. You must match every call to acquire with one + * call to {@link #release}; it's best to do so in a finally clause, and set + * the reference to {@code null} to prevent accidental usage after it has been + * released. + * @throws org.apache.lucene.store.AlreadyClosedException if the reference manager has been {@link #close() closed}. + */ + public final G acquire() throws IOException { + G ref; + + do { + if ((ref = current) == null) { + throw new AlreadyClosedException(REFERENCE_MANAGER_IS_CLOSED_MSG); + } + if (tryIncRef(ref)) { + return ref; + } + if (getRefCount(ref) == 0 && current == ref) { + assert ref != null; + /* if we can't increment the reader but we are + still the current reference the RM is in a + illegal states since we can't make any progress + anymore. The reference is closed but the RM still + holds on to it as the actual instance. + This can only happen if somebody outside of the RM + decrements the refcount without a corresponding increment + since the RM assigns the new reference before counting down + the reference. */ + throw new IllegalStateException("The managed reference has already closed - this is likely a bug when the reference count is modified outside of the ReferenceManager"); + } + } while (true); + } + + /** + *

+ * Closes this ReferenceManager to prevent future {@link #acquire() acquiring}. A + * reference manager should be closed if the reference to the managed resource + * should be disposed or the application using the {@link org.apache.lucene.search.XReferenceManager} + * is shutting down. The managed resource might not be released immediately, + * if the {@link org.apache.lucene.search.XReferenceManager} user is holding on to a previously + * {@link #acquire() acquired} reference. The resource will be released once + * when the last reference is {@link #release(Object) released}. Those + * references can still be used as if the manager was still active. + *

+ *

+ * Applications should not {@link #acquire() acquire} new references from this + * manager once this method has been called. {@link #acquire() Acquiring} a + * resource on a closed {@link org.apache.lucene.search.XReferenceManager} will throw an + * {@link org.apache.lucene.store.AlreadyClosedException}. + *

+ * + * @throws java.io.IOException + * if the underlying reader of the current reference could not be closed + */ + @Override + public final synchronized void close() throws IOException { + if (current != null) { + // make sure we can call this more than once + // closeable javadoc says: + // if this is already closed then invoking this method has no effect. + swapReference(null); + afterClose(); + } + } + + /** + * Returns the current reference count of the given reference. + */ + protected abstract int getRefCount(G reference); + + /** + * Called after close(), so subclass can free any resources. + * @throws java.io.IOException if the after close operation in a sub-class throws an {@link java.io.IOException} + * */ + protected void afterClose() throws IOException { + } + + private void doMaybeRefresh() throws IOException { + // it's ok to call lock() here (blocking) because we're supposed to get here + // from either maybeRefreh() or maybeRefreshBlocking(), after the lock has + // already been obtained. Doing that protects us from an accidental bug + // where this method will be called outside the scope of refreshLock. + // Per ReentrantLock's javadoc, calling lock() by the same thread more than + // once is ok, as long as unlock() is called a matching number of times. + refreshLock.lock(); + boolean refreshed = false; + try { + final G reference = acquire(); + try { + notifyRefreshListenersBefore(); + G newReference = refreshIfNeeded(reference); + if (newReference != null) { + assert newReference != reference : "refreshIfNeeded should return null if refresh wasn't needed"; + try { + swapReference(newReference); + refreshed = true; + } finally { + if (!refreshed) { + release(newReference); + } + } + } + } finally { + release(reference); + notifyRefreshListenersRefreshed(refreshed); + } + afterMaybeRefresh(); + } finally { + refreshLock.unlock(); + } + } + + /** + * You must call this (or {@link #maybeRefreshBlocking()}), periodically, if + * you want that {@link #acquire()} will return refreshed instances. + * + *

+ * Threads: it's fine for more than one thread to call this at once. + * Only the first thread will attempt the refresh; subsequent threads will see + * that another thread is already handling refresh and will return + * immediately. Note that this means if another thread is already refreshing + * then subsequent threads will return right away without waiting for the + * refresh to complete. + * + *

+ * If this method returns true it means the calling thread either refreshed or + * that there were no changes to refresh. If it returns false it means another + * thread is currently refreshing. + *

+ * @throws java.io.IOException if refreshing the resource causes an {@link java.io.IOException} + * @throws org.apache.lucene.store.AlreadyClosedException if the reference manager has been {@link #close() closed}. + */ + public final boolean maybeRefresh() throws IOException { + ensureOpen(); + + // Ensure only 1 thread does refresh at once; other threads just return immediately: + final boolean doTryRefresh = refreshLock.tryLock(); + if (doTryRefresh) { + try { + doMaybeRefresh(); + } finally { + refreshLock.unlock(); + } + } + + return doTryRefresh; + } + + /** + * You must call this (or {@link #maybeRefresh()}), periodically, if you want + * that {@link #acquire()} will return refreshed instances. + * + *

+ * Threads: unlike {@link #maybeRefresh()}, if another thread is + * currently refreshing, this method blocks until that thread completes. It is + * useful if you want to guarantee that the next call to {@link #acquire()} + * will return a refreshed instance. Otherwise, consider using the + * non-blocking {@link #maybeRefresh()}. + * @throws java.io.IOException if refreshing the resource causes an {@link java.io.IOException} + * @throws org.apache.lucene.store.AlreadyClosedException if the reference manager has been {@link #close() closed}. + */ + public final void maybeRefreshBlocking() throws IOException { + ensureOpen(); + + // Ensure only 1 thread does refresh at once + refreshLock.lock(); + try { + doMaybeRefresh(); + } finally { + refreshLock.unlock(); + } + } + + /** Called after a refresh was attempted, regardless of + * whether a new reference was in fact created. + * @throws java.io.IOException if a low level I/O exception occurs + **/ + protected void afterMaybeRefresh() throws IOException { + } + + /** + * Release the reference previously obtained via {@link #acquire()}. + *

+ * NOTE: it's safe to call this after {@link #close()}. + * @throws java.io.IOException if the release operation on the given resource throws an {@link java.io.IOException} + */ + public final void release(G reference) throws IOException { + assert reference != null; + decRef(reference); + } + + private void notifyRefreshListenersBefore() throws IOException { + for (RefreshListener refreshListener : refreshListeners) { + refreshListener.beforeRefresh(); + } + } + + private void notifyRefreshListenersRefreshed(boolean didRefresh) throws IOException { + for (RefreshListener refreshListener : refreshListeners) { + refreshListener.afterRefresh(didRefresh); + } + } + + /** + * Adds a listener, to be notified when a reference is refreshed/swapped. + */ + public void addListener(RefreshListener listener) { + if (listener == null) { + throw new NullPointerException("Listener cannot be null"); + } + refreshListeners.add(listener); + } + + /** + * Remove a listener added with {@link #addListener(RefreshListener)}. + */ + public void removeListener(RefreshListener listener) { + if (listener == null) { + throw new NullPointerException("Listener cannot be null"); + } + refreshListeners.remove(listener); + } + + /** Use to receive notification when a refresh has + * finished. See {@link #addListener}. */ + public interface RefreshListener { + + /** Called right before a refresh attempt starts. */ + void beforeRefresh() throws IOException; + + /** Called after the attempted refresh; if the refresh + * did open a new reference then didRefresh will be true + * and {@link #acquire()} is guaranteed to return the new + * reference. */ + void afterRefresh(boolean didRefresh) throws IOException; + } +} diff --git a/src/main/java/org/apache/lucene/search/XSearcherManager.java b/src/main/java/org/apache/lucene/search/XSearcherManager.java new file mode 100644 index 00000000000..36c74a08728 --- /dev/null +++ b/src/main/java/org/apache/lucene/search/XSearcherManager.java @@ -0,0 +1,177 @@ +package org.apache.lucene.search; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; + +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.Version; + +/** + * Utility class to safely share {@link IndexSearcher} instances across multiple + * threads, while periodically reopening. This class ensures each searcher is + * closed only once all threads have finished using it. + * + *

+ * Use {@link #acquire} to obtain the current searcher, and {@link #release} to + * release it, like this: + * + *

+ * IndexSearcher s = manager.acquire();
+ * try {
+ *   // Do searching, doc retrieval, etc. with s
+ * } finally {
+ *   manager.release(s);
+ * }
+ * // Do not use s after this!
+ * s = null;
+ * 
+ * + *

+ * In addition you should periodically call {@link #maybeRefresh}. While it's + * possible to call this just before running each query, this is discouraged + * since it penalizes the unlucky queries that do the reopen. It's better to use + * a separate background thread, that periodically calls maybeReopen. Finally, + * be sure to call {@link #close} once you are done. + * + * @see SearcherFactory + * + * @lucene.experimental + */ +public final class XSearcherManager extends XReferenceManager { + + static { + assert Version.LUCENE_46 == org.elasticsearch.Version.CURRENT.luceneVersion : "Remove this once we are on LUCENE_47 - see LUCENE-5436"; + } + + private final SearcherFactory searcherFactory; + + /** + * Creates and returns a new XSearcherManager from the given + * {@link IndexWriter}. + * + * @param writer + * the IndexWriter to open the IndexReader from. + * @param applyAllDeletes + * If true, all buffered deletes will be applied (made + * visible) in the {@link IndexSearcher} / {@link DirectoryReader}. + * If false, the deletes may or may not be applied, but + * remain buffered (in IndexWriter) so that they will be applied in + * the future. Applying deletes can be costly, so if your app can + * tolerate deleted documents being returned you might gain some + * performance by passing false. See + * {@link DirectoryReader#openIfChanged(DirectoryReader, IndexWriter, boolean)}. + * @param searcherFactory + * An optional {@link SearcherFactory}. Pass null if you + * don't require the searcher to be warmed before going live or other + * custom behavior. + * + * @throws IOException if there is a low-level I/O error + */ + public XSearcherManager(IndexWriter writer, boolean applyAllDeletes, SearcherFactory searcherFactory) throws IOException { + if (searcherFactory == null) { + searcherFactory = new SearcherFactory(); + } + this.searcherFactory = searcherFactory; + current = getSearcher(searcherFactory, DirectoryReader.open(writer, applyAllDeletes)); + } + + /** + * Creates and returns a new XSearcherManager from the given {@link Directory}. + * @param dir the directory to open the DirectoryReader on. + * @param searcherFactory An optional {@link SearcherFactory}. Pass + * null if you don't require the searcher to be warmed + * before going live or other custom behavior. + * + * @throws IOException if there is a low-level I/O error + */ + public XSearcherManager(Directory dir, SearcherFactory searcherFactory) throws IOException { + if (searcherFactory == null) { + searcherFactory = new SearcherFactory(); + } + this.searcherFactory = searcherFactory; + current = getSearcher(searcherFactory, DirectoryReader.open(dir)); + } + + @Override + protected void decRef(IndexSearcher reference) throws IOException { + reference.getIndexReader().decRef(); + } + + @Override + protected IndexSearcher refreshIfNeeded(IndexSearcher referenceToRefresh) throws IOException { + final IndexReader r = referenceToRefresh.getIndexReader(); + assert r instanceof DirectoryReader: "searcher's IndexReader should be a DirectoryReader, but got " + r; + final IndexReader newReader = DirectoryReader.openIfChanged((DirectoryReader) r); + if (newReader == null) { + return null; + } else { + return getSearcher(searcherFactory, newReader); + } + } + + @Override + protected boolean tryIncRef(IndexSearcher reference) { + return reference.getIndexReader().tryIncRef(); + } + + @Override + protected int getRefCount(IndexSearcher reference) { + return reference.getIndexReader().getRefCount(); + } + + /** + * Returns true if no changes have occured since this searcher + * ie. reader was opened, otherwise false. + * @see DirectoryReader#isCurrent() + */ + public boolean isSearcherCurrent() throws IOException { + final IndexSearcher searcher = acquire(); + try { + final IndexReader r = searcher.getIndexReader(); + assert r instanceof DirectoryReader: "searcher's IndexReader should be a DirectoryReader, but got " + r; + return ((DirectoryReader) r).isCurrent(); + } finally { + release(searcher); + } + } + + /** Expert: creates a searcher from the provided {@link + * IndexReader} using the provided {@link + * SearcherFactory}. NOTE: this decRefs incoming reader + * on throwing an exception. */ + public static IndexSearcher getSearcher(SearcherFactory searcherFactory, IndexReader reader) throws IOException { + boolean success = false; + final IndexSearcher searcher; + try { + searcher = searcherFactory.newSearcher(reader); + if (searcher.getIndexReader() != reader) { + throw new IllegalStateException("SearcherFactory must wrap exactly the provided reader (got " + searcher.getIndexReader() + " but expected " + reader + ")"); + } + success = true; + } finally { + if (!success) { + reader.decRef(); + } + } + return searcher; + } +} diff --git a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java index 29ab0e9ce2b..40af7ef31b3 100644 --- a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java @@ -22,10 +22,7 @@ package org.elasticsearch.index.engine.internal; import com.google.common.collect.Lists; import org.apache.lucene.index.*; import org.apache.lucene.index.IndexWriter.IndexReaderWarmer; -import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.Query; -import org.apache.lucene.search.SearcherFactory; -import org.apache.lucene.search.SearcherManager; +import org.apache.lucene.search.*; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.util.BytesRef; @@ -120,7 +117,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin private volatile IndexWriter indexWriter; private final SearcherFactory searcherFactory = new SearchFactory(); - private volatile SearcherManager searcherManager; + private volatile XSearcherManager searcherManager; private volatile boolean closed = false; @@ -680,7 +677,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin @Override public final Searcher acquireSearcher(String source) throws EngineException { - SearcherManager manager = this.searcherManager; + XSearcherManager manager = this.searcherManager; if (manager == null) { throw new EngineClosedException(shardId); } @@ -693,7 +690,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin } } - protected Searcher newSearcher(String source, IndexSearcher searcher, SearcherManager manager) { + protected Searcher newSearcher(String source, IndexSearcher searcher, XSearcherManager manager) { return new EngineSearcher(source, searcher, manager); } @@ -797,7 +794,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin translog.newTranslog(translogId); } - SearcherManager current = this.searcherManager; + XSearcherManager current = this.searcherManager; this.searcherManager = buildSearchManager(indexWriter); try { IOUtils.close(current); @@ -1458,18 +1455,18 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin } } - private SearcherManager buildSearchManager(IndexWriter indexWriter) throws IOException { - return new SearcherManager(indexWriter, true, searcherFactory); + private XSearcherManager buildSearchManager(IndexWriter indexWriter) throws IOException { + return new XSearcherManager(indexWriter, true, searcherFactory); } static class EngineSearcher implements Searcher { private final String source; private final IndexSearcher searcher; - private final SearcherManager manager; + private final XSearcherManager manager; private final AtomicBoolean released; - private EngineSearcher(String source, IndexSearcher searcher, SearcherManager manager) { + private EngineSearcher(String source, IndexSearcher searcher, XSearcherManager manager) { this.source = source; this.searcher = searcher; this.manager = manager; diff --git a/src/test/java/org/elasticsearch/test/engine/MockInternalEngine.java b/src/test/java/org/elasticsearch/test/engine/MockInternalEngine.java index f8add70d86e..f82cebacf72 100644 --- a/src/test/java/org/elasticsearch/test/engine/MockInternalEngine.java +++ b/src/test/java/org/elasticsearch/test/engine/MockInternalEngine.java @@ -22,7 +22,7 @@ package org.elasticsearch.test.engine; import org.apache.lucene.index.*; import org.apache.lucene.search.AssertingIndexSearcher; import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.SearcherManager; +import org.apache.lucene.search.XSearcherManager; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.inject.Inject; @@ -94,7 +94,7 @@ public final class MockInternalEngine extends InternalEngine implements Engine { } @Override - protected Searcher newSearcher(String source, IndexSearcher searcher, SearcherManager manager) throws EngineException { + protected Searcher newSearcher(String source, IndexSearcher searcher, XSearcherManager manager) throws EngineException { IndexReader reader = searcher.getIndexReader(); IndexReader wrappedReader = reader;