Use patched version of ReferenceManager to prevent infinite loop in

RefrenceManager#accquire()

See LUCENE-5436
This commit is contained in:
Simon Willnauer 2014-02-06 21:05:53 +01:00
parent aab2c7a444
commit 0fb8d982be
5 changed files with 517 additions and 15 deletions

View File

@ -982,7 +982,9 @@
<exclude>org/elasticsearch/plugins/PluginManager.class</exclude> <exclude>org/elasticsearch/plugins/PluginManager.class</exclude>
<exclude>org/elasticsearch/bootstrap/Bootstrap.class</exclude> <exclude>org/elasticsearch/bootstrap/Bootstrap.class</exclude>
<exclude>org/elasticsearch/Version.class</exclude> <exclude>org/elasticsearch/Version.class</exclude>
<exclude>org/elasticsearch/index/percolator/stats/ShardPercolateService$RamEstimator.class</exclude> <exclude>org/apache/lucene/search/XReferenceManager.class</exclude>
<exclude>org/apache/lucene/search/XSearcherManager.class</exclude>
<exclude>org/elasticsearch/index/percolator/stats/ShardPercolateService$RamEstimator.class</exclude>
<!-- end excludes for valid system-out --> <!-- end excludes for valid system-out -->
<!-- start excludes for Unsafe --> <!-- start excludes for Unsafe -->
<exclude>org/elasticsearch/common/util/UnsafeUtils.class</exclude> <exclude>org/elasticsearch/common/util/UnsafeUtils.class</exclude>

View File

