LUCENE-3761: generalize SearcherManager (port to trunk)

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1244000 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Shai Erera 2012-02-14 15:36:14 +00:00
parent 78a5be5e28
commit f4c6a4098e
5 changed files with 256 additions and 152 deletions

View File

@ -343,7 +343,7 @@ public class NRTManager implements Closeable {
}
boolean setSearchGen;
if (!mgr.isSearcherCurrent()) {
setSearchGen = mgr.maybeReopen();
setSearchGen = mgr.maybeRefresh();
} else {
setSearchGen = true;
}

View File

@ -0,0 +1,164 @@
package org.apache.lucene.search;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.Semaphore;
import org.apache.lucene.store.AlreadyClosedException;
/**
* Utility class to safely share instances of a certain type across multiple
* threads, while periodically refreshing them. This class ensures each
* reference is closed only once all threads have finished using it. It is
* recommended to consult the documentation of {@link ReferenceManager}
* implementations for their {@link #maybeRefresh()} semantics.
*
* @param <G>
* the concrete type that will be {@link #acquire() acquired} and
* {@link #release(Object) released}.
*
* @lucene.experimental
*/
public abstract class ReferenceManager<G> implements Closeable {
private static final String REFERENCE_MANAGER_IS_CLOSED_MSG = "this ReferenceManager is closed";
protected volatile G current;
private final Semaphore reopenLock = new Semaphore(1);
private void ensureOpen() {
if (current == null) {
throw new AlreadyClosedException(REFERENCE_MANAGER_IS_CLOSED_MSG);
}
}
private synchronized void swapReference(G newReference) throws IOException {
ensureOpen();
final G oldReference = current;
current = newReference;
release(oldReference);
}
/** Decrement reference counting on the given reference. */
protected abstract void decRef(G reference) throws IOException;
/**
* Refresh the given reference if needed. Returns {@code null} if no refresh
* was needed, otherwise a new refreshed reference.
*/
protected abstract G refreshIfNeeded(G referenceToRefresh) throws IOException;
/**
* Try to increment reference counting on the given reference. Return true if
* the operation was successful.
*/
protected abstract boolean tryIncRef(G reference);
/**
* Obtain the current reference. You must match every call to acquire with one
* call to {@link #release}; it's best to do so in a finally clause, and set
* the reference to {@code null} to prevent accidental usage after it has been
* released.
*/
public final G acquire() {
G ref;
do {
if ((ref = current) == null) {
throw new AlreadyClosedException(REFERENCE_MANAGER_IS_CLOSED_MSG);
}
} while (!tryIncRef(ref));
return ref;
}
/**
* Close this ReferenceManager to future {@link #acquire() acquiring}. Any
* references that were previously {@link #acquire() acquired} won't be
* affected, and they should still be {@link #release released} when they are
* not needed anymore.
*/
public final synchronized void close() throws IOException {
if (current != null) {
// make sure we can call this more than once
// closeable javadoc says:
// if this is already closed then invoking this method has no effect.
swapReference(null);
}
}
/**
* You must call this, periodically, if you want that {@link #acquire()} will
* return refreshed instances.
*
* <p>
* <b>Threads</b>: it's fine for more than one thread to call this at once.
* Only the first thread will attempt the refresh; subsequent threads will see
* that another thread is already handling refresh and will return
* immediately. Note that this means if another thread is already refreshing
* then subsequent threads will return right away without waiting for the
* refresh to complete.
*
* <p>
* This method returns true if the reference was in fact refreshed, or if the
* current reference has no pending changes.
*/
public final boolean maybeRefresh() throws IOException {
ensureOpen();
// Ensure only 1 thread does reopen at once; other threads just return immediately:
if (reopenLock.tryAcquire()) {
try {
final G reference = acquire();
try {
G newReference = refreshIfNeeded(reference);
if (newReference != null) {
assert newReference != reference : "refreshIfNeeded should return null if refresh wasn't needed";
boolean success = false;
try {
swapReference(newReference);
success = true;
} finally {
if (!success) {
release(newReference);
}
}
}
} finally {
release(reference);
}
return true;
} finally {
reopenLock.release();
}
} else {
return false;
}
}
/**
* Release the refernce previously obtained via {@link #acquire()}.
* <p>
* <b>NOTE:</b> it's safe to call this after {@link #close()}.
*/
public final void release(G reference) throws IOException {
assert reference != null;
decRef(reference);
}
}

