LUCENE-3488: Factor out SearcherManager out of NRTManager

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1179956 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Simon Willnauer 2011-10-07 08:05:11 +00:00
parent 12a93306a8
commit 70ee6dbdb6
6 changed files with 478 additions and 398 deletions

View File

@ -176,6 +176,11 @@ API Changes
new FieldValueQueryNode<CharSequence>, which this last one implements
FieldableQueryNode and thew new ValueQueryNode
(Vinicius Barros via Uwe Schindler)
* LUCENE-3488: Factored out SearcherManager from NRTManager. NRTManager
now manages SearcherManager instances instead of IndexSearcher directly.
Acquiring a SearcherManager is non-blocking unless the caller explicitly
requires to acquire a certain SearcherManager generation. (Simon Willnauer)
Optimizations

View File

@ -22,23 +22,25 @@ import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.index.IndexReader; // javadocs
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.index.IndexReader; // javadocs
import org.apache.lucene.search.IndexSearcher; // javadocs
import org.apache.lucene.search.Query;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.search.SearcherWarmer;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.ThreadInterruptedException;
// TODO
// - we could make this work also w/ "normal" reopen/commit?
/**
* Utility class to manage sharing near-real-time searchers
* across multiple searching threads.
*
* <p>NOTE: to use this class, you must call reopen
* <p>NOTE: to use this class, you must call {@link #maybeReopen(boolean)}
* periodically. The {@link NRTManagerReopenThread} is a
* simple class to do this on a periodic basis. If you
* implement your own reopener, be sure to call {@link
@ -50,15 +52,12 @@ import org.apache.lucene.util.ThreadInterruptedException;
public class NRTManager implements Closeable {
private final IndexWriter writer;
private final ExecutorService es;
private final SearcherManagerRef withoutDeletes;
private final SearcherManagerRef withDeletes;
private final AtomicLong indexingGen;
private final AtomicLong searchingGen;
private final AtomicLong noDeletesSearchingGen;
private final SearcherWarmer warmer;
private final List<WaitingListener> waitingListeners = new CopyOnWriteArrayList<WaitingListener>();
private volatile IndexSearcher currentSearcher;
private volatile IndexSearcher noDeletesCurrentSearcher;
private final ReentrantLock reopenLock = new ReentrantLock();
private final Condition newGeneration = reopenLock.newCondition();
/**
* Create new NRTManager.
@ -66,9 +65,8 @@ public class NRTManager implements Closeable {
* @param writer IndexWriter to open near-real-time
* readers
* @param es optional ExecutorService so different segments can
* be searched concurrently (see {@link
* IndexSearcher#IndexSearcher(IndexReader,ExecutorService)}. Pass null
* to search segments sequentially.
* be searched concurrently (see {@link IndexSearcher#IndexSearcher(IndexReader, ExecutorService)}.
* Pass <code>null</code> to search segments sequentially.
* @param warmer optional {@link SearcherWarmer}. Pass
* null if you don't require the searcher to warmed
* before going live. If this is non-null then a
@ -79,29 +77,30 @@ public class NRTManager implements Closeable {
* not invoked for the initial searcher; you should
* warm it yourself if necessary.
*/
public NRTManager(IndexWriter writer, ExecutorService es, SearcherWarmer warmer) throws IOException {
this.writer = writer;
this.es = es;
this.warmer = warmer;
indexingGen = new AtomicLong(1);
searchingGen = new AtomicLong(-1);
noDeletesSearchingGen = new AtomicLong(-1);
// Create initial reader:
swapSearcher(new IndexSearcher(IndexReader.open(writer, true), es), 0, true);
if (this.warmer != null) {
writer.getConfig().setMergedSegmentWarmer(
new IndexWriter.IndexReaderWarmer() {
@Override
public void warm(IndexReader reader) throws IOException {
NRTManager.this.warmer.warm(new IndexSearcher(reader, NRTManager.this.es));
}
});
}
public NRTManager(IndexWriter writer, ExecutorService es,
SearcherWarmer warmer) throws IOException {
this(writer, es, warmer, true);
}
/**
* Expert: just like {@link
* #NRTManager(IndexWriter,ExecutorService,SearcherWarmer)},
* but you can also specify whether every searcher must
* apply deletes. This is useful for cases where certain
* uses can tolerate seeing some deleted docs, since
* reopen time is faster if deletes need not be applied. */
public NRTManager(IndexWriter writer, ExecutorService es,
SearcherWarmer warmer, boolean alwaysApplyDeletes) throws IOException {
this.writer = writer;
if (alwaysApplyDeletes) {
withoutDeletes = withDeletes = new SearcherManagerRef(true, 0, SearcherManager.open(writer, true, warmer, es));
} else {
withDeletes = new SearcherManagerRef(true, 0, SearcherManager.open(writer, true, warmer, es));
withoutDeletes = new SearcherManagerRef(false, 0, SearcherManager.open(writer, false, warmer, es));
}
indexingGen = new AtomicLong(1);
}
/** NRTManager invokes this interface to notify it when a
* caller is waiting for a specific generation searcher
* to be visible. */
@ -182,201 +181,162 @@ public class NRTManager implements Closeable {
return indexingGen.get();
}
/** Returns the most current searcher. If you require a
* certain indexing generation be visible in the returned
* searcher, call {@link #get(long)}
* instead.
/**
* Waits for a given {@link SearcherManager} target generation to be available
* via {@link #getSearcherManager(boolean)}. If the current generation is less
* than the given target generation this method will block until the
* correspondent {@link SearcherManager} is reopened by another thread via
* {@link #maybeReopen(boolean)} or until the {@link NRTManager} is closed.
*
* @param targetGen the generation to wait for
* @param requireDeletes <code>true</code> iff the generation requires deletes to be applied otherwise <code>false</code>
* @return the {@link SearcherManager} with the given target generation
*/
public synchronized IndexSearcher get() {
return get(true);
public SearcherManager waitForGeneration(long targetGen, boolean requireDeletes) {
return waitForGeneration(targetGen, requireDeletes, -1, TimeUnit.NANOSECONDS);
}
/** Just like {@link #get}, but by passing <code>false</code> for
* requireDeletes, you can get faster reopen time, but
* the returned reader is allowed to not reflect all
* deletions. See {@link IndexReader#open(IndexWriter,boolean)} */
public synchronized IndexSearcher get(boolean requireDeletes) {
final IndexSearcher s;
if (requireDeletes) {
s = currentSearcher;
} else if (noDeletesSearchingGen.get() > searchingGen.get()) {
s = noDeletesCurrentSearcher;
} else {
s = currentSearcher;
}
s.getIndexReader().incRef();
return s;
}
/** Call this if you require a searcher reflecting all
* changes as of the target generation.
*
* @param targetGen Returned searcher must reflect changes
* as of this generation
/**
* Waits for a given {@link SearcherManager} target generation to be available
* via {@link #getSearcherManager(boolean)}. If the current generation is less
* than the given target generation this method will block until the
* correspondent {@link SearcherManager} is reopened by another thread via
* {@link #maybeReopen(boolean)}, the given waiting time has elapsed, or until
* the {@link NRTManager} is closed.
* <p>
* NOTE: if the waiting time elapses before the requested target generation is
* available the latest {@link SearcherManager} is returned instead.
*
* @param targetGen
* the generation to wait for
* @param requireDeletes
* <code>true</code> iff the generation requires deletes to be
* applied otherwise <code>false</code>
* @param time
* the time to wait for the target generation
* @param unit
* the waiting time's time unit
* @return the {@link SearcherManager} with the given target generation or the
* latest {@link SearcherManager} if the waiting time elapsed before
* the requested generation is available.
*/
public synchronized IndexSearcher get(long targetGen) {
return get(targetGen, true);
}
/** Call this if you require a searcher reflecting all
* changes as of the target generation, and you don't
* require deletions to be reflected. Note that the
* returned searcher may still reflect some or all
* deletions.
*
* @param targetGen Returned searcher must reflect changes
* as of this generation
*
* @param requireDeletes If true, the returned searcher must
* reflect all deletions. This can be substantially more
* costly than not applying deletes. Note that if you
* pass false, it's still possible that some or all
* deletes may have been applied.
**/
public synchronized IndexSearcher get(long targetGen, boolean requireDeletes) {
assert noDeletesSearchingGen.get() >= searchingGen.get(): "noDeletesSearchingGen=" + noDeletesSearchingGen.get() + " searchingGen=" + searchingGen.get();
if (targetGen > getCurrentSearchingGen(requireDeletes)) {
// Must wait
//final long t0 = System.nanoTime();
for(WaitingListener listener : waitingListeners) {
listener.waiting(requireDeletes, targetGen);
}
while (targetGen > getCurrentSearchingGen(requireDeletes)) {
//System.out.println(Thread.currentThread().getName() + ": wait fresh searcher targetGen=" + targetGen + " vs searchingGen=" + getCurrentSearchingGen(requireDeletes) + " requireDeletes=" + requireDeletes);
try {
wait();
} catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie);
public SearcherManager waitForGeneration(long targetGen, boolean requireDeletes, long time, TimeUnit unit) {
try {
reopenLock.lockInterruptibly();
try {
if (targetGen > getCurrentSearchingGen(requireDeletes)) {
for (WaitingListener listener : waitingListeners) {
listener.waiting(requireDeletes, targetGen);
}
while (targetGen > getCurrentSearchingGen(requireDeletes)) {
if (!waitOnGenCondition(time, unit)) {
return getSearcherManager(requireDeletes);
}
}
}
}
//final long waitNS = System.nanoTime()-t0;
//System.out.println(Thread.currentThread().getName() + ": done wait fresh searcher targetGen=" + targetGen + " vs searchingGen=" + getCurrentSearchingGen(requireDeletes) + " requireDeletes=" + requireDeletes + " WAIT msec=" + (waitNS/1000000.0));
}
return get(requireDeletes);
} finally {
reopenLock.unlock();
}
} catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie);
}
return getSearcherManager(requireDeletes);
}
private boolean waitOnGenCondition(long time, TimeUnit unit)
throws InterruptedException {
assert reopenLock.isHeldByCurrentThread();
if (time < 0) {
newGeneration.await();
return true;
} else {
return newGeneration.await(time, unit);
}
}
/** Returns generation of current searcher. */
public long getCurrentSearchingGen(boolean requiresDeletes) {
return requiresDeletes ? searchingGen.get() : noDeletesSearchingGen.get();
}
/** Release the searcher obtained from {@link
* #get()} or {@link #get(long)}.
*
* <p><b>NOTE</b>: it's safe to call this after {@link
* #close}. */
public void release(IndexSearcher s) throws IOException {
s.getIndexReader().decRef();
}
/** Call this when you need the NRT reader to reopen.
*
* @param applyDeletes If true, the newly opened reader
* will reflect all deletes
*/
public boolean reopen(boolean applyDeletes) throws IOException {
// Mark gen as of when reopen started:
final long newSearcherGen = indexingGen.getAndIncrement();
if (applyDeletes && currentSearcher.getIndexReader().isCurrent()) {
//System.out.println("reopen: skip: isCurrent both force gen=" + newSearcherGen + " vs current gen=" + searchingGen);
searchingGen.set(newSearcherGen);
noDeletesSearchingGen.set(newSearcherGen);
synchronized(this) {
notifyAll();
}
//System.out.println("reopen: skip: return");
return false;
} else if (!applyDeletes && noDeletesCurrentSearcher.getIndexReader().isCurrent()) {
//System.out.println("reopen: skip: isCurrent force gen=" + newSearcherGen + " vs current gen=" + noDeletesSearchingGen);
noDeletesSearchingGen.set(newSearcherGen);
synchronized(this) {
notifyAll();
}
//System.out.println("reopen: skip: return");
return false;
}
//System.out.println("indexingGen now " + indexingGen);
// .reopen() returns a new reference:
// Start from whichever searcher is most current:
final IndexSearcher startSearcher = noDeletesSearchingGen.get() > searchingGen.get() ? noDeletesCurrentSearcher : currentSearcher;
IndexReader nextReader = IndexReader.openIfChanged(startSearcher.getIndexReader(), writer, applyDeletes);
if (nextReader == null) {
// NOTE: doesn't happen currently in Lucene (reopen on
// NRT reader always returns new reader), but could in
// the future:
nextReader = startSearcher.getIndexReader();
nextReader.incRef();
}
if (nextReader != startSearcher.getIndexReader()) {
final IndexSearcher nextSearcher = new IndexSearcher(nextReader, es);
if (warmer != null) {
boolean success = false;
try {
warmer.warm(nextSearcher);
success = true;
} finally {
if (!success) {
nextReader.decRef();
}
}
}
// Transfer reference to swapSearcher:
swapSearcher(nextSearcher,
newSearcherGen,
applyDeletes);
return true;
public long getCurrentSearchingGen(boolean applyAllDeletes) {
if (applyAllDeletes) {
return withDeletes.generation;
} else {
return false;
return Math.max(withoutDeletes.generation, withDeletes.generation);
}
}
// Steals a reference from newSearcher:
private synchronized void swapSearcher(IndexSearcher newSearcher, long newSearchingGen, boolean applyDeletes) throws IOException {
//System.out.println(Thread.currentThread().getName() + ": swap searcher gen=" + newSearchingGen + " applyDeletes=" + applyDeletes);
public boolean maybeReopen(boolean applyAllDeletes) throws IOException {
if (reopenLock.tryLock()) {
try {
final SearcherManagerRef reference = applyAllDeletes ? withDeletes : withoutDeletes;
// Mark gen as of when reopen started:
final long newSearcherGen = indexingGen.getAndIncrement();
boolean setSearchGen = false;
if (!(setSearchGen = reference.manager.isSearcherCurrent())) {
setSearchGen = reference.manager.maybeReopen();
}
if (setSearchGen) {
reference.generation = newSearcherGen;// update searcher gen
newGeneration.signalAll(); // wake up threads if we have a new generation
}
return setSearchGen;
} finally {
reopenLock.unlock();
}
}
return false;
}
/**
* Close this NRTManager to future searching. Any searches still in process in
* other threads won't be affected, and they should still call
* {@link SearcherManager#release(IndexSearcher)} after they are done.
*
* <p>
* <b>NOTE</b>: caller must separately close the writer.
*/
public synchronized void close() throws IOException {
reopenLock.lock();
try {
IOUtils.close(withDeletes, withoutDeletes);
newGeneration.signalAll();
} finally {
reopenLock.unlock();
}
}
/**
* Returns a {@link SearcherManager}. If <code>applyAllDeletes</code> is
* <code>true</code> the returned manager is guaranteed to have all deletes
* applied on the last reopen. Otherwise the latest manager with or without deletes
* is returned.
*/
public SearcherManager getSearcherManager(boolean applyAllDeletes) {
if (applyAllDeletes) {
return withDeletes.manager;
} else {
if (withDeletes.generation > withoutDeletes.generation) {
return withDeletes.manager;
} else {
return withoutDeletes.manager;
}
}
}
static final class SearcherManagerRef implements Closeable {
final boolean applyDeletes;
volatile long generation;
final SearcherManager manager;
SearcherManagerRef(boolean applyDeletes, long generation, SearcherManager manager) {
super();
this.applyDeletes = applyDeletes;
this.generation = generation;
this.manager = manager;
}
// Always replace noDeletesCurrentSearcher:
if (noDeletesCurrentSearcher != null) {
noDeletesCurrentSearcher.getIndexReader().decRef();
public void close() throws IOException {
generation = Long.MAX_VALUE; // max it out to make sure nobody can wait on another gen
manager.close();
}
noDeletesCurrentSearcher = newSearcher;
assert newSearchingGen > noDeletesSearchingGen.get(): "newSearchingGen=" + newSearchingGen + " noDeletesSearchingGen=" + noDeletesSearchingGen;
noDeletesSearchingGen.set(newSearchingGen);
if (applyDeletes) {
// Deletes were applied, so we also update currentSearcher:
if (currentSearcher != null) {
currentSearcher.getIndexReader().decRef();
}
currentSearcher = newSearcher;
if (newSearcher != null) {
newSearcher.getIndexReader().incRef();
}
assert newSearchingGen > searchingGen.get(): "newSearchingGen=" + newSearchingGen + " searchingGen=" + searchingGen;
searchingGen.set(newSearchingGen);
}
notifyAll();
//System.out.println(Thread.currentThread().getName() + ": done");
}
/** Close this NRTManager to future searching. Any
* searches still in process in other threads won't be
* affected, and they should still call {@link #release}
* after they are done.
*
* <p><b>NOTE</b>: caller must separately close the writer. */
@Override
public void close() throws IOException {
swapSearcher(null, indexingGen.getAndIncrement(), true);
}
}

View File

@ -80,12 +80,13 @@ import org.apache.lucene.util.ThreadInterruptedException;
*/
public class NRTManagerReopenThread extends Thread implements NRTManager.WaitingListener, Closeable {
private final NRTManager manager;
private final long targetMaxStaleNS;
private final long targetMinStaleNS;
private boolean finish;
private boolean waitingNeedsDeletes;
private long waitingGen;
private boolean waitingNeedsDeletes;
/**
* Create NRTManagerReopenThread, to periodically reopen the NRT searcher.
@ -131,7 +132,7 @@ public class NRTManagerReopenThread extends Thread implements NRTManager.Waiting
}
@Override
public void run() {
public void run() {
// TODO: maybe use private thread ticktock timer, in
// case clock shift messes up nanoTime?
long lastReopenStartNS = System.nanoTime();
@ -140,8 +141,6 @@ public class NRTManagerReopenThread extends Thread implements NRTManager.Waiting
try {
while (true) {
final boolean doApplyDeletes;
boolean hasWaiting = false;
synchronized(this) {
@ -176,16 +175,13 @@ public class NRTManagerReopenThread extends Thread implements NRTManager.Waiting
//System.out.println("reopen: finish");
return;
}
doApplyDeletes = hasWaiting ? waitingNeedsDeletes : true;
waitingNeedsDeletes = false;
//System.out.println("reopen: start hasWaiting=" + hasWaiting);
}
lastReopenStartNS = System.nanoTime();
try {
//final long t0 = System.nanoTime();
manager.reopen(doApplyDeletes);
manager.maybeReopen(waitingNeedsDeletes);
//System.out.println("reopen took " + ((System.nanoTime()-t0)/1000000.0) + " msec");
} catch (IOException ioe) {
//System.out.println(Thread.currentThread().getName() + ": IOE");

View File

@ -17,11 +17,11 @@ package org.apache.lucene.search;
* limitations under the License.
*/
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.NRTManager; // javadocs
@ -29,119 +29,83 @@ import org.apache.lucene.search.IndexSearcher; // javadocs
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
/** Utility class to safely share {@link IndexSearcher} instances
* across multiple threads, while periodically reopening.
* This class ensures each IndexSearcher instance is not
* closed until it is no longer needed.
*
* <p>Use {@link #acquire} to obtain the current searcher, and
* {@link #release} to release it, like this:
*
* <pre>
* IndexSearcher s = manager.acquire();
* try {
* // Do searching, doc retrieval, etc. with s
* } finally {
* manager.release(s);
* }
* // Do not use s after this!
* s = null;
* </pre>
*
* <p>In addition you should periodically call {@link
* #maybeReopen}. While it's possible to call this just
* before running each query, this is discouraged since it
* penalizes the unlucky queries that do the reopen. It's
* better to use a separate background thread, that
* periodically calls maybeReopen. Finally, be sure to
* call {@link #close} once you are done.
*
* <p><b>NOTE</b>: if you have an {@link IndexWriter}, it's
* better to use {@link NRTManager} since that class pulls
* near-real-time readers from the IndexWriter.
*
* @lucene.experimental
/**
* Utility class to safely share {@link IndexSearcher} instances across multiple
* threads, while periodically reopening. This class ensures each searcher is
* closed only once all threads have finished using it.
*
* <p>
* Use {@link #acquire} to obtain the current searcher, and {@link #release} to
* release it, like this:
*
* <pre>
* IndexSearcher s = manager.acquire();
* try {
* // Do searching, doc retrieval, etc. with s
* } finally {
* manager.release(s);
* }
* // Do not use s after this!
* s = null;
* </pre>
*
* <p>
* In addition you should periodically call {@link #maybeReopen}. While it's
* possible to call this just before running each query, this is discouraged
* since it penalizes the unlucky queries that do the reopen. It's better to use
* a separate background thread, that periodically calls maybeReopen. Finally,
* be sure to call {@link #close} once you are done.
*
* <p>
* <b>NOTE</b>: if you have an {@link IndexWriter}, it's better to use
* {@link NRTManager} since that class pulls near-real-time readers from the
* IndexWriter.
*
* @lucene.experimental
*/
public class SearcherManager implements Closeable {
public abstract class SearcherManager {
// Current searcher
private volatile IndexSearcher currentSearcher;
private final SearcherWarmer warmer;
private final Semaphore reopening = new Semaphore(1);
private final ExecutorService es;
/** Opens an initial searcher from the Directory.
*
* @param dir Directory to open the searcher from
*
* @param warmer optional {@link SearcherWarmer}. Pass
* null if you don't require the searcher to warmed
* before going live.
*
* <p><b>NOTE</b>: the provided {@link SearcherWarmer} is
* not invoked for the initial searcher; you should
* warm it yourself if necessary.
*/
public SearcherManager(Directory dir, SearcherWarmer warmer) throws IOException {
this(dir, warmer, null);
}
/** Opens an initial searcher from the Directory.
*
* @param dir Directory to open the searcher from
*
* @param warmer optional {@link SearcherWarmer}. Pass
* null if you don't require the searcher to warmed
* before going live.
*
* @param es optional ExecutorService so different segments can
* be searched concurrently (see {@link
* IndexSearcher#IndexSearcher(IndexReader,ExecutorService)}. Pass null
* to search segments sequentially.
*
* <p><b>NOTE</b>: the provided {@link SearcherWarmer} is
* not invoked for the initial searcher; you should
* warm it yourself if necessary.
*/
public SearcherManager(Directory dir, SearcherWarmer warmer, ExecutorService es) throws IOException {
protected volatile IndexSearcher currentSearcher;
protected final ExecutorService es;
protected final SearcherWarmer warmer;
protected final Semaphore reopenLock = new Semaphore(1);
protected SearcherManager(IndexReader openedReader, SearcherWarmer warmer,
ExecutorService es) throws IOException {
this.es = es;
currentSearcher = new IndexSearcher(IndexReader.open(dir), this.es);
this.warmer = warmer;
currentSearcher = new IndexSearcher(openedReader, es);
}
/** You must call this, periodically, to perform a
* reopen. This calls {@link IndexReader#openIfChanged} on the
* underlying reader, and if that returns a new reader,
* it's warmed (if you provided a {@link SearcherWarmer}
* and then swapped into production.
*
* <p><b>Threads</b>: it's fine for more than one thread to
* call this at once. Only the first thread will attempt
* the reopen; subsequent threads will see that another
* thread is already handling reopen and will return
* immediately. Note that this means if another thread
* is already reopening then subsequent threads will
* return right away without waiting for the reader
* reopen to complete.</p>
*
* <p>This method returns true if a new reader was in
* fact opened.</p>
/**
* You must call this, periodically, to perform a reopen. This calls
* {@link #openIfChanged(IndexReader)} with the underlying reader, and if that returns a
* new reader, it's warmed (if you provided a {@link SearcherWarmer} and then
* swapped into production.
*
* <p>
* <b>Threads</b>: it's fine for more than one thread to call this at once.
* Only the first thread will attempt the reopen; subsequent threads will see
* that another thread is already handling reopen and will return immediately.
* Note that this means if another thread is already reopening then subsequent
* threads will return right away without waiting for the reader reopen to
* complete.
* </p>
*
* <p>
* This method returns true if a new reader was in fact opened.
* </p>
*/
public boolean maybeReopen()
throws IOException {
if (currentSearcher == null) {
throw new AlreadyClosedException("this SearcherManager is closed");
}
public boolean maybeReopen() throws IOException {
ensureOpen();
// Ensure only 1 thread does reopen at once; other
// threads just return immediately:
if (reopening.tryAcquire()) {
if (reopenLock.tryAcquire()) {
try {
IndexReader newReader = IndexReader.openIfChanged(currentSearcher.getIndexReader());
final IndexReader newReader = openIfChanged(currentSearcher.getIndexReader());
if (newReader != null) {
IndexSearcher newSearcher = new IndexSearcher(newReader, es);
final IndexSearcher newSearcher = new IndexSearcher(newReader, es);
boolean success = false;
try {
if (warmer != null) {
@ -159,16 +123,57 @@ public class SearcherManager implements Closeable {
return false;
}
} finally {
reopening.release();
reopenLock.release();
}
} else {
return false;
}
}
/** Obtain the current IndexSearcher. You must match
* every call to acquire with one call to {@link #release};
* it's best to do so in a finally clause. */
/**
* Returns <code>true</code> if no changes have occured since this searcher
* ie. reader was opened, otherwise <code>false</code>.
* @see IndexReader#isCurrent()
*/
public boolean isSearcherCurrent() throws CorruptIndexException,
IOException {
final IndexSearcher searcher = acquire();
try {
return searcher.getIndexReader().isCurrent();
} finally {
release(searcher);
}
}
/**
* Release the searcher previously obtained with {@link #acquire}.
*
* <p>
* <b>NOTE</b>: it's safe to call this after {@link #close}.
*/
public void release(IndexSearcher searcher) throws IOException {
assert searcher != null;
searcher.getIndexReader().decRef();
}
/**
* Close this SearcherManager to future searching. Any searches still in
* process in other threads won't be affected, and they should still call
* {@link #release} after they are done.
*/
public synchronized void close() throws IOException {
if (currentSearcher != null) {
// make sure we can call this more than once
// closeable javadoc says:
// if this is already closed then invoking this method has no effect.
swapSearcher(null);
}
}
/**
* Obtain the current IndexSearcher. You must match every call to acquire with
* one call to {@link #release}; it's best to do so in a finally clause.
*/
public IndexSearcher acquire() {
IndexSearcher searcher;
do {
@ -177,40 +182,130 @@ public class SearcherManager implements Closeable {
}
} while (!searcher.getIndexReader().tryIncRef());
return searcher;
}
/** Release the searcher previously obtained with {@link
* #acquire}.
*
* <p><b>NOTE</b>: it's safe to call this after {@link
* #close}. */
public void release(IndexSearcher searcher)
throws IOException {
searcher.getIndexReader().decRef();
}
// Replaces old searcher with new one - needs to be synced to make close() work
private synchronized void swapSearcher(IndexSearcher newSearcher)
throws IOException {
IndexSearcher oldSearcher = currentSearcher;
if (oldSearcher == null) {
private void ensureOpen() {
if (currentSearcher == null) {
throw new AlreadyClosedException("this SearcherManager is closed");
}
}
protected synchronized void swapSearcher(IndexSearcher newSearcher) throws IOException {
ensureOpen();
final IndexSearcher oldSearcher = currentSearcher;
currentSearcher = newSearcher;
release(oldSearcher);
}
/** Close this SearcherManager to future searching. Any
* searches still in process in other threads won't be
* affected, and they should still call {@link #release}
* after they are done. */
@Override
public synchronized void close() throws IOException {
if (currentSearcher != null) {
// make sure we can call this more than once
// closeable javadoc says:
// if this is already closed then invoking this method has no effect.
swapSearcher(null);
protected abstract IndexReader openIfChanged(IndexReader oldReader)
throws IOException;
/**
* Creates and returns a new SearcherManager from the given {@link IndexWriter}.
* @param writer the IndexWriter to open the IndexReader from.
* @param applyAllDeletes If <code>true</code>, all buffered deletes will
* be applied (made visible) in the {@link IndexSearcher} / {@link IndexReader}.
* If <code>false</code>, the deletes are not applied but remain buffered
* (in IndexWriter) so that they will be applied in the future.
* Applying deletes can be costly, so if your app can tolerate deleted documents
* being returned you might gain some performance by passing <code>false</code>.
* @param warmer An optional {@link SearcherWarmer}. Pass
* <code>null</code> if you don't require the searcher to warmed
* before going live. If this is <code>non-null</code> then a
* merged segment warmer is installed on the
* provided IndexWriter's config.
* @param es An optional {@link ExecutorService} so different segments can
* be searched concurrently (see {@link
* IndexSearcher#IndexSearcher(IndexReader,ExecutorService)}. Pass <code>null</code>
* to search segments sequentially.
*
* @see IndexReader#openIfChanged(IndexReader, IndexWriter, boolean)
* @throws IOException
*/
public static SearcherManager open(IndexWriter writer, boolean applyAllDeletes,
SearcherWarmer warmer, ExecutorService es) throws IOException {
final IndexReader open = IndexReader.open(writer, true);
boolean success = false;
try {
SearcherManager manager = new NRTSearcherManager(writer, applyAllDeletes,
open, warmer, es);
success = true;
return manager;
} finally {
if (!success) {
open.close();
}
}
}
/**
* Creates and returns a new SearcherManager from the given {@link Directory}.
* @param dir the directory to open the IndexReader on.
* @param warmer An optional {@link SearcherWarmer}. Pass
* <code>null</code> if you don't require the searcher to warmed
* before going live. If this is <code>non-null</code> then a
* merged segment warmer is installed on the
* provided IndexWriter's config.
* @param es And optional {@link ExecutorService} so different segments can
* be searched concurrently (see {@link
* IndexSearcher#IndexSearcher(IndexReader,ExecutorService)}. Pass <code>null</code>
* to search segments sequentially.
*
* @throws IOException
*/
public static SearcherManager open(Directory dir, SearcherWarmer warmer,
ExecutorService es) throws IOException {
final IndexReader open = IndexReader.open(dir, true);
boolean success = false;
try {
SearcherManager manager = new DirectorySearchManager(open, warmer, es);
success = true;
return manager;
} finally {
if (!success) {
open.close();
}
}
}
static final class NRTSearcherManager extends SearcherManager {
private final IndexWriter writer;
private final boolean applyDeletes;
NRTSearcherManager(final IndexWriter writer, final boolean applyDeletes,
final IndexReader openedReader, final SearcherWarmer warmer, final ExecutorService es)
throws IOException {
super(openedReader, warmer, es);
this.writer = writer;
this.applyDeletes = applyDeletes;
if (warmer != null) {
writer.getConfig().setMergedSegmentWarmer(
new IndexWriter.IndexReaderWarmer() {
@Override
public void warm(IndexReader reader) throws IOException {
warmer.warm(new IndexSearcher(reader, es));
}
});
}
}
@Override
protected IndexReader openIfChanged(IndexReader oldReader)
throws IOException {
return IndexReader.openIfChanged(oldReader, writer, applyDeletes);
}
}
static final class DirectorySearchManager extends SearcherManager {
DirectorySearchManager(IndexReader openedReader,
SearcherWarmer warmer, ExecutorService es) throws IOException {
super(openedReader, warmer, es);
}
@Override
protected IndexReader openIfChanged(IndexReader oldReader)
throws IOException {
return IndexReader.openIfChanged(oldReader, true);
}
}
}

View File

@ -22,6 +22,7 @@ import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.search.SearcherWarmer;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.Directory;
@ -41,7 +42,8 @@ public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
if (VERBOSE) {
System.out.println("TEST: finalSearcher maxGen=" + maxGen);
}
return nrt.get(maxGen, true);
final SearcherManager manager = nrt.waitForGeneration(maxGen, true);
return manager.acquire();
}
@Override
@ -67,14 +69,15 @@ public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
}
final IndexSearcher s = nrt.get(gen, true);
SearcherManager manager = nrt.waitForGeneration(gen, true);
final IndexSearcher s = manager.acquire();
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
}
try {
assertEquals(docs.size(), s.search(new TermQuery(id), 10).totalHits);
} finally {
nrt.release(s);
manager.release(s);
}
}
@ -89,14 +92,15 @@ public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
}
final IndexSearcher s = nrt.get(gen, false);
final SearcherManager manager = nrt.waitForGeneration(gen, false);
final IndexSearcher s = manager.acquire();// nocommit get(gen, false);
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
}
try {
assertEquals(docs.size(), s.search(new TermQuery(id), 10).totalHits);
} finally {
nrt.release(s);
manager.release(s);
}
}
lastGens.set(gen);
@ -111,14 +115,15 @@ public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
}
final IndexSearcher s = nrt.get(gen, false);
final SearcherManager manager = nrt.waitForGeneration(gen, false);
final IndexSearcher s = manager.acquire();
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
}
try {
assertEquals(1, s.search(new TermQuery(id), 10).totalHits);
} finally {
nrt.release(s);
manager.release(s);
}
}
lastGens.set(gen);
@ -132,14 +137,15 @@ public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
}
final IndexSearcher s = nrt.get(gen, true);
final SearcherManager manager = nrt.waitForGeneration(gen, true);
final IndexSearcher s = manager.acquire();
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
}
try {
assertEquals(1, s.search(new TermQuery(id), 10).totalHits);
} finally {
nrt.release(s);
manager.release(s);
}
}
lastGens.set(gen);
@ -153,14 +159,15 @@ public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: verify del " + id);
}
final IndexSearcher s = nrt.get(gen, true);
final SearcherManager manager = nrt.waitForGeneration(gen, true);
final IndexSearcher s = manager.acquire();
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
}
try {
assertEquals(0, s.search(new TermQuery(id), 10).totalHits);
} finally {
nrt.release(s);
manager.release(s);
}
}
lastGens.set(gen);
@ -168,7 +175,6 @@ public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
private NRTManager nrt;
private NRTManagerReopenThread nrtThread;
@Override
protected void doAfterWriter(ExecutorService es) throws Exception {
final double minReopenSec = 0.01 + 0.05 * random.nextDouble();
@ -185,7 +191,8 @@ public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
TestNRTManager.this.warmCalled = true;
s.search(new TermQuery(new Term("body", "united")), 10);
}
});
}, false);
nrtThread = new NRTManagerReopenThread(nrt, maxReopenSec, minReopenSec);
nrtThread.setName("NRT Reopen Thread");
nrtThread.setPriority(Math.min(Thread.currentThread().getPriority()+2, Thread.MAX_PRIORITY));
@ -214,12 +221,12 @@ public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
@Override
protected IndexSearcher getCurrentSearcher() throws Exception {
return nrt.get(random.nextBoolean());
return nrt.getSearcherManager(false).acquire();
}
@Override
protected void releaseSearcher(IndexSearcher s) throws Exception {
nrt.release(s);
nrt.getSearcherManager(false).release(s);
}
@Override