@ -0,0 +1,326 @@
package org.apache.lucene.search;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.Version;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Utility class to safely share instances of a certain type across multiple
* threads, while periodically refreshing them. This class ensures each
* reference is closed only once all threads have finished using it. It is
* recommended to consult the documentation of {@link org.apache.lucene.search.XReferenceManager}
* implementations for their {@link #maybeRefresh()} semantics.
*
* @param <G>
* the concrete type that will be {@link #acquire() acquired} and
* {@link #release(Object) released}.
*
* @lucene.experimental
*/
public abstract class XReferenceManager<G> implements Closeable {
static {
assert Version.LUCENE_46 == org.elasticsearch.Version.CURRENT.luceneVersion : "Remove this once we are on LUCENE_47 - see LUCENE-5436";
}
private static final String REFERENCE_MANAGER_IS_CLOSED_MSG = "this ReferenceManager is closed";
protected volatile G current;
private final Lock refreshLock = new ReentrantLock();
private final List<RefreshListener> refreshListeners = new CopyOnWriteArrayList<RefreshListener>();
private void ensureOpen() {
if (current == null) {
throw new AlreadyClosedException(REFERENCE_MANAGER_IS_CLOSED_MSG);
}
}
private synchronized void swapReference(G newReference) throws IOException {
ensureOpen();
final G oldReference = current;
current = newReference;
release(oldReference);
}
/**
* Decrement reference counting on the given reference.
* @throws java.io.IOException if reference decrement on the given resource failed.
* */
protected abstract void decRef(G reference) throws IOException;
/**
* Refresh the given reference if needed. Returns {@code null} if no refresh
* was needed, otherwise a new refreshed reference.
* @throws org.apache.lucene.store.AlreadyClosedException if the reference manager has been {@link #close() closed}.
* @throws java.io.IOException if the refresh operation failed
*/
protected abstract G refreshIfNeeded(G referenceToRefresh) throws IOException;
/**
* Try to increment reference counting on the given reference. Return true if
* the operation was successful.
* @throws org.apache.lucene.store.AlreadyClosedException if the reference manager has been {@link #close() closed}.
*/
protected abstract boolean tryIncRef(G reference) throws IOException;
/**
* Obtain the current reference. You must match every call to acquire with one
* call to {@link #release}; it's best to do so in a finally clause, and set
* the reference to {@code null} to prevent accidental usage after it has been
* released.
* @throws org.apache.lucene.store.AlreadyClosedException if the reference manager has been {@link #close() closed}.
*/
public final G acquire() throws IOException {
G ref;
do {
if ((ref = current) == null) {
throw new AlreadyClosedException(REFERENCE_MANAGER_IS_CLOSED_MSG);
}
if (tryIncRef(ref)) {
return ref;
}
if (getRefCount(ref) == 0 && current == ref) {
assert ref != null;
/* if we can't increment the reader but we are
still the current reference the RM is in a
illegal states since we can't make any progress
anymore. The reference is closed but the RM still
holds on to it as the actual instance.
This can only happen if somebody outside of the RM
decrements the refcount without a corresponding increment
since the RM assigns the new reference before counting down
the reference. */
throw new IllegalStateException("The managed reference has already closed - this is likely a bug when the reference count is modified outside of the ReferenceManager");
}
} while (true);
}
/**
* <p>
* Closes this ReferenceManager to prevent future {@link #acquire() acquiring}. A
* reference manager should be closed if the reference to the managed resource
* should be disposed or the application using the {@link org.apache.lucene.search.XReferenceManager}
* is shutting down. The managed resource might not be released immediately,
* if the {@link org.apache.lucene.search.XReferenceManager} user is holding on to a previously
* {@link #acquire() acquired} reference. The resource will be released once
* when the last reference is {@link #release(Object) released}. Those
* references can still be used as if the manager was still active.
* </p>
* <p>
* Applications should not {@link #acquire() acquire} new references from this
* manager once this method has been called. {@link #acquire() Acquiring} a
* resource on a closed {@link org.apache.lucene.search.XReferenceManager} will throw an
* {@link org.apache.lucene.store.AlreadyClosedException}.
* </p>
*
* @throws java.io.IOException
* if the underlying reader of the current reference could not be closed
*/
@Override
public final synchronized void close() throws IOException {
if (current != null) {
// make sure we can call this more than once
// closeable javadoc says:
// if this is already closed then invoking this method has no effect.
swapReference(null);
afterClose();
}
}
/**
* Returns the current reference count of the given reference.
*/
protected abstract int getRefCount(G reference);
/**
* Called after close(), so subclass can free any resources.
* @throws java.io.IOException if the after close operation in a sub-class throws an {@link java.io.IOException}
* */
protected void afterClose() throws IOException {
}
private void doMaybeRefresh() throws IOException {
// it's ok to call lock() here (blocking) because we're supposed to get here
// from either maybeRefreh() or maybeRefreshBlocking(), after the lock has
// already been obtained. Doing that protects us from an accidental bug
// where this method will be called outside the scope of refreshLock.
// Per ReentrantLock's javadoc, calling lock() by the same thread more than
// once is ok, as long as unlock() is called a matching number of times.
refreshLock.lock();
boolean refreshed = false;
try {
final G reference = acquire();
try {
notifyRefreshListenersBefore();
G newReference = refreshIfNeeded(reference);
if (newReference != null) {
assert newReference != reference : "refreshIfNeeded should return null if refresh wasn't needed";
try {
swapReference(newReference);
refreshed = true;
} finally {
if (!refreshed) {
release(newReference);
}
}
}
} finally {
release(reference);
notifyRefreshListenersRefreshed(refreshed);
}
afterMaybeRefresh();
} finally {
refreshLock.unlock();
}
}
/**
* You must call this (or {@link #maybeRefreshBlocking()}), periodically, if
* you want that {@link #acquire()} will return refreshed instances.
*
* <p>
* <b>Threads</b>: it's fine for more than one thread to call this at once.
* Only the first thread will attempt the refresh; subsequent threads will see
* that another thread is already handling refresh and will return
* immediately. Note that this means if another thread is already refreshing
* then subsequent threads will return right away without waiting for the
* refresh to complete.
*
* <p>
* If this method returns true it means the calling thread either refreshed or
* that there were no changes to refresh. If it returns false it means another
* thread is currently refreshing.
* </p>
* @throws java.io.IOException if refreshing the resource causes an {@link java.io.IOException}
* @throws org.apache.lucene.store.AlreadyClosedException if the reference manager has been {@link #close() closed}.
*/
public final boolean maybeRefresh() throws IOException {
ensureOpen();
// Ensure only 1 thread does refresh at once; other threads just return immediately:
final boolean doTryRefresh = refreshLock.tryLock();
if (doTryRefresh) {
try {
doMaybeRefresh();
} finally {
refreshLock.unlock();
}
}
return doTryRefresh;
}
/**
* You must call this (or {@link #maybeRefresh()}), periodically, if you want
* that {@link #acquire()} will return refreshed instances.
*
* <p>
* <b>Threads</b>: unlike {@link #maybeRefresh()}, if another thread is
* currently refreshing, this method blocks until that thread completes. It is
* useful if you want to guarantee that the next call to {@link #acquire()}
* will return a refreshed instance. Otherwise, consider using the
* non-blocking {@link #maybeRefresh()}.
* @throws java.io.IOException if refreshing the resource causes an {@link java.io.IOException}
* @throws org.apache.lucene.store.AlreadyClosedException if the reference manager has been {@link #close() closed}.
*/
public final void maybeRefreshBlocking() throws IOException {
ensureOpen();
// Ensure only 1 thread does refresh at once
refreshLock.lock();
try {
doMaybeRefresh();
} finally {
refreshLock.unlock();
}
}
/** Called after a refresh was attempted, regardless of
* whether a new reference was in fact created.
* @throws java.io.IOException if a low level I/O exception occurs
**/
protected void afterMaybeRefresh() throws IOException {
}
/**
* Release the reference previously obtained via {@link #acquire()}.
* <p>
* <b>NOTE:</b> it's safe to call this after {@link #close()}.
* @throws java.io.IOException if the release operation on the given resource throws an {@link java.io.IOException}
*/
public final void release(G reference) throws IOException {
assert reference != null;
decRef(reference);
}
private void notifyRefreshListenersBefore() throws IOException {
for (RefreshListener refreshListener : refreshListeners) {
refreshListener.beforeRefresh();
}
}
private void notifyRefreshListenersRefreshed(boolean didRefresh) throws IOException {
for (RefreshListener refreshListener : refreshListeners) {
refreshListener.afterRefresh(didRefresh);
}
}
/**
* Adds a listener, to be notified when a reference is refreshed/swapped.
*/
public void addListener(RefreshListener listener) {
if (listener == null) {
throw new NullPointerException("Listener cannot be null");
}
refreshListeners.add(listener);
}
/**
* Remove a listener added with {@link #addListener(RefreshListener)}.
*/
public void removeListener(RefreshListener listener) {
if (listener == null) {
throw new NullPointerException("Listener cannot be null");
}
refreshListeners.remove(listener);
}
/** Use to receive notification when a refresh has
* finished. See {@link #addListener}. */
public interface RefreshListener {
/** Called right before a refresh attempt starts. */
void beforeRefresh() throws IOException;
/** Called after the attempted refresh; if the refresh
* did open a new reference then didRefresh will be true
* and {@link #acquire()} is guaranteed to return the new
* reference. */
void afterRefresh(boolean didRefresh) throws IOException;
}
}

View File

@ -0,0 +1,177 @@
package org.apache.lucene.search;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.Version;
/**
* Utility class to safely share {@link IndexSearcher} instances across multiple
* threads, while periodically reopening. This class ensures each searcher is
* closed only once all threads have finished using it.
*
* <p>
* Use {@link #acquire} to obtain the current searcher, and {@link #release} to
* release it, like this:
*
* <pre class="prettyprint">
* IndexSearcher s = manager.acquire();
* try {
* // Do searching, doc retrieval, etc. with s
* } finally {
* manager.release(s);
* }
* // Do not use s after this!
* s = null;
* </pre>
*
* <p>
* In addition you should periodically call {@link #maybeRefresh}. While it's
* possible to call this just before running each query, this is discouraged
* since it penalizes the unlucky queries that do the reopen. It's better to use
* a separate background thread, that periodically calls maybeReopen. Finally,
* be sure to call {@link #close} once you are done.
*
* @see SearcherFactory
*
* @lucene.experimental
*/
public final class XSearcherManager extends XReferenceManager<IndexSearcher> {
static {
assert Version.LUCENE_46 == org.elasticsearch.Version.CURRENT.luceneVersion : "Remove this once we are on LUCENE_47 - see LUCENE-5436";
}
private final SearcherFactory searcherFactory;
/**
* Creates and returns a new XSearcherManager from the given
* {@link IndexWriter}.
*
* @param writer
* the IndexWriter to open the IndexReader from.
* @param applyAllDeletes
* If <code>true</code>, all buffered deletes will be applied (made
* visible) in the {@link IndexSearcher} / {@link DirectoryReader}.
* If <code>false</code>, the deletes may or may not be applied, but
* remain buffered (in IndexWriter) so that they will be applied in
* the future. Applying deletes can be costly, so if your app can
* tolerate deleted documents being returned you might gain some
* performance by passing <code>false</code>. See
* {@link DirectoryReader#openIfChanged(DirectoryReader, IndexWriter, boolean)}.
* @param searcherFactory
* An optional {@link SearcherFactory}. Pass <code>null</code> if you
* don't require the searcher to be warmed before going live or other
* custom behavior.
*
* @throws IOException if there is a low-level I/O error
*/
public XSearcherManager(IndexWriter writer, boolean applyAllDeletes, SearcherFactory searcherFactory) throws IOException {
if (searcherFactory == null) {
searcherFactory = new SearcherFactory();
}
this.searcherFactory = searcherFactory;
current = getSearcher(searcherFactory, DirectoryReader.open(writer, applyAllDeletes));
}
/**
* Creates and returns a new XSearcherManager from the given {@link Directory}.
* @param dir the directory to open the DirectoryReader on.
* @param searcherFactory An optional {@link SearcherFactory}. Pass
* <code>null</code> if you don't require the searcher to be warmed
* before going live or other custom behavior.
*
* @throws IOException if there is a low-level I/O error
*/
public XSearcherManager(Directory dir, SearcherFactory searcherFactory) throws IOException {
if (searcherFactory == null) {
searcherFactory = new SearcherFactory();
}
this.searcherFactory = searcherFactory;
current = getSearcher(searcherFactory, DirectoryReader.open(dir));
}
@Override
protected void decRef(IndexSearcher reference) throws IOException {
reference.getIndexReader().decRef();
}
@Override
protected IndexSearcher refreshIfNeeded(IndexSearcher referenceToRefresh) throws IOException {
final IndexReader r = referenceToRefresh.getIndexReader();
assert r instanceof DirectoryReader: "searcher's IndexReader should be a DirectoryReader, but got " + r;
final IndexReader newReader = DirectoryReader.openIfChanged((DirectoryReader) r);
if (newReader == null) {
return null;
} else {
return getSearcher(searcherFactory, newReader);
}
}
@Override
protected boolean tryIncRef(IndexSearcher reference) {
return reference.getIndexReader().tryIncRef();
}
@Override
protected int getRefCount(IndexSearcher reference) {
return reference.getIndexReader().getRefCount();
}
/**
* Returns <code>true</code> if no changes have occured since this searcher
* ie. reader was opened, otherwise <code>false</code>.
* @see DirectoryReader#isCurrent()
*/
public boolean isSearcherCurrent() throws IOException {
final IndexSearcher searcher = acquire();
try {
final IndexReader r = searcher.getIndexReader();
assert r instanceof DirectoryReader: "searcher's IndexReader should be a DirectoryReader, but got " + r;
return ((DirectoryReader) r).isCurrent();
} finally {
release(searcher);
}
}
/** Expert: creates a searcher from the provided {@link
* IndexReader} using the provided {@link
* SearcherFactory}. NOTE: this decRefs incoming reader
* on throwing an exception. */
public static IndexSearcher getSearcher(SearcherFactory searcherFactory, IndexReader reader) throws IOException {
boolean success = false;
final IndexSearcher searcher;
try {
searcher = searcherFactory.newSearcher(reader);
if (searcher.getIndexReader() != reader) {
throw new IllegalStateException("SearcherFactory must wrap exactly the provided reader (got " + searcher.getIndexReader() + " but expected " + reader + ")");
}
success = true;
} finally {
if (!success) {
reader.decRef();
}
}
return searcher;
}
}

View File

@ -22,10 +22,7 @@ package org.elasticsearch.index.engine.internal;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.apache.lucene.index.*; import org.apache.lucene.index.*;
import org.apache.lucene.index.IndexWriter.IndexReaderWarmer; import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.*;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRef;
@ -120,7 +117,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
private volatile IndexWriter indexWriter; private volatile IndexWriter indexWriter;
private final SearcherFactory searcherFactory = new SearchFactory(); private final SearcherFactory searcherFactory = new SearchFactory();
private volatile SearcherManager searcherManager; private volatile XSearcherManager searcherManager;
private volatile boolean closed = false; private volatile boolean closed = false;
@ -680,7 +677,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
@Override @Override
public final Searcher acquireSearcher(String source) throws EngineException { public final Searcher acquireSearcher(String source) throws EngineException {
SearcherManager manager = this.searcherManager; XSearcherManager manager = this.searcherManager;
if (manager == null) { if (manager == null) {
throw new EngineClosedException(shardId); throw new EngineClosedException(shardId);
} }
@ -693,7 +690,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
} }
} }
protected Searcher newSearcher(String source, IndexSearcher searcher, SearcherManager manager) { protected Searcher newSearcher(String source, IndexSearcher searcher, XSearcherManager manager) {
return new EngineSearcher(source, searcher, manager); return new EngineSearcher(source, searcher, manager);
} }
@ -797,7 +794,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
translog.newTranslog(translogId); translog.newTranslog(translogId);
} }
SearcherManager current = this.searcherManager; XSearcherManager current = this.searcherManager;
this.searcherManager = buildSearchManager(indexWriter); this.searcherManager = buildSearchManager(indexWriter);
try { try {
IOUtils.close(current); IOUtils.close(current);
@ -1458,18 +1455,18 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
} }
} }
private SearcherManager buildSearchManager(IndexWriter indexWriter) throws IOException { private XSearcherManager buildSearchManager(IndexWriter indexWriter) throws IOException {
return new SearcherManager(indexWriter, true, searcherFactory); return new XSearcherManager(indexWriter, true, searcherFactory);
} }
static class EngineSearcher implements Searcher { static class EngineSearcher implements Searcher {
private final String source; private final String source;
private final IndexSearcher searcher; private final IndexSearcher searcher;
private final SearcherManager manager; private final XSearcherManager manager;
private final AtomicBoolean released; private final AtomicBoolean released;
private EngineSearcher(String source, IndexSearcher searcher, SearcherManager manager) { private EngineSearcher(String source, IndexSearcher searcher, XSearcherManager manager) {
this.source = source; this.source = source;
this.searcher = searcher; this.searcher = searcher;
this.manager = manager; this.manager = manager;

View File

@ -22,7 +22,7 @@ package org.elasticsearch.test.engine;
import org.apache.lucene.index.*; import org.apache.lucene.index.*;
import org.apache.lucene.search.AssertingIndexSearcher; import org.apache.lucene.search.AssertingIndexSearcher;
import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.SearcherManager; import org.apache.lucene.search.XSearcherManager;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
@ -94,7 +94,7 @@ public final class MockInternalEngine extends InternalEngine implements Engine {
} }
@Override @Override
protected Searcher newSearcher(String source, IndexSearcher searcher, SearcherManager manager) throws EngineException { protected Searcher newSearcher(String source, IndexSearcher searcher, XSearcherManager manager) throws EngineException {
IndexReader reader = searcher.getIndexReader(); IndexReader reader = searcher.getIndexReader();
IndexReader wrappedReader = reader; IndexReader wrappedReader = reader;