LUCENE-4967: move NRTManager entirely into a reopen thread so it can interact with any ReferenceManager

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1478438 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael McCandless 2013-05-02 16:44:36 +00:00
parent 618608fa64
commit 7b90561178
9 changed files with 507 additions and 656 deletions

View File

@ -47,6 +47,11 @@ Changes in backwards compatibility policy
(a, ab, b, bc, c) instead of (a, b, c, ab, bc) and doesn't trim trailing
whitespaces. (Adrien Grand)
* LUCENE-4967: NRTManager is replaced by
ControlledRealTimeReopenThread, for controlling which requests must
see which indexing changes, so that it can work with any
ReferenceManager (Mike McCandless)
Bug Fixes
* LUCENE-4935: CustomScoreQuery wrongly applied its query boost twice

View File

@ -0,0 +1,204 @@
package org.apache.lucene.index;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.search.ControlledRealTimeReopenThread; // javadocs
import org.apache.lucene.search.Query;
import org.apache.lucene.store.Directory;
/** Class that tracks changes to a delegated
* IndexWriter, used by {@link
* ControlledRealTimeReopenThread} to ensure specific
* changes are visible. Create this class (passing your
* IndexWriter), and then pass this class to {@link
* ControlledRealTimeReopenThread}.
* Be sure to make all changes via the
* TrackingIndexWriter, otherwise {@link
* ControlledRealTimeReopenThread} won't know about the changes.
*
* @lucene.experimental */
public class TrackingIndexWriter {
private final IndexWriter writer;
private final AtomicLong indexingGen = new AtomicLong(1);
/** Create a {@code TrackingIndexWriter} wrapping the
* provided {@link IndexWriter}. */
public TrackingIndexWriter(IndexWriter writer) {
this.writer = writer;
}
/** Calls {@link
* IndexWriter#updateDocument(Term,IndexDocument,Analyzer)}
* and returns the generation that reflects this change. */
public long updateDocument(Term t, IndexDocument d, Analyzer a) throws IOException {
writer.updateDocument(t, d, a);
// Return gen as of when indexing finished:
return indexingGen.get();
}
/** Calls {@link
* IndexWriter#updateDocument(Term,IndexDocument)} and
* returns the generation that reflects this change. */
public long updateDocument(Term t, IndexDocument d) throws IOException {
writer.updateDocument(t, d);
// Return gen as of when indexing finished:
return indexingGen.get();
}
/** Calls {@link
* IndexWriter#updateDocuments(Term,Iterable,Analyzer)}
* and returns the generation that reflects this change. */
public long updateDocuments(Term t, Iterable<? extends IndexDocument> docs, Analyzer a) throws IOException {
writer.updateDocuments(t, docs, a);
// Return gen as of when indexing finished:
return indexingGen.get();
}
/** Calls {@link
* IndexWriter#updateDocuments(Term,Iterable)} and returns
* the generation that reflects this change. */
public long updateDocuments(Term t, Iterable<? extends IndexDocument> docs) throws IOException {
writer.updateDocuments(t, docs);
// Return gen as of when indexing finished:
return indexingGen.get();
}
/** Calls {@link IndexWriter#deleteDocuments(Term)} and
* returns the generation that reflects this change. */
public long deleteDocuments(Term t) throws IOException {
writer.deleteDocuments(t);
// Return gen as of when indexing finished:
return indexingGen.get();
}
/** Calls {@link IndexWriter#deleteDocuments(Term...)} and
* returns the generation that reflects this change. */
public long deleteDocuments(Term... terms) throws IOException {
writer.deleteDocuments(terms);
// Return gen as of when indexing finished:
return indexingGen.get();
}
/** Calls {@link IndexWriter#deleteDocuments(Query)} and
* returns the generation that reflects this change. */
public long deleteDocuments(Query q) throws IOException {
writer.deleteDocuments(q);
// Return gen as of when indexing finished:
return indexingGen.get();
}
/** Calls {@link IndexWriter#deleteDocuments(Query...)}
* and returns the generation that reflects this change. */
public long deleteDocuments(Query... queries) throws IOException {
writer.deleteDocuments(queries);
// Return gen as of when indexing finished:
return indexingGen.get();
}
/** Calls {@link IndexWriter#deleteAll} and returns the
* generation that reflects this change. */
public long deleteAll() throws IOException {
writer.deleteAll();
// Return gen as of when indexing finished:
return indexingGen.get();
}
/** Calls {@link
* IndexWriter#addDocument(IndexDocument,Analyzer)} and
* returns the generation that reflects this change. */
public long addDocument(IndexDocument d, Analyzer a) throws IOException {
writer.addDocument(d, a);
// Return gen as of when indexing finished:
return indexingGen.get();
}
/** Calls {@link
* IndexWriter#addDocuments(Iterable,Analyzer)} and
* returns the generation that reflects this change. */
public long addDocuments(Iterable<? extends IndexDocument> docs, Analyzer a) throws IOException {
writer.addDocuments(docs, a);
// Return gen as of when indexing finished:
return indexingGen.get();
}
/** Calls {@link IndexWriter#addDocument(IndexDocument)}
* and returns the generation that reflects this change. */
public long addDocument(IndexDocument d) throws IOException {
writer.addDocument(d);
// Return gen as of when indexing finished:
return indexingGen.get();
}
/** Calls {@link IndexWriter#addDocuments(Iterable)} and
* returns the generation that reflects this change. */
public long addDocuments(Iterable<? extends IndexDocument> docs) throws IOException {
writer.addDocuments(docs);
// Return gen as of when indexing finished:
return indexingGen.get();
}
/** Calls {@link IndexWriter#addIndexes(Directory...)} and
* returns the generation that reflects this change. */
public long addIndexes(Directory... dirs) throws IOException {
writer.addIndexes(dirs);
// Return gen as of when indexing finished:
return indexingGen.get();
}
/** Calls {@link IndexWriter#addIndexes(IndexReader...)}
* and returns the generation that reflects this change. */
public long addIndexes(IndexReader... readers) throws IOException {
writer.addIndexes(readers);
// Return gen as of when indexing finished:
return indexingGen.get();
}
/** Return the current generation being indexed. */
public long getGeneration() {
return indexingGen.get();
}
/** Return the wrapped {@link IndexWriter}. */
public IndexWriter getIndexWriter() {
return writer;
}
/** Return and increment current gen.
*
* @lucene.internal */
public long getAndIncrementGeneration() {
return indexingGen.getAndIncrement();
}
/** Cals {@link
* IndexWriter#tryDeleteDocument(IndexReader,int)} and
* returns the generation that reflects this change. */
public long tryDeleteDocument(IndexReader reader, int docID) throws IOException {
if (writer.tryDeleteDocument(reader, docID)) {
return indexingGen.get();
} else {
return -1;
}
}
}

