LUCENE-3636: make it possible to use SearcherManager with distributed stats

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1213020 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Muir 2011-12-11 17:07:47 +00:00
parent 377332e696
commit 503097ac71
7 changed files with 155 additions and 148 deletions

View File

@ -671,6 +671,13 @@ Changes in backwards compatibility policy
were made final. This is not expected to affect many apps, since these methods
already delegate to abstract methods, which you had to already override
anyway. (Shai Erera)
* LUCENE-3636: Added SearcherFactory, used by SearcherManager and NRTManager
to create new IndexSearchers. You can provide your own implementation to
warm new searchers, set an ExecutorService, set a custom Similarity, or
even return your own subclass of IndexSearcher. The SearcherWarmer and
ExecutorService parameters on these classes were removed, as they are
subsumed by SearcherFactory. (Shai Erera, Mike McCandless, Robert Muir)
Security fixes

View File

@ -21,7 +21,6 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
@ -34,6 +33,7 @@ import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher; // javadocs
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.ThreadInterruptedException;
@ -49,6 +49,8 @@ import org.apache.lucene.util.ThreadInterruptedException;
* #addWaitingListener} so your reopener is notified when a
* caller is waiting for a specific generation searcher. </p>
*
* @see SearcherFactory
*
* @lucene.experimental
*/
@ -65,60 +67,30 @@ public class NRTManager implements Closeable {
/**
* Create new NRTManager.
*
* @param writer IndexWriter to open near-real-time
* readers
* @param warmer optional {@link SearcherWarmer}. Pass
* null if you don't require the searcher to warmed
* before going live. If this is non-null then a
* merged segment warmer is installed on the
* provided IndexWriter's config.
*
* <p><b>NOTE</b>: the provided {@link SearcherWarmer} is
* not invoked for the initial searcher; you should
* warm it yourself if necessary.
* @param writer IndexWriter to open near-real-time
* readers
* @param searcherFactory An optional {@link SearcherFactory}. Pass
* <code>null</code> if you don't require the searcher to be warmed
* before going live or other custom behavior.
*/
public NRTManager(IndexWriter writer, SearcherWarmer warmer) throws IOException {
this(writer, null, warmer, true);
}
/**
* Create new NRTManager.
*
* @param writer IndexWriter to open near-real-time
* readers
* @param es optional ExecutorService so different segments can
* be searched concurrently (see {@link IndexSearcher#IndexSearcher(IndexReader, ExecutorService)}.
* Pass <code>null</code> to search segments sequentially.
* @param warmer optional {@link SearcherWarmer}. Pass
* null if you don't require the searcher to warmed
* before going live. If this is non-null then a
* merged segment warmer is installed on the
* provided IndexWriter's config.
*
* <p><b>NOTE</b>: the provided {@link SearcherWarmer} is
* not invoked for the initial searcher; you should
* warm it yourself if necessary.
*/
public NRTManager(IndexWriter writer, ExecutorService es,
SearcherWarmer warmer) throws IOException {
this(writer, es, warmer, true);
public NRTManager(IndexWriter writer, SearcherFactory searcherFactory) throws IOException {
this(writer, searcherFactory, true);
}
/**
* Expert: just like {@link
* #NRTManager(IndexWriter,ExecutorService,SearcherWarmer)},
* #NRTManager(IndexWriter,SearcherFactory)},
* but you can also specify whether every searcher must
* apply deletes. This is useful for cases where certain
* uses can tolerate seeing some deleted docs, since
* reopen time is faster if deletes need not be applied. */
public NRTManager(IndexWriter writer, ExecutorService es,
SearcherWarmer warmer, boolean alwaysApplyDeletes) throws IOException {
public NRTManager(IndexWriter writer, SearcherFactory searcherFactory, boolean alwaysApplyDeletes) throws IOException {
this.writer = writer;
if (alwaysApplyDeletes) {
withoutDeletes = withDeletes = new SearcherManagerRef(true, 0, new SearcherManager(writer, true, warmer, es));
withoutDeletes = withDeletes = new SearcherManagerRef(true, 0, new SearcherManager(writer, true, searcherFactory));
} else {
withDeletes = new SearcherManagerRef(true, 0, new SearcherManager(writer, true, warmer, es));
withoutDeletes = new SearcherManagerRef(false, 0, new SearcherManager(writer, false, warmer, es));
withDeletes = new SearcherManagerRef(true, 0, new SearcherManager(writer, true, searcherFactory));
withoutDeletes = new SearcherManagerRef(false, 0, new SearcherManager(writer, false, searcherFactory));
}
indexingGen = new AtomicLong(1);
}

View File

@ -0,0 +1,57 @@
package org.apache.lucene.search;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.util.concurrent.ExecutorService; // javadocs
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter; // javadocs
import org.apache.lucene.index.IndexWriterConfig; // javadocs
import org.apache.lucene.search.similarities.SimilarityProvider; // javadocs
/**
* Factory class used by {@link SearcherManager} and {@link NRTManager} to
* create new IndexSearchers. The default implementation just creates
* an IndexSearcher with no custom behavior:
*
* <pre class="prettyprint">
* public IndexSearcher newSearcher(IndexReader r) throws IOException {
* return new IndexSearcher(r);
* }
* </pre>
*
* You can pass your own factory instead if you want custom behavior, such as:
* <ul>
* <li>Setting a custom scoring model: {@link IndexSearcher#setSimilarityProvider(SimilarityProvider)}
* <li>Parallel per-segment search: {@link IndexSearcher#IndexSearcher(IndexReader, ExecutorService)}
* <li>Return custom subclasses of IndexSearcher (for example that implement distributed scoring)
* <li>Run queries to warm your IndexSearcher before it is used. Note: when using near-realtime search
* you may want to also {@link IndexWriterConfig#setMergedSegmentWarmer(IndexWriter.IndexReaderWarmer)} to warm
* newly merged segments in the background, outside of the reopen path.
* </ul>
* @lucene.experimental
*/
public class SearcherFactory {
/**
* Returns a new IndexSearcher over the given reader.
*/
public IndexSearcher newSearcher(IndexReader reader) throws IOException {
return new IndexSearcher(reader);
}
}

View File

@ -18,7 +18,6 @@ package org.apache.lucene.search;
*/
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import org.apache.lucene.index.CorruptIndexException;
@ -61,14 +60,15 @@ import org.apache.lucene.store.Directory;
* {@link NRTManager} since that class pulls near-real-time readers from the
* IndexWriter.
*
* @see SearcherFactory
*
* @lucene.experimental
*/
public final class SearcherManager {
private volatile IndexSearcher currentSearcher;
private final ExecutorService es;
private final SearcherWarmer warmer;
private final SearcherFactory searcherFactory;
private final Semaphore reopenLock = new Semaphore(1);
/**
@ -81,60 +81,41 @@ public final class SearcherManager {
* Applying deletes can be costly, so if your app can tolerate deleted documents
* being returned you might gain some performance by passing <code>false</code>.
* See {@link IndexReader#openIfChanged(IndexReader, IndexWriter, boolean)}.
* @param warmer An optional {@link SearcherWarmer}. Pass
* <code>null</code> if you don't require the searcher to warmed
* before going live. If this is <code>non-null</code> then a
* merged segment warmer is installed on the
* provided IndexWriter's config.
* @param es An optional {@link ExecutorService} so different segments can
* be searched concurrently (see {@link
* IndexSearcher#IndexSearcher(IndexReader,ExecutorService)}. Pass <code>null</code>
* to search segments sequentially.
* @param searcherFactory An optional {@link SearcherFactory}. Pass
* <code>null</code> if you don't require the searcher to be warmed
* before going live or other custom behavior.
*
* @throws IOException
*/
public SearcherManager(IndexWriter writer, boolean applyAllDeletes,
final SearcherWarmer warmer, final ExecutorService es) throws IOException {
this.es = es;
this.warmer = warmer;
currentSearcher = new IndexSearcher(IndexReader.open(writer, applyAllDeletes));
if (warmer != null) {
writer.getConfig().setMergedSegmentWarmer(
new IndexWriter.IndexReaderWarmer() {
@Override
public void warm(IndexReader reader) throws IOException {
warmer.warm(new IndexSearcher(reader, es));
}
});
public SearcherManager(IndexWriter writer, boolean applyAllDeletes, SearcherFactory searcherFactory) throws IOException {
if (searcherFactory == null) {
searcherFactory = new SearcherFactory();
}
this.searcherFactory = searcherFactory;
currentSearcher = searcherFactory.newSearcher(IndexReader.open(writer, applyAllDeletes));
}
/**
* Creates and returns a new SearcherManager from the given {@link Directory}.
* @param dir the directory to open the IndexReader on.
* @param warmer An optional {@link SearcherWarmer}. Pass
* <code>null</code> if you don't require the searcher to warmed
* before going live. If this is <code>non-null</code> then a
* merged segment warmer is installed on the
* provided IndexWriter's config.
* @param es And optional {@link ExecutorService} so different segments can
* be searched concurrently (see {@link
* IndexSearcher#IndexSearcher(IndexReader,ExecutorService)}. Pass <code>null</code>
* to search segments sequentially.
* @param searcherFactory An optional {@link SearcherFactory}. Pass
* <code>null</code> if you don't require the searcher to be warmed
* before going live or other custom behavior.
*
* @throws IOException
*/
public SearcherManager(Directory dir, SearcherWarmer warmer,
ExecutorService es) throws IOException {
this.es = es;
this.warmer = warmer;
currentSearcher = new IndexSearcher(IndexReader.open(dir), es);
public SearcherManager(Directory dir, SearcherFactory searcherFactory) throws IOException {
if (searcherFactory == null) {
searcherFactory = new SearcherFactory();
}
this.searcherFactory = searcherFactory;
currentSearcher = searcherFactory.newSearcher(IndexReader.open(dir));
}
/**
* You must call this, periodically, to perform a reopen. This calls
* {@link IndexReader#openIfChanged(IndexReader)} with the underlying reader, and if that returns a
* new reader, it's warmed (if you provided a {@link SearcherWarmer} and then
* new reader, it's warmed (if you provided a {@link SearcherFactory} and then
* swapped into production.
*
* <p>
@ -167,12 +148,9 @@ public final class SearcherManager {
release(searcherToReopen);
}
if (newReader != null) {
final IndexSearcher newSearcher = new IndexSearcher(newReader, es);
final IndexSearcher newSearcher = searcherFactory.newSearcher(newReader);
boolean success = false;
try {
if (warmer != null) {
warmer.warm(newSearcher);
}
swapSearcher(newSearcher);
success = true;
} finally {
@ -260,5 +238,4 @@ public final class SearcherManager {
currentSearcher = newSearcher;
release(oldSearcher);
}
}

View File

@ -1,34 +0,0 @@
package org.apache.lucene.search;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import org.apache.lucene.search.NRTManager; // javadocs
/** Pass an implementation of this to {@link NRTManager} or
* {@link SearcherManager} to warm a new {@link
* IndexSearcher} before it's put into production.
*
* @lucene.experimental */
public interface SearcherWarmer {
// TODO: can we somehow merge this w/ IW's
// IndexReaderWarmer.... should IW switch to this?
public void warm(IndexSearcher s) throws IOException;
}

View File

@ -28,12 +28,14 @@ import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.ThreadedIndexingAndSearchingTestCase;
import org.apache.lucene.search.NRTManagerReopenThread;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.NRTCachingDirectory;
@ -190,7 +192,7 @@ public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
private NRTManager nrt;
private NRTManagerReopenThread nrtThread;
@Override
protected void doAfterWriter(ExecutorService es) throws Exception {
protected void doAfterWriter(final ExecutorService es) throws Exception {
final double minReopenSec = 0.01 + 0.05 * random.nextDouble();
final double maxReopenSec = minReopenSec * (1.0 + 10 * random.nextDouble());
@ -198,14 +200,16 @@ public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
System.out.println("TEST: make NRTManager maxReopenSec=" + maxReopenSec + " minReopenSec=" + minReopenSec);
}
nrt = new NRTManager(writer, es,
new SearcherWarmer() {
@Override
public void warm(IndexSearcher s) throws IOException {
TestNRTManager.this.warmCalled = true;
s.search(new TermQuery(new Term("body", "united")), 10);
}
}, false);
nrt = new NRTManager(writer,
new SearcherFactory() {
@Override
public IndexSearcher newSearcher(IndexReader r) throws IOException {
TestNRTManager.this.warmCalled = true;
IndexSearcher s = new IndexSearcher(r, es);
s.search(new TermQuery(new Term("body", "united")), 10);
return s;
}
}, false);
nrtThread = new NRTManagerReopenThread(nrt, maxReopenSec, minReopenSec);
nrtThread.setName("NRT Reopen Thread");
@ -267,7 +271,7 @@ public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
final CountDownLatch signal = new CountDownLatch(1);
LatchedIndexWriter writer = new LatchedIndexWriter(d, conf, latch, signal);
final NRTManager manager = new NRTManager(writer, null, null, false);
final NRTManager manager = new NRTManager(writer, null, false);
Document doc = new Document();
doc.add(newField("test","test", TextField.TYPE_STORED));
long gen = manager.addDocument(doc);

View File

@ -29,9 +29,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.ThreadedIndexingAndSearchingTestCase;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.LuceneTestCase.UseNoMemoryExpensiveCodec;
@ -65,24 +67,26 @@ public class TestSearcherManager extends ThreadedIndexingAndSearchingTestCase {
private boolean isNRT;
@Override
protected void doAfterWriter(ExecutorService es) throws Exception {
final SearcherWarmer warmer = new SearcherWarmer() {
protected void doAfterWriter(final ExecutorService es) throws Exception {
final SearcherFactory factory = new SearcherFactory() {
@Override
public void warm(IndexSearcher s) throws IOException {
public IndexSearcher newSearcher(IndexReader r) throws IOException {
IndexSearcher s = new IndexSearcher(r, es);
TestSearcherManager.this.warmCalled = true;
s.search(new TermQuery(new Term("body", "united")), 10);
return s;
}
};
if (random.nextBoolean()) {
// TODO: can we randomize the applyAllDeletes? But
// somehow for final searcher we must apply
// deletes...
mgr = new SearcherManager(writer, true, warmer, es);
mgr = new SearcherManager(writer, true, factory);
isNRT = true;
} else {
// SearcherManager needs to see empty commit:
writer.commit();
mgr = new SearcherManager(dir, warmer, es);
mgr = new SearcherManager(dir, factory);
isNRT = false;
}
@ -106,8 +110,10 @@ public class TestSearcherManager extends ThreadedIndexingAndSearchingTestCase {
}
}
} catch (Throwable t) {
System.out.println("TEST: reopen thread hit exc");
t.printStackTrace(System.out);
if (VERBOSE) {
System.out.println("TEST: reopen thread hit exc");
t.printStackTrace(System.out);
}
failed.set(true);
throw new RuntimeException(t);
}
@ -191,20 +197,28 @@ public class TestSearcherManager extends ThreadedIndexingAndSearchingTestCase {
writer.commit();
final CountDownLatch awaitEnterWarm = new CountDownLatch(1);
final CountDownLatch awaitClose = new CountDownLatch(1);
final AtomicBoolean triedReopen = new AtomicBoolean(false);
final ExecutorService es = random.nextBoolean() ? null : Executors.newCachedThreadPool(new NamedThreadFactory("testIntermediateClose"));
final SearcherWarmer warmer = new SearcherWarmer() {
final SearcherFactory factory = new SearcherFactory() {
@Override
public void warm(IndexSearcher s) throws IOException {
public IndexSearcher newSearcher(IndexReader r) throws IOException {
try {
awaitEnterWarm.countDown();
awaitClose.await();
if (triedReopen.get()) {
awaitEnterWarm.countDown();
awaitClose.await();
}
} catch (InterruptedException e) {
//
}
return new IndexSearcher(r, es);
}
};
final SearcherManager searcherManager = random.nextBoolean() ? new SearcherManager(dir,
warmer, es) : new SearcherManager(writer, random.nextBoolean(), warmer, es);
final SearcherManager searcherManager = random.nextBoolean()
? new SearcherManager(dir, factory)
: new SearcherManager(writer, random.nextBoolean(), factory);
if (VERBOSE) {
System.out.println("sm created");
}
IndexSearcher searcher = searcherManager.acquire();
try {
assertEquals(1, searcher.getIndexReader().numDocs());
@ -214,20 +228,24 @@ public class TestSearcherManager extends ThreadedIndexingAndSearchingTestCase {
writer.addDocument(new Document());
writer.commit();
final AtomicBoolean success = new AtomicBoolean(false);
final AtomicBoolean triedReopen = new AtomicBoolean(false);
final Throwable[] exc = new Throwable[1];
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
triedReopen.set(true);
if (VERBOSE) {
System.out.println("NOW call maybeReopen");
}
searcherManager.maybeReopen();
success.set(true);
} catch (AlreadyClosedException e) {
// expected
} catch (Throwable e) {
System.out.println("FAIL: unexpected exc");
e.printStackTrace(System.out);
if (VERBOSE) {
System.out.println("FAIL: unexpected exc");
e.printStackTrace(System.out);
}
exc[0] = e;
// use success as the barrier here to make sure we see the write
success.set(false);
@ -236,7 +254,13 @@ public class TestSearcherManager extends ThreadedIndexingAndSearchingTestCase {
}
});
thread.start();
if (VERBOSE) {
System.out.println("THREAD started");
}
awaitEnterWarm.await();
if (VERBOSE) {
System.out.println("NOW call close");
}
searcherManager.close();
awaitClose.countDown();
thread.join();