LUCENE-3769: simplify NRTManager: applyDeletes is only passed to ctor now

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1243656 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael McCandless 2012-02-13 19:08:28 +00:00
parent 087f1e3126
commit 7141c6c72d
4 changed files with 288 additions and 251 deletions

View File

@ -809,6 +809,11 @@ API Changes
supported in Lucene 4.0, just use a TokenStream.
(Mike McCandless, Robert Muir)
* LUCENE-3769: Simplified NRTManager by requiring applyDeletes to be
passed to ctor only; if an app needs to mix and match it's free to
create two NRTManagers (one always applying deletes and the other
never applying deletes). (MJB, Shai Erera, Mike McCandless)
New Features
* LUCENE-3593: Added a FieldValueFilter that accepts all documents that either

View File

@ -33,21 +33,38 @@ import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher; // javadocs
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.SearcherFactory; // javadocs
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.ThreadInterruptedException;
/**
* Utility class to manage sharing near-real-time searchers
* across multiple searching threads.
* across multiple searching thread. The difference vs
* SearcherManager is that this class enables individual
* requests to wait until specific indexing changes are
* visible.
*
* <p>NOTE: to use this class, you must call {@link #maybeReopen(boolean)}
* <p>You must create an IndexWriter, then create a {@link
* NRTManager.TrackingIndexWriter} from it, and pass that to the
* NRTManager. You may want to create two NRTManagers, once
* that always applies deletes on refresh and one that does
* not. In this case you should use a single {@link
* NRTManager.TrackingIndexWriter} instance for both.
*
* <p>Then, use {@link #getSearcherManager} to obtain the
* {@link SearcherManager} that you then use to
* acquire/release searchers. Don't call maybeReopen on
* that SearcherManager! Only call NRTManager's {@link
* #maybeReopen}.
*
* <p>NOTE: to use this class, you must call {@link #maybeReopen()}
* periodically. The {@link NRTManagerReopenThread} is a
* simple class to do this on a periodic basis. If you
* implement your own reopener, be sure to call {@link
* simple class to do this on a periodic basis, and reopens
* more quickly if a request is waiting. If you implement
* your own reopener, be sure to call {@link
* #addWaitingListener} so your reopener is notified when a
* caller is waiting for a specific generation searcher. </p>
* caller is waiting for a specific generation
* searcher. </p>
*
* @see SearcherFactory
*
@ -56,50 +73,57 @@ import org.apache.lucene.util.ThreadInterruptedException;
public class NRTManager implements Closeable {
private static final long MAX_SEARCHER_GEN = Long.MAX_VALUE;
private final IndexWriter writer;
private final SearcherManagerRef withoutDeletes;
private final SearcherManagerRef withDeletes;
private final AtomicLong indexingGen;
private final TrackingIndexWriter writer;
private final List<WaitingListener> waitingListeners = new CopyOnWriteArrayList<WaitingListener>();
private final ReentrantLock reopenLock = new ReentrantLock();
private final Condition newGeneration = reopenLock.newCondition();
private final SearcherManager mgr;
private volatile long searchingGen;
/**
* Create new NRTManager.
*
* @param writer IndexWriter to open near-real-time
* @param writer TrackingIndexWriter to open near-real-time
* readers
* @param searcherFactory An optional {@link SearcherFactory}. Pass
* <code>null</code> if you don't require the searcher to be warmed
* before going live or other custom behavior.
*/
public NRTManager(IndexWriter writer, SearcherFactory searcherFactory) throws IOException {
public NRTManager(TrackingIndexWriter writer, SearcherFactory searcherFactory) throws IOException {
this(writer, searcherFactory, true);
}
/**
* Expert: just like {@link
* #NRTManager(IndexWriter,SearcherFactory)},
* but you can also specify whether every searcher must
* #NRTManager(TrackingIndexWriter,SearcherFactory)},
* but you can also specify whether each reopened searcher must
* apply deletes. This is useful for cases where certain
* uses can tolerate seeing some deleted docs, since
* reopen time is faster if deletes need not be applied. */
public NRTManager(IndexWriter writer, SearcherFactory searcherFactory, boolean alwaysApplyDeletes) throws IOException {
public NRTManager(TrackingIndexWriter writer, SearcherFactory searcherFactory, boolean applyDeletes) throws IOException {
this.writer = writer;
if (alwaysApplyDeletes) {
withoutDeletes = withDeletes = new SearcherManagerRef(true, 0, new SearcherManager(writer, true, searcherFactory));
} else {
withDeletes = new SearcherManagerRef(true, 0, new SearcherManager(writer, true, searcherFactory));
withoutDeletes = new SearcherManagerRef(false, 0, new SearcherManager(writer, false, searcherFactory));
}
indexingGen = new AtomicLong(1);
mgr = new SearcherManager(writer.getIndexWriter(), applyDeletes, searcherFactory);
}
/**
* Returns the {@link SearcherManager} you should use to
* acquire/release searchers.
*
* <p><b>NOTE</b>: Never call maybeReopen on the returned
* SearcherManager; only call this NRTManager's {@link
* #maybeReopen}. Otherwise threads waiting for a
* generation may never return.
*/
public SearcherManager getSearcherManager() {
return mgr;
}
/** NRTManager invokes this interface to notify it when a
* caller is waiting for a specific generation searcher
* to be visible. */
public static interface WaitingListener {
public void waiting(boolean requiresDeletes, long targetGen);
public void waiting(long targetGen);
}
/** Adds a listener, to be notified when a caller is
@ -115,161 +139,181 @@ public class NRTManager implements Closeable {
waitingListeners.remove(l);
}
public long updateDocument(Term t, Iterable<? extends IndexableField> d, Analyzer a) throws IOException {
writer.updateDocument(t, d, a);
// Return gen as of when indexing finished:
return indexingGen.get();
}
/** Class that tracks changes to a delegated
* IndexWriter. Create this class (passing your
* IndexWriter), and then pass this class to NRTManager.
* Be sure to make all changes via the
* TrackingIndexWriter, otherwise NRTManager won't know
* about the changes.
*
* @lucene.experimental */
public static class TrackingIndexWriter {
private final IndexWriter writer;
private final AtomicLong indexingGen = new AtomicLong(1);
public long updateDocument(Term t, Iterable<? extends IndexableField> d) throws IOException {
writer.updateDocument(t, d);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public TrackingIndexWriter(IndexWriter writer) {
this.writer = writer;
}
public long updateDocuments(Term t, Iterable<? extends Iterable<? extends IndexableField>> docs, Analyzer a) throws IOException {
writer.updateDocuments(t, docs, a);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long updateDocument(Term t, Iterable<? extends IndexableField> d, Analyzer a) throws IOException {
writer.updateDocument(t, d, a);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long updateDocuments(Term t, Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException {
writer.updateDocuments(t, docs);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long updateDocument(Term t, Iterable<? extends IndexableField> d) throws IOException {
writer.updateDocument(t, d);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long deleteDocuments(Term t) throws IOException {
writer.deleteDocuments(t);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long updateDocuments(Term t, Iterable<? extends Iterable<? extends IndexableField>> docs, Analyzer a) throws IOException {
writer.updateDocuments(t, docs, a);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long deleteDocuments(Term... terms) throws IOException {
writer.deleteDocuments(terms);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long updateDocuments(Term t, Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException {
writer.updateDocuments(t, docs);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long deleteDocuments(Query q) throws IOException {
writer.deleteDocuments(q);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long deleteDocuments(Term t) throws IOException {
writer.deleteDocuments(t);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long deleteDocuments(Query... queries) throws IOException {
writer.deleteDocuments(queries);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long deleteDocuments(Term... terms) throws IOException {
writer.deleteDocuments(terms);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long deleteAll() throws IOException {
writer.deleteAll();
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long deleteDocuments(Query q) throws IOException {
writer.deleteDocuments(q);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long addDocument(Iterable<? extends IndexableField> d, Analyzer a) throws IOException {
writer.addDocument(d, a);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long deleteDocuments(Query... queries) throws IOException {
writer.deleteDocuments(queries);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long addDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs, Analyzer a) throws IOException {
writer.addDocuments(docs, a);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long deleteAll() throws IOException {
writer.deleteAll();
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long addDocument(Iterable<? extends IndexableField> d) throws IOException {
writer.addDocument(d);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long addDocument(Iterable<? extends IndexableField> d, Analyzer a) throws IOException {
writer.addDocument(d, a);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long addDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException {
writer.addDocuments(docs);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long addDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs, Analyzer a) throws IOException {
writer.addDocuments(docs, a);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long addIndexes(Directory... dirs) throws CorruptIndexException, IOException {
writer.addIndexes(dirs);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long addDocument(Iterable<? extends IndexableField> d) throws IOException {
writer.addDocument(d);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long addIndexes(IndexReader... readers) throws CorruptIndexException, IOException {
writer.addIndexes(readers);
// Return gen as of when indexing finished:
return indexingGen.get();
public long addDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException {
writer.addDocuments(docs);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long addIndexes(Directory... dirs) throws CorruptIndexException, IOException {
writer.addIndexes(dirs);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long addIndexes(IndexReader... readers) throws CorruptIndexException, IOException {
writer.addIndexes(readers);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long getGeneration() {
return indexingGen.get();
}
public IndexWriter getIndexWriter() {
return writer;
}
long getAndIncrementGeneration() {
return indexingGen.getAndIncrement();
}
}
/**
* Waits for a given {@link SearcherManager} target generation to be available
* via {@link #getSearcherManager(boolean)}. If the current generation is less
* than the given target generation this method will block until the
* correspondent {@link SearcherManager} is reopened by another thread via
* {@link #maybeReopen(boolean)} or until the {@link NRTManager} is closed.
* Waits for the target generation to become visible in
* the searcher.
* If the current searcher is older than the
* target generation, this method will block
* until the searcher is reopened, by another via
* {@link #maybeReopen} or until the {@link NRTManager} is closed.
*
* @param targetGen the generation to wait for
* @param requireDeletes <code>true</code> iff the generation requires deletes to be applied otherwise <code>false</code>
* @return the {@link SearcherManager} with the given target generation
*/
public SearcherManager waitForGeneration(long targetGen, boolean requireDeletes) {
return waitForGeneration(targetGen, requireDeletes, -1, TimeUnit.NANOSECONDS);
public void waitForGeneration(long targetGen) {
waitForGeneration(targetGen, -1, TimeUnit.NANOSECONDS);
}
/**
* Waits for a given {@link SearcherManager} target generation to be available
* via {@link #getSearcherManager(boolean)}. If the current generation is less
* than the given target generation this method will block until the
* correspondent {@link SearcherManager} is reopened by another thread via
* {@link #maybeReopen(boolean)}, the given waiting time has elapsed, or until
* the {@link NRTManager} is closed.
* Waits for the target generation to become visible in
* the searcher. If the current searcher is older than
* the target generation, this method will block until the
* searcher has been reopened by another thread via
* {@link #maybeReopen}, the given waiting time has elapsed, or until
* the NRTManager is closed.
* <p>
* NOTE: if the waiting time elapses before the requested target generation is
* available the latest {@link SearcherManager} is returned instead.
* available the current {@link SearcherManager} is returned instead.
*
* @param targetGen
* the generation to wait for
* @param requireDeletes
* <code>true</code> iff the generation requires deletes to be
* applied otherwise <code>false</code>
* @param time
* the time to wait for the target generation
* @param unit
* the waiting time's time unit
* @return the {@link SearcherManager} with the given target generation or the
* latest {@link SearcherManager} if the waiting time elapsed before
* the requested generation is available.
*/
public SearcherManager waitForGeneration(long targetGen, boolean requireDeletes, long time, TimeUnit unit) {
public void waitForGeneration(long targetGen, long time, TimeUnit unit) {
try {
final long curGen = indexingGen.get();
final long curGen = writer.getGeneration();
if (targetGen > curGen) {
throw new IllegalArgumentException("targetGen=" + targetGen + " was never returned by this NRTManager instance (current gen=" + curGen + ")");
}
reopenLock.lockInterruptibly();
try {
if (targetGen > getCurrentSearchingGen(requireDeletes)) {
if (targetGen > searchingGen) {
for (WaitingListener listener : waitingListeners) {
listener.waiting(requireDeletes, targetGen);
listener.waiting(targetGen);
}
while (targetGen > getCurrentSearchingGen(requireDeletes)) {
while (targetGen > searchingGen) {
if (!waitOnGenCondition(time, unit)) {
return getSearcherManager(requireDeletes);
return;
}
}
}
} finally {
reopenLock.unlock();
}
} catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie);
}
return getSearcherManager(requireDeletes);
}
private boolean waitOnGenCondition(long time, TimeUnit unit)
@ -284,38 +328,33 @@ public class NRTManager implements Closeable {
}
/** Returns generation of current searcher. */
public long getCurrentSearchingGen(boolean applyAllDeletes) {
if (applyAllDeletes) {
return withDeletes.generation;
} else {
return Math.max(withoutDeletes.generation, withDeletes.generation);
}
public long getCurrentSearchingGen() {
return searchingGen;
}
public boolean maybeReopen(boolean applyAllDeletes) throws IOException {
public void maybeReopen() throws IOException {
if (reopenLock.tryLock()) {
try {
final SearcherManagerRef reference = applyAllDeletes ? withDeletes : withoutDeletes;
// Mark gen as of when reopen started:
final long newSearcherGen = indexingGen.getAndIncrement();
boolean setSearchGen = false;
if (reference.generation == MAX_SEARCHER_GEN) {
final long newSearcherGen = writer.getAndIncrementGeneration();
if (searchingGen == MAX_SEARCHER_GEN) {
newGeneration.signalAll(); // wake up threads if we have a new generation
return false;
return;
}
if (!(setSearchGen = reference.manager.isSearcherCurrent())) {
setSearchGen = reference.manager.maybeReopen();
boolean setSearchGen;
if (!mgr.isSearcherCurrent()) {
setSearchGen = mgr.maybeReopen();
} else {
setSearchGen = true;
}
if (setSearchGen) {
reference.generation = newSearcherGen;// update searcher gen
searchingGen = newSearcherGen;// update searcher gen
newGeneration.signalAll(); // wake up threads if we have a new generation
}
return setSearchGen;
} finally {
reopenLock.unlock();
}
}
return false;
}
/**
@ -330,49 +369,14 @@ public class NRTManager implements Closeable {
reopenLock.lock();
try {
try {
IOUtils.close(withDeletes, withoutDeletes);
// max it out to make sure nobody can wait on another gen
searchingGen = MAX_SEARCHER_GEN;
mgr.close();
} finally { // make sure we signal even if close throws an exception
newGeneration.signalAll();
}
} finally {
reopenLock.unlock();
assert withDeletes.generation == MAX_SEARCHER_GEN && withoutDeletes.generation == MAX_SEARCHER_GEN;
}
}
/**
* Returns a {@link SearcherManager}. If <code>applyAllDeletes</code> is
* <code>true</code> the returned manager is guaranteed to have all deletes
* applied on the last reopen. Otherwise the latest manager with or without deletes
* is returned.
*/
public SearcherManager getSearcherManager(boolean applyAllDeletes) {
if (applyAllDeletes) {
return withDeletes.manager;
} else {
if (withDeletes.generation > withoutDeletes.generation) {
return withDeletes.manager;
} else {
return withoutDeletes.manager;
}
}
}
static final class SearcherManagerRef implements Closeable {
final boolean applyDeletes;
volatile long generation;
final SearcherManager manager;
SearcherManagerRef(boolean applyDeletes, long generation, SearcherManager manager) {
super();
this.applyDeletes = applyDeletes;
this.generation = generation;
this.manager = manager;
}
public void close() throws IOException {
generation = MAX_SEARCHER_GEN; // max it out to make sure nobody can wait on another gen
manager.close();
}
}
}

View File

@ -88,7 +88,6 @@ public class NRTManagerReopenThread extends Thread implements NRTManager.Waiting
private final long targetMinStaleNS;
private boolean finish;
private long waitingGen;
private boolean waitingNeedsDeletes;
/**
* Create NRTManagerReopenThread, to periodically reopen the NRT searcher.
@ -126,11 +125,10 @@ public class NRTManagerReopenThread extends Thread implements NRTManager.Waiting
}
}
public synchronized void waiting(boolean needsDeletes, long targetGen) {
waitingNeedsDeletes |= needsDeletes;
public synchronized void waiting(long targetGen) {
waitingGen = Math.max(waitingGen, targetGen);
notify();
//System.out.println(Thread.currentThread().getName() + ": force wakeup waitingGen=" + waitingGen + " applyDeletes=" + applyDeletes + " waitingNeedsDeletes=" + waitingNeedsDeletes);
//System.out.println(Thread.currentThread().getName() + ": force wakeup waitingGen=" + waitingGen + " applyDeletes=" + applyDeletes);
}
@Override
@ -153,7 +151,7 @@ public class NRTManagerReopenThread extends Thread implements NRTManager.Waiting
//System.out.println("reopen: cycle");
// True if we have someone waiting for reopen'd searcher:
hasWaiting = waitingGen > manager.getCurrentSearchingGen(waitingNeedsDeletes);
hasWaiting = waitingGen > manager.getCurrentSearchingGen();
final long nextReopenStartNS = lastReopenStartNS + (hasWaiting ? targetMinStaleNS : targetMaxStaleNS);
final long sleepNS = nextReopenStartNS - System.nanoTime();
@ -183,7 +181,7 @@ public class NRTManagerReopenThread extends Thread implements NRTManager.Waiting
lastReopenStartNS = System.nanoTime();
try {
//final long t0 = System.nanoTime();
manager.maybeReopen(waitingNeedsDeletes);
manager.maybeReopen();
//System.out.println("reopen took " + ((System.nanoTime()-t0)/1000000.0) + " msec");
} catch (IOException ioe) {
//System.out.println(Thread.currentThread().getName() + ": IOE");

View File

@ -34,8 +34,6 @@ import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.ThreadedIndexingAndSearchingTestCase;
import org.apache.lucene.search.NRTManagerReopenThread;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.NRTCachingDirectory;
@ -58,8 +56,8 @@ public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
if (VERBOSE) {
System.out.println("TEST: finalSearcher maxGen=" + maxGen);
}
final SearcherManager manager = nrt.waitForGeneration(maxGen, true);
return manager.acquire();
nrtDeletes.waitForGeneration(maxGen);
return nrtDeletes.getSearcherManager().acquire();
}
@Override
@ -78,22 +76,22 @@ public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
@Override
protected void updateDocuments(Term id, List<? extends Iterable<? extends IndexableField>> docs) throws Exception {
final long gen = nrt.updateDocuments(id, docs);
final long gen = genWriter.updateDocuments(id, docs);
// Randomly verify the update "took":
if (random.nextInt(20) == 2) {
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
}
SearcherManager manager = nrt.waitForGeneration(gen, true);
final IndexSearcher s = manager.acquire();
nrtDeletes.waitForGeneration(gen);
final IndexSearcher s = nrtDeletes.getSearcherManager().acquire();
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
}
try {
assertEquals(docs.size(), s.search(new TermQuery(id), 10).totalHits);
} finally {
manager.release(s);
nrtDeletes.getSearcherManager().release(s);
}
}
@ -102,21 +100,21 @@ public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
@Override
protected void addDocuments(Term id, List<? extends Iterable<? extends IndexableField>> docs) throws Exception {
final long gen = nrt.addDocuments(docs);
final long gen = genWriter.addDocuments(docs);
// Randomly verify the add "took":
if (random.nextInt(20) == 2) {
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
}
final SearcherManager manager = nrt.waitForGeneration(gen, false);
final IndexSearcher s = manager.acquire();
nrtNoDeletes.waitForGeneration(gen);
final IndexSearcher s = nrtNoDeletes.getSearcherManager().acquire();
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
}
try {
assertEquals(docs.size(), s.search(new TermQuery(id), 10).totalHits);
} finally {
manager.release(s);
nrtNoDeletes.getSearcherManager().release(s);
}
}
lastGens.set(gen);
@ -124,22 +122,22 @@ public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
@Override
protected void addDocument(Term id, Iterable<? extends IndexableField> doc) throws Exception {
final long gen = nrt.addDocument(doc);
final long gen = genWriter.addDocument(doc);
// Randomly verify the add "took":
if (random.nextInt(20) == 2) {
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
}
final SearcherManager manager = nrt.waitForGeneration(gen, false);
final IndexSearcher s = manager.acquire();
nrtNoDeletes.waitForGeneration(gen);
final IndexSearcher s = nrtNoDeletes.getSearcherManager().acquire();
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
}
try {
assertEquals(1, s.search(new TermQuery(id), 10).totalHits);
} finally {
manager.release(s);
nrtNoDeletes.getSearcherManager().release(s);
}
}
lastGens.set(gen);
@ -147,21 +145,21 @@ public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
@Override
protected void updateDocument(Term id, Iterable<? extends IndexableField> doc) throws Exception {
final long gen = nrt.updateDocument(id, doc);
final long gen = genWriter.updateDocument(id, doc);
// Randomly verify the udpate "took":
if (random.nextInt(20) == 2) {
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
}
final SearcherManager manager = nrt.waitForGeneration(gen, true);
final IndexSearcher s = manager.acquire();
nrtDeletes.waitForGeneration(gen);
final IndexSearcher s = nrtDeletes.getSearcherManager().acquire();
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
}
try {
assertEquals(1, s.search(new TermQuery(id), 10).totalHits);
} finally {
manager.release(s);
nrtDeletes.getSearcherManager().release(s);
}
}
lastGens.set(gen);
@ -169,28 +167,37 @@ public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
@Override
protected void deleteDocuments(Term id) throws Exception {
final long gen = nrt.deleteDocuments(id);
final long gen = genWriter.deleteDocuments(id);
// randomly verify the delete "took":
if (random.nextInt(20) == 7) {
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: verify del " + id);
}
final SearcherManager manager = nrt.waitForGeneration(gen, true);
final IndexSearcher s = manager.acquire();
nrtDeletes.waitForGeneration(gen);
final IndexSearcher s = nrtDeletes.getSearcherManager().acquire();
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
}
try {
assertEquals(0, s.search(new TermQuery(id), 10).totalHits);
} finally {
manager.release(s);
nrtDeletes.getSearcherManager().release(s);
}
}
lastGens.set(gen);
}
private NRTManager nrt;
private NRTManagerReopenThread nrtThread;
// Not guaranteed to reflect deletes:
private NRTManager nrtNoDeletes;
// Is guaranteed to reflect deletes:
private NRTManager nrtDeletes;
private NRTManager.TrackingIndexWriter genWriter;
private NRTManagerReopenThread nrtDeletesThread;
private NRTManagerReopenThread nrtNoDeletesThread;
@Override
protected void doAfterWriter(final ExecutorService es) throws Exception {
final double minReopenSec = 0.01 + 0.05 * random.nextDouble();
@ -200,22 +207,32 @@ public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
System.out.println("TEST: make NRTManager maxReopenSec=" + maxReopenSec + " minReopenSec=" + minReopenSec);
}
nrt = new NRTManager(writer,
new SearcherFactory() {
@Override
public IndexSearcher newSearcher(IndexReader r) throws IOException {
TestNRTManager.this.warmCalled = true;
IndexSearcher s = new IndexSearcher(r, es);
s.search(new TermQuery(new Term("body", "united")), 10);
return s;
}
}, false);
genWriter = new NRTManager.TrackingIndexWriter(writer);
final SearcherFactory sf = new SearcherFactory() {
@Override
public IndexSearcher newSearcher(IndexReader r) throws IOException {
TestNRTManager.this.warmCalled = true;
IndexSearcher s = new IndexSearcher(r, es);
s.search(new TermQuery(new Term("body", "united")), 10);
return s;
}
};
nrtNoDeletes = new NRTManager(genWriter, sf, false);
nrtDeletes = new NRTManager(genWriter, sf, true);
nrtThread = new NRTManagerReopenThread(nrt, maxReopenSec, minReopenSec);
nrtThread.setName("NRT Reopen Thread");
nrtThread.setPriority(Math.min(Thread.currentThread().getPriority()+2, Thread.MAX_PRIORITY));
nrtThread.setDaemon(true);
nrtThread.start();
nrtDeletesThread = new NRTManagerReopenThread(nrtDeletes, maxReopenSec, minReopenSec);
nrtDeletesThread.setName("NRTDeletes Reopen Thread");
nrtDeletesThread.setPriority(Math.min(Thread.currentThread().getPriority()+2, Thread.MAX_PRIORITY));
nrtDeletesThread.setDaemon(true);
nrtDeletesThread.start();
nrtNoDeletesThread = new NRTManagerReopenThread(nrtNoDeletes, maxReopenSec, minReopenSec);
nrtNoDeletesThread.setName("NRTNoDeletes Reopen Thread");
nrtNoDeletesThread.setPriority(Math.min(Thread.currentThread().getPriority()+2, Thread.MAX_PRIORITY));
nrtNoDeletesThread.setDaemon(true);
nrtNoDeletesThread.start();
}
@Override
@ -241,14 +258,23 @@ public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
protected IndexSearcher getCurrentSearcher() throws Exception {
// Test doesn't assert deletions until the end, so we
// can randomize whether dels must be applied
return nrt.getSearcherManager(random.nextBoolean()).acquire();
final NRTManager nrt;
if (random.nextBoolean()) {
nrt = nrtDeletes;
} else {
nrt = nrtNoDeletes;
}
return nrt.getSearcherManager().acquire();
}
@Override
protected void releaseSearcher(IndexSearcher s) throws Exception {
// Test doesn't assert deletions until the end, so we
// can randomize whether dels must be applied
nrt.getSearcherManager(random.nextBoolean()).release(s);
// NOTE: a bit iffy... technically you should release
// against the same NRT mgr you acquired from... but
// both impls just decRef the underlying reader so we
// can get away w/ cheating:
nrtNoDeletes.getSearcherManager().release(s);
}
@Override
@ -257,8 +283,10 @@ public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
if (VERBOSE) {
System.out.println("TEST: now close NRTManager");
}
nrtThread.close();
nrt.close();
nrtDeletesThread.close();
nrtDeletes.close();
nrtNoDeletesThread.close();
nrtNoDeletes.close();
}
/*
@ -270,20 +298,21 @@ public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch signal = new CountDownLatch(1);
LatchedIndexWriter writer = new LatchedIndexWriter(d, conf, latch, signal);
LatchedIndexWriter _writer = new LatchedIndexWriter(d, conf, latch, signal);
final NRTManager.TrackingIndexWriter writer = new NRTManager.TrackingIndexWriter(_writer);
final NRTManager manager = new NRTManager(writer, null, false);
Document doc = new Document();
doc.add(newField("test","test", TextField.TYPE_STORED));
long gen = manager.addDocument(doc);
assertTrue(manager.maybeReopen(false));
assertFalse(gen < manager.getCurrentSearchingGen(false));
long gen = writer.addDocument(doc);
manager.maybeReopen();
assertFalse(gen < manager.getCurrentSearchingGen());
Thread t = new Thread() {
public void run() {
try {
signal.await();
assertTrue(manager.maybeReopen(false));
manager.deleteDocuments(new TermQuery(new Term("foo", "barista")));
manager.maybeReopen(false); // kick off another reopen so we inc. the internal gen
manager.maybeReopen();
writer.deleteDocuments(new TermQuery(new Term("foo", "barista")));
manager.maybeReopen(); // kick off another reopen so we inc. the internal gen
} catch (Exception e) {
e.printStackTrace();
} finally {
@ -292,15 +321,16 @@ public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
}
};
t.start();
writer.waitAfterUpdate = true; // wait in addDocument to let some reopens go through
final long lastGen = manager.updateDocument(new Term("foo", "bar"), doc); // once this returns the doc is already reflected in the last reopen
assertFalse(manager.getSearcherManager(false).isSearcherCurrent()); // false since there is a delete in the queue
_writer.waitAfterUpdate = true; // wait in addDocument to let some reopens go through
final long lastGen = writer.updateDocument(new Term("foo", "bar"), doc); // once this returns the doc is already reflected in the last reopen
assertFalse(manager.getSearcherManager().isSearcherCurrent()); // false since there is a delete in the queue
IndexSearcher acquire = manager.getSearcherManager(false).acquire();
IndexSearcher acquire = manager.getSearcherManager().acquire();
try {
assertEquals(2, acquire.getIndexReader().numDocs());
} finally {
acquire.getIndexReader().decRef();
manager.getSearcherManager().release(acquire);
}
NRTManagerReopenThread thread = new NRTManagerReopenThread(manager, 0.01, 0.01);
thread.start(); // start reopening
@ -311,12 +341,12 @@ public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
final AtomicBoolean finished = new AtomicBoolean(false);
Thread waiter = new Thread() {
public void run() {
manager.waitForGeneration(lastGen, false);
manager.waitForGeneration(lastGen);
finished.set(true);
}
};
waiter.start();
manager.maybeReopen(false);
manager.maybeReopen();
waiter.join(1000);
if (!finished.get()) {
waiter.interrupt();
@ -324,7 +354,7 @@ public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
}
thread.close();
thread.join();
IOUtils.close(manager, writer, d);
IOUtils.close(manager, _writer, d);
}
public static class LatchedIndexWriter extends IndexWriter {