View File

@ -0,0 +1,247 @@
package org.apache.lucene.search;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.TrackingIndexWriter;
import org.apache.lucene.util.ThreadInterruptedException;
/** Utility class that runs a thread to manage periodicc
* reopens of a {@link ReferenceManager}, with methods to wait for a specific
* index changes to become visible. To use this class you
* must first wrap your {@link IndexWriter} with a {@link
* TrackingIndexWriter} and always use it to make changes
* to the index, saving the returned generation. Then,
* when a given search request needs to see a specific
* index change, call the {#waitForGeneration} to wait for
* that change to be visible. Note that this will only
* scale well if most searches do not need to wait for a
* specific index generation.
*
* @lucene.experimental */
public class ControlledRealTimeReopenThread<T> extends Thread implements Closeable {
private final ReferenceManager<T> manager;
private final long targetMaxStaleNS;
private final long targetMinStaleNS;
private final TrackingIndexWriter writer;
private volatile boolean finish;
private volatile long waitingGen;
private volatile long searchingGen;
private long refreshStartGen;
private final ReentrantLock reopenLock = new ReentrantLock();
private final Condition reopenCond = reopenLock.newCondition();
/**
* Create ControlledRealTimeReopenThread, to periodically
* reopen the a {@link ReferenceManager}.
*
* @param targetMaxStaleSec Maximum time until a new
* reader must be opened; this sets the upper bound
* on how slowly reopens may occur, when no
* caller is waiting for a specific generation to
* become visible.
*
* @param targetMinStaleSec Mininum time until a new
* reader can be opened; this sets the lower bound
* on how quickly reopens may occur, when a caller
* is waiting for a specific generation to
* become visible.
*/
public ControlledRealTimeReopenThread(TrackingIndexWriter writer, ReferenceManager<T> manager, double targetMaxStaleSec, double targetMinStaleSec) {
if (targetMaxStaleSec < targetMinStaleSec) {
throw new IllegalArgumentException("targetMaxScaleSec (= " + targetMaxStaleSec + ") < targetMinStaleSec (=" + targetMinStaleSec + ")");
}
this.writer = writer;
this.manager = manager;
this.targetMaxStaleNS = (long) (1000000000*targetMaxStaleSec);
this.targetMinStaleNS = (long) (1000000000*targetMinStaleSec);
manager.addListener(new HandleRefresh());
}
private class HandleRefresh implements ReferenceManager.RefreshListener {
@Override
public void beforeRefresh() {
}
@Override
public void afterRefresh(boolean didRefresh) {
refreshDone(didRefresh);
}
}
private synchronized void refreshDone(boolean didRefresh) {
searchingGen = refreshStartGen;
notifyAll();
}
@Override
public synchronized void close() {
//System.out.println("NRT: set finish");
finish = true;
// So thread wakes up and notices it should finish:
reopenLock.lock();
try {
reopenCond.signal();
} finally {
reopenLock.unlock();
}
try {
join();
} catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie);
}
// Max it out so any waiting search threads will return:
searchingGen = Long.MAX_VALUE;
notifyAll();
}
/**
* Waits for the target generation to become visible in
* the searcher.
* If the current searcher is older than the
* target generation, this method will block
* until the searcher is reopened, by another via
* {@link ReferenceManager#maybeRefresh} or until the {@link ReferenceManager} is closed.
*
* @param targetGen the generation to wait for
*/
public void waitForGeneration(long targetGen) throws InterruptedException {
waitForGeneration(targetGen, -1);
}
/**
* Waits for the target generation to become visible in
* the searcher, up to a maximum specified milli-seconds.
* If the current searcher is older than the target
* generation, this method will block until the
* searcher has been reopened by another thread via
* {@link ReferenceManager#maybeRefresh}, the given waiting time has elapsed, or until
* the {@link ReferenceManager} is closed.
* <p>
* NOTE: if the waiting time elapses before the requested target generation is
* available the current {@link SearcherManager} is returned instead.
*
* @param targetGen
* the generation to wait for
* @param maxMS
* maximum milliseconds to wait, or -1 to wait indefinitely
* @return true if the targetGeneration is now available,
* or false if maxMS wait time was exceeded
*/
public synchronized boolean waitForGeneration(long targetGen, int maxMS) throws InterruptedException {
final long curGen = writer.getGeneration();
if (targetGen > curGen) {
throw new IllegalArgumentException("targetGen=" + targetGen + " was never returned by the ReferenceManager instance (current gen=" + curGen + ")");
}
if (targetGen > searchingGen) {
waitingGen = Math.max(waitingGen, targetGen);
// Notify the reopen thread that the waitingGen has
// changed, so it may wake up and realize it should
// not sleep for much or any longer before reopening:
reopenLock.lock();
try {
reopenCond.signal();
} finally {
reopenLock.unlock();
}
long startMS = System.nanoTime()/1000000;
while (targetGen > searchingGen) {
if (maxMS < 0) {
wait();
} else {
long msLeft = ((startMS + maxMS) - (System.nanoTime())/1000000);
if (msLeft <= 0) {
return false;
} else {
wait(msLeft);
}
}
}
}
return true;
}
@Override
public void run() {
// TODO: maybe use private thread ticktock timer, in
// case clock shift messes up nanoTime?
long lastReopenStartNS = System.nanoTime();
//System.out.println("reopen: start");
while (!finish) {
// TODO: try to guestimate how long reopen might
// take based on past data?
// Loop until we've waiting long enough before the
// next reopen:
while (!finish) {
// True if we have someone waiting for reopened searcher:
boolean hasWaiting = waitingGen > searchingGen;
final long nextReopenStartNS = lastReopenStartNS + (hasWaiting ? targetMinStaleNS : targetMaxStaleNS);
final long sleepNS = nextReopenStartNS - System.nanoTime();
if (sleepNS > 0) {
reopenLock.lock();
try {
reopenCond.awaitNanos(sleepNS);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
return;
} finally {
reopenLock.unlock();
}
} else {
break;
}
}
if (finish) {
break;
}
lastReopenStartNS = System.nanoTime();
// Save the gen as of when we started the reopen; the
// listener (HandleRefresh above) copies this to
// searchingGen once the reopen completes:
refreshStartGen = writer.getAndIncrementGeneration();
try {
manager.maybeRefreshBlocking();
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}
}
}