View File

@ -20,6 +20,8 @@ package org.apache.lucene.search;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.lucene.analysis.MockAnalyzer;
@ -29,6 +31,7 @@ import org.apache.lucene.index.Term;
import org.apache.lucene.index.ThreadedIndexingAndSearchingTestCase;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.NamedThreadFactory;
import org.apache.lucene.util._TestUtil;
public class TestSearcherManager extends ThreadedIndexingAndSearchingTestCase {
@ -41,25 +44,35 @@ public class TestSearcherManager extends ThreadedIndexingAndSearchingTestCase {
@Override
protected IndexSearcher getFinalSearcher() throws Exception {
writer.commit();
mgr.maybeReopen();
if (!isNRT) {
writer.commit();
}
assertTrue(mgr.maybeReopen() || mgr.isSearcherCurrent());
return mgr.acquire();
}
private SearcherManager mgr;
private boolean isNRT;
@Override
protected void doAfterWriter(ExecutorService es) throws Exception {
// SearcherManager needs to see empty commit:
writer.commit();
mgr = new SearcherManager(dir,
new SearcherWarmer() {
@Override
public void warm(IndexSearcher s) throws IOException {
TestSearcherManager.this.warmCalled = true;
s.search(new TermQuery(new Term("body", "united")), 10);
}
}, es);
final SearcherWarmer warmer = new SearcherWarmer() {
@Override
public void warm(IndexSearcher s) throws IOException {
TestSearcherManager.this.warmCalled = true;
s.search(new TermQuery(new Term("body", "united")), 10);
}
};
if (random.nextBoolean()) {
mgr = SearcherManager.open(writer, true, warmer, es);
isNRT = true;
} else {
writer.commit();
mgr = SearcherManager.open(dir, warmer, es);
isNRT = false;
}
}
@Override
@ -126,19 +139,20 @@ public class TestSearcherManager extends ThreadedIndexingAndSearchingTestCase {
writer.commit();
final CountDownLatch awaitEnterWarm = new CountDownLatch(1);
final CountDownLatch awaitClose = new CountDownLatch(1);
final SearcherManager searcherManager = new SearcherManager(dir,
new SearcherWarmer() {
@Override
public void warm(IndexSearcher s) throws IOException {
try {
awaitEnterWarm.countDown();
awaitClose.await();
} catch (InterruptedException e) {
//
}
}
});
final ExecutorService es = random.nextBoolean() ? null : Executors.newCachedThreadPool(new NamedThreadFactory("testIntermediateClose"));
final SearcherWarmer warmer = new SearcherWarmer() {
@Override
public void warm(IndexSearcher s) throws IOException {
try {
awaitEnterWarm.countDown();
awaitClose.await();
} catch (InterruptedException e) {
//
}
}
};
final SearcherManager searcherManager = random.nextBoolean() ? SearcherManager.open(dir,
warmer, es) : SearcherManager.open(writer, random.nextBoolean(), warmer, es);
IndexSearcher searcher = searcherManager.acquire();
try {
assertEquals(1, searcher.getIndexReader().numDocs());
@ -185,6 +199,9 @@ public class TestSearcherManager extends ThreadedIndexingAndSearchingTestCase {
assertNull("" + exc[0], exc[0]);
writer.close();
dir.close();
if (es != null) {
es.shutdown();
es.awaitTermination(1, TimeUnit.SECONDS);
}
}
}