View File

@ -17,17 +17,11 @@ package org.apache.lucene.search;
* limitations under the License.
*/
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.Semaphore;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.search.NRTManager; // javadocs
import org.apache.lucene.search.IndexSearcher; // javadocs
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
/**
@ -51,42 +45,40 @@ import org.apache.lucene.store.Directory;
* </pre>
*
* <p>
* In addition you should periodically call {@link #maybeReopen}. While it's
* In addition you should periodically call {@link #maybeRefresh}. While it's
* possible to call this just before running each query, this is discouraged
* since it penalizes the unlucky queries that do the reopen. It's better to use
* a separate background thread, that periodically calls maybeReopen. Finally,
* be sure to call {@link #close} once you are done.
*
* <p>
* <b>NOTE</b>: if you have an {@link IndexWriter}, it's better to use
* {@link NRTManager} since that class pulls near-real-time readers from the
* IndexWriter.
*
* @see SearcherFactory
*
* @lucene.experimental
*/
public final class SearcherManager extends ReferenceManager<IndexSearcher> {
public final class SearcherManager implements Closeable {
private volatile IndexSearcher currentSearcher;
private final SearcherFactory searcherFactory;
private final Semaphore reopenLock = new Semaphore(1);
/**
* Creates and returns a new SearcherManager from the given {@link IndexWriter}.
* @param writer the IndexWriter to open the IndexReader from.
* @param applyAllDeletes If <code>true</code>, all buffered deletes will
* be applied (made visible) in the {@link IndexSearcher} / {@link DirectoryReader}.
* If <code>false</code>, the deletes may or may not be applied, but remain buffered
* (in IndexWriter) so that they will be applied in the future.
* Applying deletes can be costly, so if your app can tolerate deleted documents
* being returned you might gain some performance by passing <code>false</code>.
* See {@link DirectoryReader#openIfChanged(DirectoryReader, IndexWriter, boolean)}.
* @param searcherFactory An optional {@link SearcherFactory}. Pass
* <code>null</code> if you don't require the searcher to be warmed
* before going live or other custom behavior.
*
* Creates and returns a new SearcherManager from the given
* {@link IndexWriter}.
*
* @param writer
* the IndexWriter to open the IndexReader from.
* @param applyAllDeletes
* If <code>true</code>, all buffered deletes will be applied (made
* visible) in the {@link IndexSearcher} / {@link DirectoryReader}.
* If <code>false</code>, the deletes may or may not be applied, but
* remain buffered (in IndexWriter) so that they will be applied in
* the future. Applying deletes can be costly, so if your app can
* tolerate deleted documents being returned you might gain some
* performance by passing <code>false</code>. See
* {@link DirectoryReader#openIfChanged(DirectoryReader, IndexWriter, boolean)}.
* @param searcherFactory
* An optional {@link SearcherFactory}. Pass <code>null</code> if you
* don't require the searcher to be warmed before going live or other
* custom behavior.
*
* @throws IOException
*/
public SearcherManager(IndexWriter writer, boolean applyAllDeletes, SearcherFactory searcherFactory) throws IOException {
@ -94,7 +86,29 @@ public final class SearcherManager implements Closeable {
searcherFactory = new SearcherFactory();
}
this.searcherFactory = searcherFactory;
currentSearcher = searcherFactory.newSearcher(DirectoryReader.open(writer, applyAllDeletes));
current = searcherFactory.newSearcher(DirectoryReader.open(writer, applyAllDeletes));
}
@Override
protected void decRef(IndexSearcher reference) throws IOException {
reference.getIndexReader().decRef();
}
@Override
protected IndexSearcher refreshIfNeeded(IndexSearcher referenceToRefresh) throws IOException {
final IndexReader r = referenceToRefresh.getIndexReader();
final IndexReader newReader = (r instanceof DirectoryReader) ?
DirectoryReader.openIfChanged((DirectoryReader) r) : null;
if (newReader == null) {
return null;
} else {
return searcherFactory.newSearcher(newReader);
}
}
@Override
protected boolean tryIncRef(IndexSearcher reference) {
return reference.getIndexReader().tryIncRef();
}
/**
@ -111,75 +125,15 @@ public final class SearcherManager implements Closeable {
searcherFactory = new SearcherFactory();
}
this.searcherFactory = searcherFactory;
currentSearcher = searcherFactory.newSearcher(DirectoryReader.open(dir));
current = searcherFactory.newSearcher(DirectoryReader.open(dir));
}
/**
* You must call this, periodically, to perform a reopen. This calls
* {@link DirectoryReader#openIfChanged(DirectoryReader)} with the underlying reader, and if that returns a
* new reader, it's warmed (if you provided a {@link SearcherFactory} and then
* swapped into production.
*
* <p>
* <b>Threads</b>: it's fine for more than one thread to call this at once.
* Only the first thread will attempt the reopen; subsequent threads will see
* that another thread is already handling reopen and will return immediately.
* Note that this means if another thread is already reopening then subsequent
* threads will return right away without waiting for the reader reopen to
* complete.
* </p>
*
* <p>
* This method returns true if a new reader was in fact opened or
* if the current searcher has no pending changes.
* </p>
*/
public boolean maybeReopen() throws IOException {
ensureOpen();
// Ensure only 1 thread does reopen at once; other
// threads just return immediately:
if (reopenLock.tryAcquire()) {
try {
// IR.openIfChanged preserves NRT and applyDeletes
// in the newly returned reader:
final IndexReader newReader;
final IndexSearcher searcherToReopen = acquire();
try {
final IndexReader r = searcherToReopen.getIndexReader();
newReader = (r instanceof DirectoryReader) ?
DirectoryReader.openIfChanged((DirectoryReader) r) :
null;
} finally {
release(searcherToReopen);
}
if (newReader != null) {
final IndexSearcher newSearcher = searcherFactory.newSearcher(newReader);
boolean success = false;
try {
swapSearcher(newSearcher);
success = true;
} finally {
if (!success) {
release(newSearcher);
}
}
}
return true;
} finally {
reopenLock.release();
}
} else {
return false;
}
}
/**
* Returns <code>true</code> if no changes have occured since this searcher
* ie. reader was opened, otherwise <code>false</code>.
* @see DirectoryReader#isCurrent()
*/
public boolean isSearcherCurrent() throws CorruptIndexException,
IOException {
public boolean isSearcherCurrent() throws IOException {
final IndexSearcher searcher = acquire();
try {
final IndexReader r = searcher.getIndexReader();
@ -190,56 +144,5 @@ public final class SearcherManager implements Closeable {
release(searcher);
}
}
/**
* Release the searcher previously obtained with {@link #acquire}.
*
* <p>
* <b>NOTE</b>: it's safe to call this after {@link #close}.
*/
public void release(IndexSearcher searcher) throws IOException {
assert searcher != null;
searcher.getIndexReader().decRef();
}
/**
* Close this SearcherManager to future searching. Any searches still in
* process in other threads won't be affected, and they should still call
* {@link #release} after they are done.
*/
public synchronized void close() throws IOException {
if (currentSearcher != null) {
// make sure we can call this more than once
// closeable javadoc says:
// if this is already closed then invoking this method has no effect.
swapSearcher(null);
}
}
/**
* Obtain the current IndexSearcher. You must match every call to acquire with
* one call to {@link #release}; it's best to do so in a finally clause.
*/
public IndexSearcher acquire() {
IndexSearcher searcher;
do {
if ((searcher = currentSearcher) == null) {
throw new AlreadyClosedException("this SearcherManager is closed");
}
} while (!searcher.getIndexReader().tryIncRef());
return searcher;
}
private void ensureOpen() {
if (currentSearcher == null) {
throw new AlreadyClosedException("this SearcherManager is closed");
}
}
private synchronized void swapSearcher(IndexSearcher newSearcher) throws IOException {
ensureOpen();
final IndexSearcher oldSearcher = currentSearcher;
currentSearcher = newSearcher;
release(oldSearcher);
}
}

View File

@ -31,9 +31,9 @@ import org.apache.lucene.document.Document;
import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.ThreadedIndexingAndSearchingTestCase;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.LuceneTestCase.UseNoMemoryExpensiveCodec;
@ -57,7 +57,7 @@ public class TestSearcherManager extends ThreadedIndexingAndSearchingTestCase {
if (!isNRT) {
writer.commit();
}
assertTrue(mgr.maybeReopen() || mgr.isSearcherCurrent());
assertTrue(mgr.maybeRefresh() || mgr.isSearcherCurrent());
return mgr.acquire();
}
@ -105,7 +105,7 @@ public class TestSearcherManager extends ThreadedIndexingAndSearchingTestCase {
Thread.sleep(_TestUtil.nextInt(random, 1, 100));
writer.commit();
Thread.sleep(_TestUtil.nextInt(random, 1, 5));
if (mgr.maybeReopen()) {
if (mgr.maybeRefresh()) {
lifetimeMGR.prune(pruner);
}
}
@ -134,7 +134,7 @@ public class TestSearcherManager extends ThreadedIndexingAndSearchingTestCase {
// synchronous to your search threads, but still we
// test as apps will presumably do this for
// simplicity:
if (mgr.maybeReopen()) {
if (mgr.maybeRefresh()) {
lifetimeMGR.prune(pruner);
}
}
@ -237,7 +237,7 @@ public class TestSearcherManager extends ThreadedIndexingAndSearchingTestCase {
if (VERBOSE) {
System.out.println("NOW call maybeReopen");
}
searcherManager.maybeReopen();
searcherManager.maybeRefresh();
success.set(true);
} catch (AlreadyClosedException e) {
// expected
@ -280,4 +280,41 @@ public class TestSearcherManager extends ThreadedIndexingAndSearchingTestCase {
es.awaitTermination(1, TimeUnit.SECONDS);
}
}
public void testCloseTwice() throws Exception {
// test that we can close SM twice (per Closeable's contract).
Directory dir = newDirectory();
new IndexWriter(dir, new IndexWriterConfig(TEST_VERSION_CURRENT, null)).close();
SearcherManager sm = new SearcherManager(dir, null);
sm.close();
sm.close();
dir.close();
}
public void testEnsureOpen() throws Exception {
Directory dir = newDirectory();
new IndexWriter(dir, new IndexWriterConfig(TEST_VERSION_CURRENT, null)).close();
SearcherManager sm = new SearcherManager(dir, null);
IndexSearcher s = sm.acquire();
sm.close();
// this should succeed;
sm.release(s);
try {
// this should fail
sm.acquire();
} catch (AlreadyClosedException e) {
// ok
}
try {
// this should fail
sm.maybeRefresh();
} catch (AlreadyClosedException e) {
// ok
}
dir.close();
}
}

View File

@ -485,7 +485,7 @@ public abstract class ShardSearchingTestBase extends LuceneTestCase {
final IndexSearcher before = mgr.acquire();
mgr.release(before);
mgr.maybeReopen();
mgr.maybeRefresh();
final IndexSearcher after = mgr.acquire();
try {
if (after != before) {