View File

@ -1,404 +0,0 @@
package org.apache.lucene.search;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexDocument;
import org.apache.lucene.index.IndexReader; // javadocs
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher; // javadocs
import org.apache.lucene.search.SearcherFactory; // javadocs
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.ThreadInterruptedException;
/**
* Utility class to manage sharing near-real-time searchers
* across multiple searching thread. The difference vs
* SearcherManager is that this class enables individual
* requests to wait until specific indexing changes are
* visible.
*
* <p>You must create an IndexWriter, then create a {@link
* NRTManager.TrackingIndexWriter} from it, and pass that to the
* NRTManager. You may want to create two NRTManagers, once
* that always applies deletes on refresh and one that does
* not. In this case you should use a single {@link
* NRTManager.TrackingIndexWriter} instance for both.
*
* <p>Then, use {@link #acquire} to obtain the
* {@link IndexSearcher}, and {@link #release} (ideally,
* from within a <code>finally</code> clause) to release it.
*
* <p>NOTE: to use this class, you must call {@link #maybeRefresh()}
* periodically. The {@link NRTManagerReopenThread} is a
* simple class to do this on a periodic basis, and reopens
* more quickly if a request is waiting. If you implement
* your own reopener, be sure to call {@link
* #addWaitingListener} so your reopener is notified when a
* caller is waiting for a specific generation
* searcher. </p>
*
* @see SearcherFactory
*
* @lucene.experimental
*/
public final class NRTManager extends ReferenceManager<IndexSearcher> {
private static final long MAX_SEARCHER_GEN = Long.MAX_VALUE;
private final TrackingIndexWriter writer;
private final List<WaitingListener> waitingListeners = new CopyOnWriteArrayList<WaitingListener>();
private final ReentrantLock genLock = new ReentrantLock();;
private final Condition newGeneration = genLock.newCondition();
private final SearcherFactory searcherFactory;
private volatile long searchingGen;
/**
* Create new NRTManager.
*
* @param writer TrackingIndexWriter to open near-real-time
* readers
* @param searcherFactory An optional {@link SearcherFactory}. Pass
* <code>null</code> if you don't require the searcher to be warmed
* before going live or other custom behavior.
*/
public NRTManager(TrackingIndexWriter writer, SearcherFactory searcherFactory) throws IOException {
this(writer, searcherFactory, true);
}
/**
* Expert: just like {@link
* #NRTManager(TrackingIndexWriter,SearcherFactory)},
* but you can also specify whether each reopened searcher must
* apply deletes. This is useful for cases where certain
* uses can tolerate seeing some deleted docs, since
* reopen time is faster if deletes need not be applied. */
public NRTManager(TrackingIndexWriter writer, SearcherFactory searcherFactory, boolean applyAllDeletes) throws IOException {
this.writer = writer;
if (searcherFactory == null) {
searcherFactory = new SearcherFactory();
}
this.searcherFactory = searcherFactory;
current = SearcherManager.getSearcher(searcherFactory, DirectoryReader.open(writer.getIndexWriter(), applyAllDeletes));
}
@Override
protected void decRef(IndexSearcher reference) throws IOException {
reference.getIndexReader().decRef();
}
@Override
protected boolean tryIncRef(IndexSearcher reference) {
return reference.getIndexReader().tryIncRef();
}
/** NRTManager invokes this interface to notify it when a
* caller is waiting for a specific generation searcher
* to be visible. */
public static interface WaitingListener {
public void waiting(long targetGen);
}
/** Adds a listener, to be notified when a caller is
* waiting for a specific generation searcher to be
* visible. */
public void addWaitingListener(WaitingListener l) {
waitingListeners.add(l);
}
/** Remove a listener added with {@link
* #addWaitingListener}. */
public void removeWaitingListener(WaitingListener l) {
waitingListeners.remove(l);
}
/** Class that tracks changes to a delegated
* IndexWriter. Create this class (passing your
* IndexWriter), and then pass this class to NRTManager.
* Be sure to make all changes via the
* TrackingIndexWriter, otherwise NRTManager won't know
* about the changes.
*
* @lucene.experimental */
public static class TrackingIndexWriter {
private final IndexWriter writer;
private final AtomicLong indexingGen = new AtomicLong(1);
public TrackingIndexWriter(IndexWriter writer) {
this.writer = writer;
}
public long updateDocument(Term t, IndexDocument d, Analyzer a) throws IOException {
writer.updateDocument(t, d, a);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long updateDocument(Term t, IndexDocument d) throws IOException {
writer.updateDocument(t, d);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long updateDocuments(Term t, Iterable<? extends IndexDocument> docs, Analyzer a) throws IOException {
writer.updateDocuments(t, docs, a);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long updateDocuments(Term t, Iterable<? extends IndexDocument> docs) throws IOException {
writer.updateDocuments(t, docs);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long deleteDocuments(Term t) throws IOException {
writer.deleteDocuments(t);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long deleteDocuments(Term... terms) throws IOException {
writer.deleteDocuments(terms);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long deleteDocuments(Query q) throws IOException {
writer.deleteDocuments(q);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long deleteDocuments(Query... queries) throws IOException {
writer.deleteDocuments(queries);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long deleteAll() throws IOException {
writer.deleteAll();
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long addDocument(IndexDocument d, Analyzer a) throws IOException {
writer.addDocument(d, a);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long addDocuments(Iterable<? extends IndexDocument> docs, Analyzer a) throws IOException {
writer.addDocuments(docs, a);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long addDocument(IndexDocument d) throws IOException {
writer.addDocument(d);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long addDocuments(Iterable<? extends IndexDocument> docs) throws IOException {
writer.addDocuments(docs);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long addIndexes(Directory... dirs) throws IOException {
writer.addIndexes(dirs);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long addIndexes(IndexReader... readers) throws IOException {
writer.addIndexes(readers);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long getGeneration() {
return indexingGen.get();
}
public IndexWriter getIndexWriter() {
return writer;
}
long getAndIncrementGeneration() {
return indexingGen.getAndIncrement();
}
public long tryDeleteDocument(IndexReader reader, int docID) throws IOException {
if (writer.tryDeleteDocument(reader, docID)) {
return indexingGen.get();
} else {
return -1;
}
}
}
/**
* Waits for the target generation to become visible in
* the searcher.
* If the current searcher is older than the
* target generation, this method will block
* until the searcher is reopened, by another via
* {@link #maybeRefresh} or until the {@link NRTManager} is closed.
*
* @param targetGen the generation to wait for
*/
public void waitForGeneration(long targetGen) {
waitForGeneration(targetGen, -1, TimeUnit.NANOSECONDS);
}
/**
* Waits for the target generation to become visible in
* the searcher. If the current searcher is older than
* the target generation, this method will block until the
* searcher has been reopened by another thread via
* {@link #maybeRefresh}, the given waiting time has elapsed, or until
* the NRTManager is closed.
* <p>
* NOTE: if the waiting time elapses before the requested target generation is
* available the current {@link SearcherManager} is returned instead.
*
* @param targetGen
* the generation to wait for
* @param time
* the time to wait for the target generation
* @param unit
* the waiting time's time unit
*/
public void waitForGeneration(long targetGen, long time, TimeUnit unit) {
try {
final long curGen = writer.getGeneration();
if (targetGen > curGen) {
throw new IllegalArgumentException("targetGen=" + targetGen + " was never returned by this NRTManager instance (current gen=" + curGen + ")");
}
genLock.lockInterruptibly();
try {
if (targetGen > searchingGen) {
for (WaitingListener listener : waitingListeners) {
listener.waiting(targetGen);
}
while (targetGen > searchingGen) {
if (!waitOnGenCondition(time, unit)) {
return;
}
}
}
} finally {
genLock.unlock();
}
} catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie);
}
}
private boolean waitOnGenCondition(long time, TimeUnit unit)
throws InterruptedException {
assert genLock.isHeldByCurrentThread();
if (time < 0) {
newGeneration.await();
return true;
} else {
return newGeneration.await(time, unit);
}
}
/** Returns generation of current searcher. */
public long getCurrentSearchingGen() {
return searchingGen;
}
private long lastRefreshGen;
@Override
protected IndexSearcher refreshIfNeeded(IndexSearcher referenceToRefresh) throws IOException {
// Record gen as of when reopen started:
lastRefreshGen = writer.getAndIncrementGeneration();
final IndexReader r = referenceToRefresh.getIndexReader();
assert r instanceof DirectoryReader: "searcher's IndexReader should be a DirectoryReader, but got " + r;
final DirectoryReader dirReader = (DirectoryReader) r;
IndexSearcher newSearcher = null;
if (!dirReader.isCurrent()) {
final IndexReader newReader = DirectoryReader.openIfChanged(dirReader);
if (newReader != null) {
newSearcher = SearcherManager.getSearcher(searcherFactory, newReader);
}
}
return newSearcher;
}
@Override
protected void afterMaybeRefresh() {
genLock.lock();
try {
if (searchingGen != MAX_SEARCHER_GEN) {
// update searchingGen:
assert lastRefreshGen >= searchingGen;
searchingGen = lastRefreshGen;
}
// wake up threads if we have a new generation:
newGeneration.signalAll();
} finally {
genLock.unlock();
}
}
@Override
protected synchronized void afterClose() throws IOException {
genLock.lock();
try {
// max it out to make sure nobody can wait on another gen
searchingGen = MAX_SEARCHER_GEN;
newGeneration.signalAll();
} finally {
genLock.unlock();
}
}
/**
* Returns <code>true</code> if no changes have occured since this searcher
* ie. reader was opened, otherwise <code>false</code>.
* @see DirectoryReader#isCurrent()
*/
public boolean isSearcherCurrent() throws IOException {
final IndexSearcher searcher = acquire();
try {
final IndexReader r = searcher.getIndexReader();
assert r instanceof DirectoryReader: "searcher's IndexReader should be a DirectoryReader, but got " + r;
return ((DirectoryReader) r).isCurrent();
} finally {
release(searcher);
}
}
}

View File

@ -1,200 +0,0 @@
package org.apache.lucene.search;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.Closeable;
import java.io.IOException;
import org.apache.lucene.util.ThreadInterruptedException;
/**
* Utility class that runs a reopen thread to periodically
* reopen the NRT searchers in the provided {@link
* NRTManager}.
*
* <p> Typical usage looks like this:
*
* <pre class="prettyprint">
* ... open your own writer ...
*
* NRTManager manager = new NRTManager(writer);
*
* // Refreshes searcher every 5 seconds when nobody is waiting, and up to 100 msec delay
* // when somebody is waiting:
* NRTManagerReopenThread reopenThread = new NRTManagerReopenThread(manager, 5.0, 0.1);
* reopenThread.setName("NRT Reopen Thread");
* reopenThread.setPriority(Math.min(Thread.currentThread().getPriority()+2, Thread.MAX_PRIORITY));
* reopenThread.setDaemon(true);
* reopenThread.start();
* </pre>
*
* Then, for each incoming query, do this:
*
* <pre class="prettyprint">
* // For each incoming query:
* IndexSearcher searcher = manager.get();
* try {
* // Use searcher to search...
* } finally {
* manager.release(searcher);
* }
* </pre>
*
* You should make changes using the <code>NRTManager</code>; if you later need to obtain
* a searcher reflecting those changes:
*
* <pre class="prettyprint">
* // ... or updateDocument, deleteDocuments, etc:
* long gen = manager.addDocument(...);
*
* // Returned searcher is guaranteed to reflect the just added document
* IndexSearcher searcher = manager.get(gen);
* try {
* // Use searcher to search...
* } finally {
* manager.release(searcher);
* }
* </pre>
*
*
* When you are done be sure to close both the manager and the reopen thread:
* <pre class="prettyprint">
* reopenThread.close();
* manager.close();
* </pre>
*
* @lucene.experimental
*/
public class NRTManagerReopenThread extends Thread implements NRTManager.WaitingListener, Closeable {
private final NRTManager manager;
private final long targetMaxStaleNS;
private final long targetMinStaleNS;
private boolean finish;
private long waitingGen;
/**
* Create NRTManagerReopenThread, to periodically reopen the NRT searcher.
*
* @param targetMaxStaleSec Maximum time until a new
* reader must be opened; this sets the upper bound
* on how slowly reopens may occur
*
* @param targetMinStaleSec Mininum time until a new
* reader can be opened; this sets the lower bound
* on how quickly reopens may occur, when a caller
* is waiting for a specific indexing change to
* become visible.
*/
public NRTManagerReopenThread(NRTManager manager, double targetMaxStaleSec, double targetMinStaleSec) {
if (targetMaxStaleSec < targetMinStaleSec) {
throw new IllegalArgumentException("targetMaxScaleSec (= " + targetMaxStaleSec + ") < targetMinStaleSec (=" + targetMinStaleSec + ")");
}
this.manager = manager;
this.targetMaxStaleNS = (long) (1000000000*targetMaxStaleSec);
this.targetMinStaleNS = (long) (1000000000*targetMinStaleSec);
manager.addWaitingListener(this);
}
@Override
public synchronized void close() {
//System.out.println("NRT: set finish");
manager.removeWaitingListener(this);
this.finish = true;
notify();
try {
join();
} catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie);
}
}
@Override
public synchronized void waiting(long targetGen) {
waitingGen = Math.max(waitingGen, targetGen);
notify();
//System.out.println(Thread.currentThread().getName() + ": force wakeup waitingGen=" + waitingGen + " applyDeletes=" + applyDeletes);
}
@Override
public void run() {
// TODO: maybe use private thread ticktock timer, in
// case clock shift messes up nanoTime?
long lastReopenStartNS = System.nanoTime();
//System.out.println("reopen: start");
try {
while (true) {
boolean hasWaiting = false;
synchronized(this) {
// TODO: try to guestimate how long reopen might
// take based on past data?
while (!finish) {
//System.out.println("reopen: cycle");
// True if we have someone waiting for reopen'd searcher:
hasWaiting = waitingGen > manager.getCurrentSearchingGen();
final long nextReopenStartNS = lastReopenStartNS + (hasWaiting ? targetMinStaleNS : targetMaxStaleNS);
final long sleepNS = nextReopenStartNS - System.nanoTime();
if (sleepNS > 0) {
//System.out.println("reopen: sleep " + (sleepNS/1000000.0) + " ms (hasWaiting=" + hasWaiting + ")");
try {
wait(sleepNS/1000000, (int) (sleepNS%1000000));
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
//System.out.println("NRT: set finish on interrupt");
finish = true;
break;
}
} else {
break;
}
}
if (finish) {
//System.out.println("reopen: finish");
return;
}
//System.out.println("reopen: start hasWaiting=" + hasWaiting);
}
lastReopenStartNS = System.nanoTime();
try {
//final long t0 = System.nanoTime();
manager.maybeRefresh();
//System.out.println("reopen took " + ((System.nanoTime()-t0)/1000000.0) + " msec");
} catch (IOException ioe) {
//System.out.println(Thread.currentThread().getName() + ": IOE");
//ioe.printStackTrace();
throw new RuntimeException(ioe);
}
}
} catch (Throwable t) {
//System.out.println("REOPEN EXC");
//t.printStackTrace(System.out);
throw new RuntimeException(t);
}
}
}

View File

@ -26,7 +26,7 @@ import org.apache.lucene.index.IndexWriterConfig; // javadocs
import org.apache.lucene.search.similarities.Similarity; // javadocs
/**
* Factory class used by {@link SearcherManager} and {@link NRTManager} to
* Factory class used by {@link SearcherManager} to
* create new IndexSearchers. The default implementation just creates
* an IndexSearcher with no custom behavior:
*

View File

@ -24,7 +24,6 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.lucene.search.NRTManager; // javadocs
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.IOUtils;
@ -41,8 +40,8 @@ import org.apache.lucene.util.IOUtils;
*
* Per search-request, if it's a "new" search request, then
* obtain the latest searcher you have (for example, by
* using {@link SearcherManager} or {@link NRTManager}), and
* then record this searcher:
* using {@link SearcherManager}), and then record this
* searcher:
*
* <pre class="prettyprint">
* // Record the current searcher, and save the returend
@ -143,8 +142,7 @@ public class SearcherLifetimeManager implements Closeable {
/** Records that you are now using this IndexSearcher.
* Always call this when you've obtained a possibly new
* {@link IndexSearcher}, for example from one of the
* <code>get</code> methods in {@link NRTManager} or {@link
* {@link IndexSearcher}, for example from {@link
* SearcherManager}. It's fine if you already passed the
* same searcher to this method before.
*

View File

@ -36,21 +36,33 @@ import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.ThreadedIndexingAndSearchingTestCase;
import org.apache.lucene.index.TrackingIndexWriter;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.NRTCachingDirectory;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.ThreadInterruptedException;
@SuppressCodecs({ "SimpleText", "Memory", "Direct" })
public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearchingTestCase {
// Not guaranteed to reflect deletes:
private SearcherManager nrtNoDeletes;
// Is guaranteed to reflect deletes:
private SearcherManager nrtDeletes;
private TrackingIndexWriter genWriter;
private ControlledRealTimeReopenThread<IndexSearcher> nrtDeletesThread;
private ControlledRealTimeReopenThread<IndexSearcher> nrtNoDeletesThread;
private final ThreadLocal<Long> lastGens = new ThreadLocal<Long>();
private boolean warmCalled;
public void testNRTManager() throws Exception {
runTest("TestNRTManager");
public void testControlledRealTimeReopenThread() throws Exception {
runTest("TestControlledRealTimeReopenThread");
}
@Override
@ -58,7 +70,7 @@ public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
if (VERBOSE) {
System.out.println("TEST: finalSearcher maxGen=" + maxGen);
}
nrtDeletes.waitForGeneration(maxGen);
nrtDeletesThread.waitForGeneration(maxGen);
return nrtDeletes.acquire();
}
@ -85,7 +97,7 @@ public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
}
nrtDeletes.waitForGeneration(gen);
nrtDeletesThread.waitForGeneration(gen);
final IndexSearcher s = nrtDeletes.acquire();
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
@ -108,7 +120,7 @@ public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
}
nrtNoDeletes.waitForGeneration(gen);
nrtNoDeletesThread.waitForGeneration(gen);
final IndexSearcher s = nrtNoDeletes.acquire();
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
@ -131,7 +143,7 @@ public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
}
nrtNoDeletes.waitForGeneration(gen);
nrtNoDeletesThread.waitForGeneration(gen);
final IndexSearcher s = nrtNoDeletes.acquire();
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
@ -153,7 +165,7 @@ public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
}
nrtDeletes.waitForGeneration(gen);
nrtDeletesThread.waitForGeneration(gen);
final IndexSearcher s = nrtDeletes.acquire();
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
@ -175,7 +187,7 @@ public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: verify del " + id);
}
nrtDeletes.waitForGeneration(gen);
nrtDeletesThread.waitForGeneration(gen);
final IndexSearcher s = nrtDeletes.acquire();
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
@ -189,48 +201,37 @@ public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
lastGens.set(gen);
}
// Not guaranteed to reflect deletes:
private NRTManager nrtNoDeletes;
// Is guaranteed to reflect deletes:
private NRTManager nrtDeletes;
private NRTManager.TrackingIndexWriter genWriter;
private NRTManagerReopenThread nrtDeletesThread;
private NRTManagerReopenThread nrtNoDeletesThread;
@Override
protected void doAfterWriter(final ExecutorService es) throws Exception {
final double minReopenSec = 0.01 + 0.05 * random().nextDouble();
final double maxReopenSec = minReopenSec * (1.0 + 10 * random().nextDouble());
if (VERBOSE) {
System.out.println("TEST: make NRTManager maxReopenSec=" + maxReopenSec + " minReopenSec=" + minReopenSec);
System.out.println("TEST: make SearcherManager maxReopenSec=" + maxReopenSec + " minReopenSec=" + minReopenSec);
}
genWriter = new NRTManager.TrackingIndexWriter(writer);
genWriter = new TrackingIndexWriter(writer);
final SearcherFactory sf = new SearcherFactory() {
@Override
public IndexSearcher newSearcher(IndexReader r) throws IOException {
TestNRTManager.this.warmCalled = true;
TestControlledRealTimeReopenThread.this.warmCalled = true;
IndexSearcher s = new IndexSearcher(r, es);
s.search(new TermQuery(new Term("body", "united")), 10);
return s;
}
};
nrtNoDeletes = new NRTManager(genWriter, sf, false);
nrtDeletes = new NRTManager(genWriter, sf, true);
nrtNoDeletes = new SearcherManager(writer, false, sf);
nrtDeletes = new SearcherManager(writer, true, sf);
nrtDeletesThread = new NRTManagerReopenThread(nrtDeletes, maxReopenSec, minReopenSec);
nrtDeletesThread = new ControlledRealTimeReopenThread<IndexSearcher>(genWriter, nrtDeletes, maxReopenSec, minReopenSec);
nrtDeletesThread.setName("NRTDeletes Reopen Thread");
nrtDeletesThread.setPriority(Math.min(Thread.currentThread().getPriority()+2, Thread.MAX_PRIORITY));
nrtDeletesThread.setDaemon(true);
nrtDeletesThread.start();
nrtNoDeletesThread = new NRTManagerReopenThread(nrtNoDeletes, maxReopenSec, minReopenSec);
nrtNoDeletesThread = new ControlledRealTimeReopenThread<IndexSearcher>(genWriter, nrtNoDeletes, maxReopenSec, minReopenSec);
nrtNoDeletesThread.setName("NRTNoDeletes Reopen Thread");
nrtNoDeletesThread.setPriority(Math.min(Thread.currentThread().getPriority()+2, Thread.MAX_PRIORITY));
nrtNoDeletesThread.setDaemon(true);
@ -260,7 +261,7 @@ public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
protected IndexSearcher getCurrentSearcher() throws Exception {
// Test doesn't assert deletions until the end, so we
// can randomize whether dels must be applied
final NRTManager nrt;
final SearcherManager nrt;
if (random().nextBoolean()) {
nrt = nrtDeletes;
} else {
@ -273,7 +274,7 @@ public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
@Override
protected void releaseSearcher(IndexSearcher s) throws Exception {
// NOTE: a bit iffy... technically you should release
// against the same NRT mgr you acquired from... but
// against the same SearcherManager you acquired from... but
// both impls just decRef the underlying reader so we
// can get away w/ cheating:
nrtNoDeletes.release(s);
@ -283,7 +284,7 @@ public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
protected void doClose() throws Exception {
assertTrue(warmCalled);
if (VERBOSE) {
System.out.println("TEST: now close NRTManager");
System.out.println("TEST: now close SearcherManagers");
}
nrtDeletesThread.close();
nrtDeletes.close();
@ -302,13 +303,12 @@ public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
final CountDownLatch signal = new CountDownLatch(1);
LatchedIndexWriter _writer = new LatchedIndexWriter(d, conf, latch, signal);
final NRTManager.TrackingIndexWriter writer = new NRTManager.TrackingIndexWriter(_writer);
final NRTManager manager = new NRTManager(writer, null, false);
final TrackingIndexWriter writer = new TrackingIndexWriter(_writer);
final SearcherManager manager = new SearcherManager(_writer, false, null);
Document doc = new Document();
doc.add(newTextField("test", "test", Field.Store.YES));
long gen = writer.addDocument(doc);
writer.addDocument(doc);
manager.maybeRefresh();
assertFalse(gen < manager.getCurrentSearchingGen());
Thread t = new Thread() {
@Override
public void run() {
@ -336,7 +336,7 @@ public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
} finally {
manager.release(searcher);
}
NRTManagerReopenThread thread = new NRTManagerReopenThread(manager, 0.01, 0.01);
final ControlledRealTimeReopenThread<IndexSearcher> thread = new ControlledRealTimeReopenThread<IndexSearcher>(writer, manager, 0.01, 0.01);
thread.start(); // start reopening
if (VERBOSE) {
System.out.println("waiting now for generation " + lastGen);
@ -346,7 +346,12 @@ public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
Thread waiter = new Thread() {
@Override
public void run() {
manager.waitForGeneration(lastGen);
try {
thread.waitForGeneration(lastGen);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException(ie);
}
finished.set(true);
}
};
@ -408,7 +413,7 @@ public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
};
try {
new NRTManager(new NRTManager.TrackingIndexWriter(w.w), theEvilOne);
new SearcherManager(w.w, false, theEvilOne);
} catch (IllegalStateException ise) {
// expected
}
@ -421,7 +426,7 @@ public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
Directory dir = newDirectory();
IndexWriter iw = new IndexWriter(dir, new IndexWriterConfig(TEST_VERSION_CURRENT, null));
final AtomicBoolean afterRefreshCalled = new AtomicBoolean(false);
NRTManager sm = new NRTManager(new NRTManager.TrackingIndexWriter(iw),new SearcherFactory());
SearcherManager sm = new SearcherManager(iw, true, new SearcherFactory());
sm.addListener(new ReferenceManager.RefreshListener() {
@Override
public void beforeRefresh() {

View File

@ -25,7 +25,6 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import org.apache.lucene.analysis.MockAnalyzer;
@ -36,10 +35,8 @@ import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.index.StoredDocument;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.NRTManager.TrackingIndexWriter;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util._TestUtil;
@ -50,10 +47,9 @@ public class TestLiveFieldValues extends LuceneTestCase {
Directory dir = newFSDirectory(_TestUtil.getTempDir("livefieldupdates"));
IndexWriterConfig iwc = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random()));
final IndexWriter _w = new IndexWriter(dir, iwc);
final TrackingIndexWriter w = new TrackingIndexWriter(_w);
final IndexWriter w = new IndexWriter(dir, iwc);
final NRTManager mgr = new NRTManager(w, new SearcherFactory() {
final SearcherManager mgr = new SearcherManager(w, true, new SearcherFactory() {
@Override
public IndexSearcher newSearcher(IndexReader r) {
return new IndexSearcher(r);
@ -174,7 +170,7 @@ public class TestLiveFieldValues extends LuceneTestCase {
rt.close();
mgr.close();
_w.close();
w.close();
dir.close();
}
}