LUCENE-3445: add SearcherManager to simplify handling of multiple search threads and reopening IndexSearcher

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1175413 13f79535-47bb-0310-9956-ffa450edef68
Michael McCandless 2011-09-25 15:18:13 +00:00
parent 2028a88d4f
commit 4d44198e0d
9 changed files with 1281 additions and 1142 deletions

View File: CHANGES.txt

@@ -95,6 +95,11 @@ New Features
   implements Iterable<FilterClause>, and allows adding Filters without
   creating FilterClause.  (Uwe Schindler)

+* LUCENE-3445: Added SearcherManager, to manage sharing and reopening
+  IndexSearchers across multiple search threads.  IndexReader's
+  refCount is used to safely close the reader only once all threads are done
+  using it.  (Michael McCandless)
+
 Bug Fixes

 * LUCENE-3417: DictionaryCompoundWordFilter did not properly add tokens from the

View File: NRTManager.java

@@ -19,16 +19,16 @@ package org.apache.lucene.index;

 import java.io.Closeable;
 import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.List;

 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.index.IndexReader; // javadocs
-import org.apache.lucene.document.Document;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.Query;
+import org.apache.lucene.search.SearcherWarmer;
 import org.apache.lucene.util.ThreadInterruptedException;

 // TODO
@@ -46,7 +46,7 @@ import org.apache.lucene.util.ThreadInterruptedException;
  *  caller is waiting for a specific generation searcher. </p>
  *
  * @lucene.experimental
  */

 public class NRTManager implements Closeable {
   private final IndexWriter writer;
@@ -54,36 +54,36 @@ public class NRTManager implements Closeable {
   private final AtomicLong indexingGen;
   private final AtomicLong searchingGen;
   private final AtomicLong noDeletesSearchingGen;
+  private final SearcherWarmer warmer;
   private final List<WaitingListener> waitingListeners = new CopyOnWriteArrayList<WaitingListener>();

   private volatile IndexSearcher currentSearcher;
   private volatile IndexSearcher noDeletesCurrentSearcher;

   /**
-   * Create new NRTManager.  Note that this installs a
-   * merged segment warmer on the provided IndexWriter's
-   * config.
+   * Create new NRTManager.
    *
    * @param writer IndexWriter to open near-real-time
    *        readers
-   */
-  public NRTManager(IndexWriter writer) throws IOException {
-    this(writer, null);
-  }
-
-  /**
-   * Create new NRTManager.  Note that this installs a
-   * merged segment warmer on the provided IndexWriter's
-   * config.
-   *
-   * @param writer IndexWriter to open near-real-time
-   *        readers
-   * @param es ExecutorService to pass to the IndexSearcher
+   * @param es optional ExecutorService so different segments can
+   *        be searched concurrently (see {@link
+   *        IndexSearcher#IndexSearcher(IndexReader,ExecutorService)}).  Pass null
+   *        to search segments sequentially.
+   * @param warmer optional {@link SearcherWarmer}.  Pass
+   *        null if you don't require the searcher to be warmed
+   *        before going live.  If this is non-null then a
+   *        merged segment warmer is installed on the
+   *        provided IndexWriter's config.
+   *
+   * <p><b>NOTE</b>: the provided {@link SearcherWarmer} is
+   * not invoked for the initial searcher; you should
+   * warm it yourself if necessary.
    */
-  public NRTManager(IndexWriter writer, ExecutorService es) throws IOException {
+  public NRTManager(IndexWriter writer, ExecutorService es, SearcherWarmer warmer) throws IOException {
     this.writer = writer;
     this.es = es;
+    this.warmer = warmer;
     indexingGen = new AtomicLong(1);
     searchingGen = new AtomicLong(-1);
     noDeletesSearchingGen = new AtomicLong(-1);
@@ -91,13 +91,15 @@ public class NRTManager implements Closeable {

     // Create initial reader:
     swapSearcher(new IndexSearcher(IndexReader.open(writer, true), es), 0, true);

-    writer.getConfig().setMergedSegmentWarmer(
+    if (this.warmer != null) {
+      writer.getConfig().setMergedSegmentWarmer(
          new IndexWriter.IndexReaderWarmer() {
            @Override
            public void warm(IndexReader reader) throws IOException {
-             NRTManager.this.warm(reader);
+             NRTManager.this.warmer.warm(new IndexSearcher(reader, NRTManager.this.es));
            }
          });
+    }
   }

   /** NRTManager invokes this interface to notify it when a
@@ -120,25 +122,25 @@ public class NRTManager implements Closeable {
     waitingListeners.remove(l);
   }

-  public long updateDocument(Term t, Document d, Analyzer a) throws IOException {
+  public long updateDocument(Term t, Iterable<? extends IndexableField> d, Analyzer a) throws IOException {
     writer.updateDocument(t, d, a);
     // Return gen as of when indexing finished:
     return indexingGen.get();
   }

-  public long updateDocument(Term t, Document d) throws IOException {
+  public long updateDocument(Term t, Iterable<? extends IndexableField> d) throws IOException {
     writer.updateDocument(t, d);
     // Return gen as of when indexing finished:
     return indexingGen.get();
   }

-  public long updateDocuments(Term t, Iterable<Document> docs, Analyzer a) throws IOException {
+  public long updateDocuments(Term t, Iterable<? extends Iterable<? extends IndexableField>> docs, Analyzer a) throws IOException {
     writer.updateDocuments(t, docs, a);
     // Return gen as of when indexing finished:
     return indexingGen.get();
   }

-  public long updateDocuments(Term t, Iterable<Document> docs) throws IOException {
+  public long updateDocuments(Term t, Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException {
     writer.updateDocuments(t, docs);
     // Return gen as of when indexing finished:
     return indexingGen.get();
@@ -156,25 +158,25 @@ public class NRTManager implements Closeable {
     return indexingGen.get();
   }

-  public long addDocument(Document d, Analyzer a) throws IOException {
+  public long addDocument(Iterable<? extends IndexableField> d, Analyzer a) throws IOException {
     writer.addDocument(d, a);
     // Return gen as of when indexing finished:
     return indexingGen.get();
   }

-  public long addDocuments(Iterable<Document> docs, Analyzer a) throws IOException {
+  public long addDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs, Analyzer a) throws IOException {
     writer.addDocuments(docs, a);
     // Return gen as of when indexing finished:
     return indexingGen.get();
   }

-  public long addDocument(Document d) throws IOException {
+  public long addDocument(Iterable<? extends IndexableField> d) throws IOException {
     writer.addDocument(d);
     // Return gen as of when indexing finished:
     return indexingGen.get();
   }

-  public long addDocuments(Iterable<Document> docs) throws IOException {
+  public long addDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException {
     writer.addDocuments(docs);
     // Return gen as of when indexing finished:
     return indexingGen.get();
@@ -262,7 +264,10 @@ public class NRTManager implements Closeable {
   }

   /** Release the searcher obtained from {@link
-   *  #get()} or {@link #get(long)}. */
+   *  #get()} or {@link #get(long)}.
+   *
+   *  <p><b>NOTE</b>: it's safe to call this after {@link
+   *  #close}. */
   public void release(IndexSearcher s) throws IOException {
     s.getIndexReader().decRef();
   }
@@ -304,23 +309,19 @@ public class NRTManager implements Closeable {
     final IndexSearcher startSearcher = noDeletesSearchingGen.get() > searchingGen.get() ? noDeletesCurrentSearcher : currentSearcher;
     final IndexReader nextReader = startSearcher.getIndexReader().reopen(writer, applyDeletes);

-    warm(nextReader);
+    final IndexSearcher nextSearcher = new IndexSearcher(nextReader, es);
+    if (warmer != null) {
+      warmer.warm(nextSearcher);
+    }

     // Transfer reference to swapSearcher:
-    swapSearcher(new IndexSearcher(nextReader, es),
+    swapSearcher(nextSearcher,
                  newSearcherGen,
                  applyDeletes);

     return true;
   }

-  /** Override this to warm the newly opened reader before
-   *  it's swapped in.  Note that this is called both for
-   *  newly merged segments and for new top-level readers
-   *  opened by #reopen. */
-  protected void warm(IndexReader reader) throws IOException {
-  }
-
   // Steals a reference from newSearcher:
   private synchronized void swapSearcher(IndexSearcher newSearcher, long newSearchingGen, boolean applyDeletes) throws IOException {
     //System.out.println(Thread.currentThread().getName() + ": swap searcher gen=" + newSearchingGen + " applyDeletes=" + applyDeletes);
@@ -350,7 +351,12 @@ public class NRTManager implements Closeable {
     //System.out.println(Thread.currentThread().getName() + ": done");
   }

-  /** NOTE: caller must separately close the writer. */
+  /** Close this NRTManager to future searching.  Any
+   *  searches still in process in other threads won't be
+   *  affected, and they should still call {@link #release}
+   *  after they are done.
+   *
+   *  <p><b>NOTE</b>: caller must separately close the writer. */
   @Override
   public void close() throws IOException {
     swapSearcher(null, indexingGen.getAndIncrement(), true);
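For reference, here is a minimal sketch (not part of this commit) of the new three-argument constructor together with the generation-returning indexing methods. It assumes an already-configured IndexWriter `writer` and a document `doc`; the class name and the warming query are illustrative only.

import java.io.IOException;

import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.NRTManager;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.SearcherWarmer;
import org.apache.lucene.search.TermQuery;

public class NRTManagerSketch {
  // Sketch: index one document, then search a generation that includes it.
  public void indexThenSearch(IndexWriter writer, Iterable<? extends IndexableField> doc) throws IOException {
    NRTManager mgr = new NRTManager(writer, null,  // null ExecutorService: search segments sequentially
        new SearcherWarmer() {
          @Override
          public void warm(IndexSearcher s) throws IOException {
            // Illustrative warming query; use whatever is representative for your app:
            s.search(new TermQuery(new Term("body", "warm")), 10);
          }
        });

    long gen = mgr.addDocument(doc);       // generation as of when indexing finished
    IndexSearcher s = mgr.get(gen, true);  // waits until a searcher covering gen is live
    try {
      s.search(new TermQuery(new Term("docid", "1")), 10);
    } finally {
      mgr.release(s);                      // decRefs the reader; do not use s afterwards
    }
    mgr.close();                           // the writer must be closed separately
  }
}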

View File: SearcherManager.java

@@ -0,0 +1,201 @@
package org.apache.lucene.search;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.NRTManager; // javadocs
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
/** Utility class to safely share {@link IndexSearcher} instances
* across multiple threads, while periodically reopening.
* This class ensures each IndexSearcher instance is not
* closed until it is no longer needed.
*
* <p>Use {@link #get} to obtain the current searcher, and
* {@link #release} to release it, like this:
*
* <pre>
* IndexSearcher s = manager.get();
* try {
* // Do searching, doc retrieval, etc. with s
* } finally {
* manager.release(s);
* }
* // Do not use s after this!
* s = null;
* </pre>
*
* <p>In addition you should periodically call {@link
* #maybeReopen}. While it's possible to call this just
* before running each query, this is discouraged since it
* penalizes the unlucky queries that do the reopen. It's
* better to use a separate background thread, that
* periodically calls maybeReopen. Finally, be sure to
* call {@link #close} once you are done.
*
* <p><b>NOTE</b>: if you have an {@link IndexWriter}, it's
* better to use {@link NRTManager} since that class pulls
* near-real-time readers from the IndexWriter.
*
* @lucene.experimental
*/
public class SearcherManager implements Closeable {
// Current searcher
private volatile IndexSearcher currentSearcher;
private final SearcherWarmer warmer;
private final AtomicBoolean reopening = new AtomicBoolean();
private final ExecutorService es;
/** Opens an initial searcher from the Directory.
*
* @param dir Directory to open the searcher from
*
* @param warmer optional {@link SearcherWarmer}.  Pass
* null if you don't require the searcher to be warmed
* before going live.
*
* <p><b>NOTE</b>: the provided {@link SearcherWarmer} is
* not invoked for the initial searcher; you should
* warm it yourself if necessary.
*/
public SearcherManager(Directory dir, SearcherWarmer warmer) throws IOException {
this(dir, warmer, null);
}
/** Opens an initial searcher from the Directory.
*
* @param dir Directory to open the searcher from
*
* @param warmer optional {@link SearcherWarmer}.  Pass
* null if you don't require the searcher to be warmed
* before going live.
*
* @param es optional ExecutorService so different segments can
* be searched concurrently (see {@link
* IndexSearcher#IndexSearcher(IndexReader,ExecutorService)}).  Pass null
* to search segments sequentially.
*
* <p><b>NOTE</b>: the provided {@link SearcherWarmer} is
* not invoked for the initial searcher; you should
* warm it yourself if necessary.
*/
public SearcherManager(Directory dir, SearcherWarmer warmer, ExecutorService es) throws IOException {
this.es = es;
currentSearcher = new IndexSearcher(IndexReader.open(dir), this.es);
this.warmer = warmer;
}
/** You must call this, periodically, to perform a
* reopen. This calls {@link IndexReader#reopen} on the
* underlying reader, and if that returns a new reader,
* it's warmed (if you provided a {@link SearcherWarmer})
* and then swapped into production.
*
* <p><b>Threads</b>: it's fine for more than one thread to
* call this at once. Only the first thread will attempt
* the reopen; subsequent threads will see that another
* thread is already handling reopen and will return
* immediately. Note that this means if another thread
* is already reopening then subsequent threads will
* return right away without waiting for the reader
* reopen to complete.</p>
*
* <p>This method returns true if a new reader was in
* fact opened.</p>
*/
public boolean maybeReopen()
throws IOException {
if (currentSearcher == null) {
throw new AlreadyClosedException("this SearcherManager is closed");
}
// Ensure only 1 thread does reopen at once; other
// threads just return immediately:
if (!reopening.getAndSet(true)) {
try {
IndexReader newReader = currentSearcher.getIndexReader().reopen();
if (newReader != currentSearcher.getIndexReader()) {
IndexSearcher newSearcher = new IndexSearcher(newReader, es);
if (warmer != null) {
warmer.warm(newSearcher);
}
swapSearcher(newSearcher);
return true;
} else {
return false;
}
} finally {
reopening.set(false);
}
} else {
return false;
}
}
/** Obtain the current IndexSearcher. You must match
* every call to get with one call to {@link #release};
* it's best to do so in a finally clause. */
public IndexSearcher get() {
IndexSearcher toReturn = currentSearcher;
if (toReturn == null) {
throw new AlreadyClosedException("this SearcherManager is closed");
}
toReturn.getIndexReader().incRef();
return toReturn;
}
/** Release the searcher previously obtained with {@link
* #get}.
*
* <p><b>NOTE</b>: it's safe to call this after {@link
* #close}. */
public void release(IndexSearcher searcher)
throws IOException {
searcher.getIndexReader().decRef();
}
// Replaces old searcher with new one
private void swapSearcher(IndexSearcher newSearcher)
throws IOException {
IndexSearcher oldSearcher = currentSearcher;
if (oldSearcher == null) {
throw new AlreadyClosedException("this SearcherManager is closed");
}
currentSearcher = newSearcher;
release(oldSearcher);
}
/** Close this SearcherManager to future searching. Any
* searches still in process in other threads won't be
* affected, and they should still call {@link #release}
* after they are done. */
@Override
public void close() throws IOException {
swapSearcher(null);
}
}
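To make the recommended usage concrete, here is a minimal sketch (not part of this commit) of the background-reopen pattern the class javadoc above describes: one daemon thread calls maybeReopen() on a schedule while search threads use get()/release(). It assumes an existing Directory `dir`; the one-second reopen period and the stop flag are illustrative.

import java.util.concurrent.atomic.AtomicBoolean;

final SearcherManager mgr = new SearcherManager(dir, null);  // no warmer, for brevity
final AtomicBoolean stop = new AtomicBoolean();

Thread reopenThread = new Thread() {
  @Override
  public void run() {
    try {
      while (!stop.get()) {
        mgr.maybeReopen();   // only the first concurrent caller actually reopens
        Thread.sleep(1000);  // illustrative reopen period
      }
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
};
reopenThread.setDaemon(true);
reopenThread.start();

// Search threads, meanwhile, use the get/release pairing from the class javadoc:
IndexSearcher s = mgr.get();
try {
  // run queries against s
} finally {
  mgr.release(s);
}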

View File: SearcherWarmer.java

@@ -0,0 +1,34 @@
package org.apache.lucene.search;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import org.apache.lucene.index.NRTManager; // javadocs
/** Pass an implementation of this to {@link NRTManager} or
* {@link SearcherManager} to warm a new {@link
* IndexSearcher} before it's put into production.
*
* @lucene.experimental */
public interface SearcherWarmer {
// TODO: can we somehow merge this w/ IW's
// IndexReaderWarmer.... should IW switch to this?
public void warm(IndexSearcher s) throws IOException;
}
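For example, the tests in this commit install a warmer that runs a single representative query, so the new searcher's data structures are loaded before it starts serving traffic; the field and term are whatever is typical for the application:

SearcherWarmer warmer = new SearcherWarmer() {
  @Override
  public void warm(IndexSearcher s) throws IOException {
    // Representative query from the tests below; warming happens
    // before the new searcher is swapped into production:
    s.search(new TermQuery(new Term("body", "united")), 10);
  }
};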

View File: TestNRTManager.java

@@ -17,155 +17,160 @@ package org.apache.lucene.index;
  * limitations under the License.
  */

-import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;

-import org.apache.lucene.analysis.MockAnalyzer;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field;
-import org.apache.lucene.document.StringField;
-import org.apache.lucene.document.TextField;
-import org.apache.lucene.index.codecs.CodecProvider;
 import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.PhraseQuery;
-import org.apache.lucene.search.Query;
-import org.apache.lucene.search.ScoreDoc;
-import org.apache.lucene.search.Sort;
-import org.apache.lucene.search.SortField;
+import org.apache.lucene.search.SearcherWarmer;
 import org.apache.lucene.search.TermQuery;
-import org.apache.lucene.search.TopDocs;
 import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.MockDirectoryWrapper;
 import org.apache.lucene.store.NRTCachingDirectory;
-import org.apache.lucene.util.Bits;
-import org.apache.lucene.util.BytesRef;
-import org.apache.lucene.util.LineFileDocs;
-import org.apache.lucene.util.LuceneTestCase;
-import org.apache.lucene.util.NamedThreadFactory;
-import org.apache.lucene.util._TestUtil;
-import org.junit.Test;

-// TODO
-//   - mix in optimize, addIndexes
-//   - randomly mix in non-congruent docs
-
-// NOTE: This is a copy of TestNRTThreads, but swapping in
-// NRTManager for adding/updating/searching
-
-public class TestNRTManager extends LuceneTestCase {
-
-  private static class SubDocs {
-    public final String packID;
-    public final List<String> subIDs;
-    public boolean deleted;
-
-    public SubDocs(String packID, List<String> subIDs) {
-      this.packID = packID;
-      this.subIDs = subIDs;
-    }
-  }
-
-  // TODO: is there a pre-existing way to do this!!!
-  private Document cloneDoc(Document doc1) {
-    final Document doc2 = new Document();
-    for(IndexableField f : doc1) {
-      Field field1 = (Field) f;
-      Field field2 = new Field(field1.name(),
-                               ((Field) f).fieldType(),
-                               field1.stringValue());
-      doc2.add(field2);
-    }
-    return doc2;
-  }
-
-  @Test
+public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
+
+  private final ThreadLocal<Long> lastGens = new ThreadLocal<Long>();
+  private boolean warmCalled;
+
   public void testNRTManager() throws Exception {
+    runTest("TestNRTManager");
+  }

-    final long t0 = System.currentTimeMillis();
-
-    if (CodecProvider.getDefault().getDefaultFieldCodec().equals("SimpleText")) {
-      // no
-      CodecProvider.getDefault().setDefaultFieldCodec("Standard");
-    }
+  @Override
+  protected IndexSearcher getFinalSearcher() throws Exception {
+    if (VERBOSE) {
+      System.out.println("TEST: finalSearcher maxGen=" + maxGen);
+    }
+    return nrt.get(maxGen, true);
+  }

-    final LineFileDocs docs = new LineFileDocs(random);
-    final File tempDir = _TestUtil.getTempDir("nrtopenfiles");
-    final MockDirectoryWrapper _dir = newFSDirectory(tempDir);
-    _dir.setCheckIndexOnClose(false); // don't double-checkIndex, we do it ourselves
-    Directory dir = _dir;
-    final IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)).setOpenMode(IndexWriterConfig.OpenMode.CREATE);
-
-    if (LuceneTestCase.TEST_NIGHTLY) {
-      // newIWConfig makes smallish max seg size, which
-      // results in tons and tons of segments for this test
-      // when run nightly:
-      MergePolicy mp = conf.getMergePolicy();
-      if (mp instanceof TieredMergePolicy) {
-        ((TieredMergePolicy) mp).setMaxMergedSegmentMB(5000.);
-      } else if (mp instanceof LogByteSizeMergePolicy) {
-        ((LogByteSizeMergePolicy) mp).setMaxMergeMB(1000.);
-      } else if (mp instanceof LogMergePolicy) {
-        ((LogMergePolicy) mp).setMaxMergeDocs(100000);
-      }
-    }
-
-    conf.setMergedSegmentWarmer(new IndexWriter.IndexReaderWarmer() {
-      @Override
-      public void warm(IndexReader reader) throws IOException {
-        if (VERBOSE) {
-          System.out.println("TEST: now warm merged reader=" + reader);
-        }
-        final int maxDoc = reader.maxDoc();
-        final Bits liveDocs = reader.getLiveDocs();
-        int sum = 0;
-        final int inc = Math.max(1, maxDoc/50);
-        for(int docID=0;docID<maxDoc;docID += inc) {
-          if (liveDocs == null || liveDocs.get(docID)) {
-            final Document doc = reader.document(docID);
-            sum += doc.getFields().size();
-          }
-        }
-
-        IndexSearcher searcher = newSearcher(reader);
-        sum += searcher.search(new TermQuery(new Term("body", "united")), 10).totalHits;
-        searcher.close();
-
-        if (VERBOSE) {
-          System.out.println("TEST: warm visited " + sum + " fields");
-        }
-      }
-    });
-
+  @Override
+  protected Directory getDirectory(Directory in) {
+    // Randomly swap in NRTCachingDir
     if (random.nextBoolean()) {
       if (VERBOSE) {
         System.out.println("TEST: wrap NRTCachingDir");
       }
-      dir = new NRTCachingDirectory(dir, 5.0, 60.0);
+      return new NRTCachingDirectory(in, 5.0, 60.0);
+    } else {
+      return in;
     }
+  }

-    final IndexWriter writer = new IndexWriter(dir, conf);
-
-    if (VERBOSE) {
-      writer.setInfoStream(System.out);
-    }
-    _TestUtil.reduceOpenFiles(writer);
-    //System.out.println("TEST: conf=" + writer.getConfig());
-
-    final ExecutorService es = random.nextBoolean() ? null : Executors.newCachedThreadPool(new NamedThreadFactory("NRT search threads"));
-
+  @Override
+  protected void updateDocuments(Term id, List<? extends Iterable<? extends IndexableField>> docs) throws Exception {
+    final long gen = nrt.updateDocuments(id, docs);
+
+    // Randomly verify the update "took":
+    if (random.nextInt(20) == 2) {
+      if (VERBOSE) {
+        System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
+      }
+      final IndexSearcher s = nrt.get(gen, true);
+      if (VERBOSE) {
+        System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
+      }
+      try {
+        assertEquals(docs.size(), s.search(new TermQuery(id), 10).totalHits);
+      } finally {
+        nrt.release(s);
+      }
+    }
+    lastGens.set(gen);
+  }
+
+  @Override
+  protected void addDocuments(Term id, List<? extends Iterable<? extends IndexableField>> docs) throws Exception {
+    final long gen = nrt.addDocuments(docs);
+
+    // Randomly verify the add "took":
+    if (random.nextInt(20) == 2) {
+      if (VERBOSE) {
+        System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
+      }
+      final IndexSearcher s = nrt.get(gen, false);
+      if (VERBOSE) {
+        System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
+      }
+      try {
+        assertEquals(docs.size(), s.search(new TermQuery(id), 10).totalHits);
+      } finally {
+        nrt.release(s);
+      }
+    }
+    lastGens.set(gen);
+  }
+
+  @Override
+  protected void addDocument(Term id, Iterable<? extends IndexableField> doc) throws Exception {
+    final long gen = nrt.addDocument(doc);
+
+    // Randomly verify the add "took":
+    if (random.nextInt(20) == 2) {
+      if (VERBOSE) {
+        System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
+      }
+      final IndexSearcher s = nrt.get(gen, false);
+      if (VERBOSE) {
+        System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
+      }
+      try {
+        assertEquals(1, s.search(new TermQuery(id), 10).totalHits);
+      } finally {
+        nrt.release(s);
+      }
+    }
+    lastGens.set(gen);
+  }
+
+  @Override
+  protected void updateDocument(Term id, Iterable<? extends IndexableField> doc) throws Exception {
+    final long gen = nrt.updateDocument(id, doc);
+
+    // Randomly verify the update "took":
+    if (random.nextInt(20) == 2) {
+      if (VERBOSE) {
+        System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
+      }
+      final IndexSearcher s = nrt.get(gen, true);
+      if (VERBOSE) {
+        System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
+      }
+      try {
+        assertEquals(1, s.search(new TermQuery(id), 10).totalHits);
+      } finally {
+        nrt.release(s);
+      }
+    }
+    lastGens.set(gen);
+  }
+
+  @Override
+  protected void deleteDocuments(Term id) throws Exception {
+    final long gen = nrt.deleteDocuments(id);
+
+    // randomly verify the delete "took":
+    if (random.nextInt(20) == 7) {
+      if (VERBOSE) {
+        System.out.println(Thread.currentThread().getName() + ": nrt: verify del " + id);
+      }
+      final IndexSearcher s = nrt.get(gen, true);
+      if (VERBOSE) {
+        System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
+      }
+      try {
+        assertEquals(0, s.search(new TermQuery(id), 10).totalHits);
+      } finally {
+        nrt.release(s);
+      }
+    }
+    lastGens.set(gen);
+  }
+
+  private NRTManager nrt;
+  private NRTManagerReopenThread nrtThread;
+
+  @Override
+  protected void doAfterWriter(ExecutorService es) throws Exception {
     final double minReopenSec = 0.01 + 0.05 * random.nextDouble();
     final double maxReopenSec = minReopenSec * (1.0 + 10 * random.nextDouble());
@@ -173,506 +178,57 @@ public class TestNRTManager extends LuceneTestCase {
       System.out.println("TEST: make NRTManager maxReopenSec=" + maxReopenSec + " minReopenSec=" + minReopenSec);
     }

-    final NRTManager nrt = new NRTManager(writer, es);
-    final NRTManagerReopenThread nrtThread = new NRTManagerReopenThread(nrt, maxReopenSec, minReopenSec);
+    nrt = new NRTManager(writer, es,
+                         new SearcherWarmer() {
+                           @Override
+                           public void warm(IndexSearcher s) throws IOException {
+                             TestNRTManager.this.warmCalled = true;
+                             s.search(new TermQuery(new Term("body", "united")), 10);
+                           }
+                         });
+
+    nrtThread = new NRTManagerReopenThread(nrt, maxReopenSec, minReopenSec);
     nrtThread.setName("NRT Reopen Thread");
     nrtThread.setPriority(Math.min(Thread.currentThread().getPriority()+2, Thread.MAX_PRIORITY));
     nrtThread.setDaemon(true);
     nrtThread.start();
+  }
+
+  @Override
+  protected void doAfterIndexingThreadDone() {
+    Long gen = lastGens.get();
+    if (gen != null) {
+      addMaxGen(gen);
+    }
+  }
+
+  private long maxGen = -1;
+
+  private synchronized void addMaxGen(long gen) {
+    maxGen = Math.max(gen, maxGen);
+  }
+
+  @Override
+  protected void doSearching(ExecutorService es, long stopTime) throws Exception {
+    runSearchThreads(stopTime);
+  }
+
+  @Override
+  protected IndexSearcher getCurrentSearcher() throws Exception {
+    return nrt.get(random.nextBoolean());
+  }
+
+  @Override
+  protected void releaseSearcher(IndexSearcher s) throws Exception {
+    nrt.release(s);
+  }
+
+  @Override
+  protected void doClose() throws Exception {
+    assertTrue(warmCalled);
+    if (VERBOSE) {
+      System.out.println("TEST: now close NRTManager");
+    }
+    nrtThread.close();
+    nrt.close();
+  }

-    final int NUM_INDEX_THREADS = _TestUtil.nextInt(random, 1, 3);
-    final int NUM_SEARCH_THREADS = _TestUtil.nextInt(random, 1, 3);
-    //final int NUM_INDEX_THREADS = 1;
-    //final int NUM_SEARCH_THREADS = 1;
-    if (VERBOSE) {
-      System.out.println("TEST: " + NUM_INDEX_THREADS + " index threads; " + NUM_SEARCH_THREADS + " search threads");
-    }
-
-    final int RUN_TIME_SEC = LuceneTestCase.TEST_NIGHTLY ? 300 : RANDOM_MULTIPLIER;
-    final AtomicBoolean failed = new AtomicBoolean();
-    final AtomicInteger addCount = new AtomicInteger();
-    final AtomicInteger delCount = new AtomicInteger();
-    final AtomicInteger packCount = new AtomicInteger();
-    final List<Long> lastGens = new ArrayList<Long>();
-    final Set<String> delIDs = Collections.synchronizedSet(new HashSet<String>());
-    final List<SubDocs> allSubDocs = Collections.synchronizedList(new ArrayList<SubDocs>());
-
-    final long stopTime = System.currentTimeMillis() + RUN_TIME_SEC*1000;
-    Thread[] threads = new Thread[NUM_INDEX_THREADS];
-    for(int thread=0;thread<NUM_INDEX_THREADS;thread++) {
-      threads[thread] = new Thread() {
-          @Override
-          public void run() {
-            // TODO: would be better if this were cross thread, so that we make sure one thread deleting another's added docs works:
-            final List<String> toDeleteIDs = new ArrayList<String>();
-            final List<SubDocs> toDeleteSubDocs = new ArrayList<SubDocs>();
-
-            long gen = 0;
-            while(System.currentTimeMillis() < stopTime && !failed.get()) {
-              //System.out.println(Thread.currentThread().getName() + ": cycle");
-              try {
// Occasional longish pause if running
// nightly
if (LuceneTestCase.TEST_NIGHTLY && random.nextInt(6) == 3) {
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": now long sleep");
}
Thread.sleep(_TestUtil.nextInt(random, 50, 500));
}
// Rate limit ingest rate:
Thread.sleep(_TestUtil.nextInt(random, 1, 10));
if (VERBOSE) {
System.out.println(Thread.currentThread() + ": done sleep");
}
Document doc = docs.nextDoc();
if (doc == null) {
break;
}
final String addedField;
if (random.nextBoolean()) {
addedField = "extra" + random.nextInt(10);
doc.add(new TextField(addedField, "a random field"));
} else {
addedField = null;
}
if (random.nextBoolean()) {
if (random.nextBoolean()) {
// Add a pack of adjacent sub-docs
final String packID;
final SubDocs delSubDocs;
if (toDeleteSubDocs.size() > 0 && random.nextBoolean()) {
delSubDocs = toDeleteSubDocs.get(random.nextInt(toDeleteSubDocs.size()));
assert !delSubDocs.deleted;
toDeleteSubDocs.remove(delSubDocs);
// reuse prior packID
packID = delSubDocs.packID;
} else {
delSubDocs = null;
// make new packID
packID = packCount.getAndIncrement() + "";
}
final Field packIDField = newField("packID", packID, StringField.TYPE_STORED);
final List<String> docIDs = new ArrayList<String>();
final SubDocs subDocs = new SubDocs(packID, docIDs);
final List<Document> docsList = new ArrayList<Document>();
allSubDocs.add(subDocs);
doc.add(packIDField);
docsList.add(cloneDoc(doc));
docIDs.add(doc.get("docid"));
final int maxDocCount = _TestUtil.nextInt(random, 1, 10);
while(docsList.size() < maxDocCount) {
doc = docs.nextDoc();
if (doc == null) {
break;
}
docsList.add(cloneDoc(doc));
docIDs.add(doc.get("docid"));
}
addCount.addAndGet(docsList.size());
if (delSubDocs != null) {
delSubDocs.deleted = true;
delIDs.addAll(delSubDocs.subIDs);
delCount.addAndGet(delSubDocs.subIDs.size());
if (VERBOSE) {
System.out.println("TEST: update pack packID=" + delSubDocs.packID + " count=" + docsList.size() + " docs=" + docIDs);
}
gen = nrt.updateDocuments(new Term("packID", delSubDocs.packID), docsList);
/*
// non-atomic:
nrt.deleteDocuments(new Term("packID", delSubDocs.packID));
for(Document subDoc : docsList) {
nrt.addDocument(subDoc);
}
*/
} else {
if (VERBOSE) {
System.out.println("TEST: add pack packID=" + packID + " count=" + docsList.size() + " docs=" + docIDs);
}
gen = nrt.addDocuments(docsList);
/*
// non-atomic:
for(Document subDoc : docsList) {
nrt.addDocument(subDoc);
}
*/
}
doc.removeField("packID");
if (random.nextInt(5) == 2) {
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": buffer del id:" + packID);
}
toDeleteSubDocs.add(subDocs);
}
// randomly verify the add/update "took":
if (random.nextInt(20) == 2) {
final boolean applyDeletes = delSubDocs != null;
final IndexSearcher s = nrt.get(gen, applyDeletes);
try {
assertEquals(docsList.size(), s.search(new TermQuery(new Term("packID", packID)), 10).totalHits);
} finally {
nrt.release(s);
}
}
} else {
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": add doc docid:" + doc.get("docid"));
}
gen = nrt.addDocument(doc);
addCount.getAndIncrement();
// randomly verify the add "took":
if (random.nextInt(20) == 2) {
//System.out.println(Thread.currentThread().getName() + ": verify");
final IndexSearcher s = nrt.get(gen, false);
//System.out.println(Thread.currentThread().getName() + ": got s=" + s);
try {
assertEquals(1, s.search(new TermQuery(new Term("docid", doc.get("docid"))), 10).totalHits);
} finally {
nrt.release(s);
}
//System.out.println(Thread.currentThread().getName() + ": done verify");
}
if (random.nextInt(5) == 3) {
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid"));
}
toDeleteIDs.add(doc.get("docid"));
}
}
} else {
// we use update but it never replaces a
// prior doc
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": update doc id:" + doc.get("docid"));
}
gen = nrt.updateDocument(new Term("docid", doc.get("docid")), doc);
addCount.getAndIncrement();
// randomly verify the add "took":
if (random.nextInt(20) == 2) {
final IndexSearcher s = nrt.get(gen, true);
try {
assertEquals(1, s.search(new TermQuery(new Term("docid", doc.get("docid"))), 10).totalHits);
} finally {
nrt.release(s);
}
}
if (random.nextInt(5) == 3) {
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid"));
}
toDeleteIDs.add(doc.get("docid"));
}
}
if (random.nextInt(30) == 17) {
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": apply " + toDeleteIDs.size() + " deletes");
}
for(String id : toDeleteIDs) {
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": del term=id:" + id);
}
gen = nrt.deleteDocuments(new Term("docid", id));
// randomly verify the delete "took":
if (random.nextInt(20) == 7) {
final IndexSearcher s = nrt.get(gen, true);
try {
assertEquals(0, s.search(new TermQuery(new Term("docid", id)), 10).totalHits);
} finally {
nrt.release(s);
}
}
}
final int count = delCount.addAndGet(toDeleteIDs.size());
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": tot " + count + " deletes");
}
delIDs.addAll(toDeleteIDs);
toDeleteIDs.clear();
for(SubDocs subDocs : toDeleteSubDocs) {
assertTrue(!subDocs.deleted);
gen = nrt.deleteDocuments(new Term("packID", subDocs.packID));
subDocs.deleted = true;
if (VERBOSE) {
System.out.println(" del subs: " + subDocs.subIDs + " packID=" + subDocs.packID);
}
delIDs.addAll(subDocs.subIDs);
delCount.addAndGet(subDocs.subIDs.size());
// randomly verify the delete "took":
if (random.nextInt(20) == 7) {
final IndexSearcher s = nrt.get(gen, true);
try {
assertEquals(0, s.search(new TermQuery(new Term("packID", subDocs.packID)), 1).totalHits);
} finally {
nrt.release(s);
}
}
}
toDeleteSubDocs.clear();
}
if (addedField != null) {
doc.removeField(addedField);
}
} catch (Throwable t) {
System.out.println(Thread.currentThread().getName() + ": FAILED: hit exc");
t.printStackTrace();
failed.set(true);
throw new RuntimeException(t);
}
}
lastGens.add(gen);
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": indexing done");
}
}
};
threads[thread].setDaemon(true);
threads[thread].start();
}
if (VERBOSE) {
System.out.println("TEST: DONE start indexing threads [" + (System.currentTimeMillis()-t0) + " ms]");
}
// let index build up a bit
Thread.sleep(100);
// silly starting guess:
final AtomicInteger totTermCount = new AtomicInteger(100);
// run search threads
final Thread[] searchThreads = new Thread[NUM_SEARCH_THREADS];
final AtomicInteger totHits = new AtomicInteger();
if (VERBOSE) {
System.out.println("TEST: start search threads");
}
for(int thread=0;thread<NUM_SEARCH_THREADS;thread++) {
searchThreads[thread] = new Thread() {
@Override
public void run() {
while(System.currentTimeMillis() < stopTime && !failed.get()) {
final IndexSearcher s = nrt.get(random.nextBoolean());
try {
try {
smokeTestSearcher(s);
if (s.getIndexReader().numDocs() > 0) {
Fields fields = MultiFields.getFields(s.getIndexReader());
if (fields == null) {
continue;
}
Terms terms = fields.terms("body");
if (terms == null) {
continue;
}
TermsEnum termsEnum = terms.iterator();
int seenTermCount = 0;
int shift;
int trigger;
if (totTermCount.get() < 10) {
shift = 0;
trigger = 1;
} else {
trigger = totTermCount.get()/10;
shift = random.nextInt(trigger);
}
while(System.currentTimeMillis() < stopTime) {
BytesRef term = termsEnum.next();
if (term == null) {
if (seenTermCount == 0) {
break;
}
totTermCount.set(seenTermCount);
seenTermCount = 0;
if (totTermCount.get() < 10) {
shift = 0;
trigger = 1;
} else {
trigger = totTermCount.get()/10;
//System.out.println("trigger " + trigger);
shift = random.nextInt(trigger);
}
termsEnum.seekCeil(new BytesRef(""));
continue;
}
seenTermCount++;
// search 10 terms
if (trigger == 0) {
trigger = 1;
}
if ((seenTermCount + shift) % trigger == 0) {
//if (VERBOSE) {
//System.out.println(Thread.currentThread().getName() + " now search body:" + term.utf8ToString());
//}
totHits.addAndGet(runQuery(s, new TermQuery(new Term("body", term))));
}
}
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": search done");
}
}
} finally {
nrt.release(s);
}
} catch (Throwable t) {
System.out.println(Thread.currentThread().getName() + ": FAILED: hit exc");
failed.set(true);
t.printStackTrace(System.out);
throw new RuntimeException(t);
}
}
}
};
searchThreads[thread].setDaemon(true);
searchThreads[thread].start();
}
if (VERBOSE) {
System.out.println("TEST: now join");
}
for(int thread=0;thread<NUM_INDEX_THREADS;thread++) {
threads[thread].join();
}
for(int thread=0;thread<NUM_SEARCH_THREADS;thread++) {
searchThreads[thread].join();
}
if (VERBOSE) {
System.out.println("TEST: done join [" + (System.currentTimeMillis()-t0) + " ms]; addCount=" + addCount + " delCount=" + delCount);
System.out.println("TEST: search totHits=" + totHits);
}
long maxGen = 0;
for(long gen : lastGens) {
maxGen = Math.max(maxGen, gen);
}
final IndexSearcher s = nrt.get(maxGen, true);
boolean doFail = false;
for(String id : delIDs) {
final TopDocs hits = s.search(new TermQuery(new Term("docid", id)), 1);
if (hits.totalHits != 0) {
System.out.println("doc id=" + id + " is supposed to be deleted, but got docID=" + hits.scoreDocs[0].doc);
doFail = true;
}
}
// Make sure each group of sub-docs are still in docID order:
for(SubDocs subDocs : allSubDocs) {
if (!subDocs.deleted) {
// We sort by relevance but the scores should be identical so sort falls back to by docID:
TopDocs hits = s.search(new TermQuery(new Term("packID", subDocs.packID)), 20);
assertEquals(subDocs.subIDs.size(), hits.totalHits);
int lastDocID = -1;
int startDocID = -1;
for(ScoreDoc scoreDoc : hits.scoreDocs) {
final int docID = scoreDoc.doc;
if (lastDocID != -1) {
assertEquals(1+lastDocID, docID);
} else {
startDocID = docID;
}
lastDocID = docID;
final Document doc = s.doc(docID);
assertEquals(subDocs.packID, doc.get("packID"));
}
lastDocID = startDocID - 1;
for(String subID : subDocs.subIDs) {
hits = s.search(new TermQuery(new Term("docid", subID)), 1);
assertEquals(1, hits.totalHits);
final int docID = hits.scoreDocs[0].doc;
if (lastDocID != -1) {
assertEquals(1+lastDocID, docID);
}
lastDocID = docID;
}
} else {
for(String subID : subDocs.subIDs) {
assertEquals(0, s.search(new TermQuery(new Term("docid", subID)), 1).totalHits);
}
}
}
final int endID = Integer.parseInt(docs.nextDoc().get("docid"));
for(int id=0;id<endID;id++) {
String stringID = ""+id;
if (!delIDs.contains(stringID)) {
final TopDocs hits = s.search(new TermQuery(new Term("docid", stringID)), 1);
if (hits.totalHits != 1) {
System.out.println("doc id=" + stringID + " is not supposed to be deleted, but got hitCount=" + hits.totalHits);
doFail = true;
}
}
}
assertFalse(doFail);
assertEquals("index=" + writer.segString() + " addCount=" + addCount + " delCount=" + delCount, addCount.get() - delCount.get(), s.getIndexReader().numDocs());
-    nrt.release(s);
-
-    if (es != null) {
-      es.shutdown();
-      es.awaitTermination(1, TimeUnit.SECONDS);
-    }
-
-    writer.commit();
-
-    assertEquals("index=" + writer.segString() + " addCount=" + addCount + " delCount=" + delCount, addCount.get() - delCount.get(), writer.numDocs());
-
-    if (VERBOSE) {
-      System.out.println("TEST: now close NRTManager");
-    }
-    nrtThread.close();
-    nrt.close();
-    assertFalse(writer.anyNonBulkMerges);
-    writer.close(false);
-    _TestUtil.checkIndex(dir);
-    dir.close();
-    _TestUtil.rmDir(tempDir);
-    docs.close();
-
-    if (VERBOSE) {
-      System.out.println("TEST: done [" + (System.currentTimeMillis()-t0) + " ms]");
-    }
-  }
-
-  private int runQuery(IndexSearcher s, Query q) throws Exception {
-    s.search(q, 10);
-    return s.search(q, null, 10, new Sort(new SortField("title", SortField.Type.STRING))).totalHits;
-  }
-
-  private void smokeTestSearcher(IndexSearcher s) throws Exception {
-    runQuery(s, new TermQuery(new Term("body", "united")));
-    runQuery(s, new TermQuery(new Term("titleTokenized", "states")));
-    PhraseQuery pq = new PhraseQuery();
-    pq.add(new Term("body", "united"));
-    pq.add(new Term("body", "states"));
-    runQuery(s, pq);
-  }
 }

View File: TestSearcherManager.java

@@ -0,0 +1,113 @@
package org.apache.lucene.search;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.ThreadedIndexingAndSearchingTestCase;
import org.apache.lucene.util._TestUtil;
public class TestSearcherManager extends ThreadedIndexingAndSearchingTestCase {
boolean warmCalled;
public void testSearcherManager() throws Exception {
runTest("TestSearcherManager");
}
@Override
protected IndexSearcher getFinalSearcher() throws Exception {
writer.commit();
mgr.maybeReopen();
return mgr.get();
}
private SearcherManager mgr;
@Override
protected void doAfterWriter(ExecutorService es) throws Exception {
// SearcherManager needs to see empty commit:
writer.commit();
mgr = new SearcherManager(dir,
new SearcherWarmer() {
@Override
public void warm(IndexSearcher s) throws IOException {
TestSearcherManager.this.warmCalled = true;
s.search(new TermQuery(new Term("body", "united")), 10);
}
}, es);
}
@Override
protected void doSearching(ExecutorService es, final long stopTime) throws Exception {
Thread reopenThread = new Thread() {
@Override
public void run() {
try {
while(System.currentTimeMillis() < stopTime) {
Thread.sleep(_TestUtil.nextInt(random, 1, 100));
writer.commit();
Thread.sleep(_TestUtil.nextInt(random, 1, 5));
mgr.maybeReopen();
}
} catch (Throwable t) {
System.out.println("TEST: reopen thread hit exc");
t.printStackTrace(System.out);
failed.set(true);
throw new RuntimeException(t);
}
}
};
reopenThread.setDaemon(true);
reopenThread.start();
runSearchThreads(stopTime);
reopenThread.join();
}
@Override
protected IndexSearcher getCurrentSearcher() throws Exception {
if (random.nextInt(10) == 7) {
// NOTE: not best practice to call maybeReopen
// synchronous to your search threads, but still we
// test as apps will presumably do this for
// simplicity:
mgr.maybeReopen();
}
return mgr.get();
}
@Override
protected void releaseSearcher(IndexSearcher s) throws Exception {
mgr.release(s);
}
@Override
protected void doClose() throws Exception {
assertTrue(warmCalled);
if (VERBOSE) {
System.out.println("TEST: now close SearcherManager");
}
mgr.close();
}
}

View File: ThreadedIndexingAndSearchingTestCase.java

@@ -0,0 +1,652 @@
package org.apache.lucene.index;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.codecs.CodecProvider;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.PhraseQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.LineFileDocs;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.NamedThreadFactory;
import org.apache.lucene.util._TestUtil;
// TODO
// - mix in optimize, addIndexes
// - randomly mix in non-congruent docs
/** Utility class that spawns multiple indexing and
* searching threads. */
public abstract class ThreadedIndexingAndSearchingTestCase extends LuceneTestCase {
protected final AtomicBoolean failed = new AtomicBoolean();
protected final AtomicInteger addCount = new AtomicInteger();
protected final AtomicInteger delCount = new AtomicInteger();
protected final AtomicInteger packCount = new AtomicInteger();
protected Directory dir;
protected IndexWriter writer;
private static class SubDocs {
public final String packID;
public final List<String> subIDs;
public boolean deleted;
public SubDocs(String packID, List<String> subIDs) {
this.packID = packID;
this.subIDs = subIDs;
}
}
// Called per-search
protected abstract IndexSearcher getCurrentSearcher() throws Exception;
protected abstract IndexSearcher getFinalSearcher() throws Exception;
protected void releaseSearcher(IndexSearcher s) throws Exception {
}
// Called once to run searching
protected abstract void doSearching(ExecutorService es, long stopTime) throws Exception;
protected Directory getDirectory(Directory in) {
return in;
}
protected void updateDocuments(Term id, List<? extends Iterable<? extends IndexableField>> docs) throws Exception {
writer.updateDocuments(id, docs);
}
protected void addDocuments(Term id, List<? extends Iterable<? extends IndexableField>> docs) throws Exception {
writer.addDocuments(docs);
}
protected void addDocument(Term id, Iterable<? extends IndexableField> doc) throws Exception {
writer.addDocument(doc);
}
protected void updateDocument(Term term, Iterable<? extends IndexableField> doc) throws Exception {
writer.updateDocument(term, doc);
}
protected void deleteDocuments(Term term) throws Exception {
writer.deleteDocuments(term);
}
protected void doAfterIndexingThreadDone() {
}
private Thread[] launchIndexingThreads(final LineFileDocs docs,
int numThreads,
final long stopTime,
final Set<String> delIDs,
final Set<String> delPackIDs,
final List<SubDocs> allSubDocs)
throws Exception {
final Thread[] threads = new Thread[numThreads];
for(int thread=0;thread<numThreads;thread++) {
threads[thread] = new Thread() {
@Override
public void run() {
// TODO: would be better if this were cross thread, so that we make sure one thread deleting another's added docs works:
final List<String> toDeleteIDs = new ArrayList<String>();
final List<SubDocs> toDeleteSubDocs = new ArrayList<SubDocs>();
while(System.currentTimeMillis() < stopTime && !failed.get()) {
try {
// Occasional longish pause if running
// nightly
if (LuceneTestCase.TEST_NIGHTLY && random.nextInt(6) == 3) {
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": now long sleep");
}
Thread.sleep(_TestUtil.nextInt(random, 50, 500));
}
// Rate limit ingest rate:
if (random.nextInt(7) == 5) {
Thread.sleep(_TestUtil.nextInt(random, 1, 10));
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": done sleep");
}
}
Document doc = docs.nextDoc();
if (doc == null) {
break;
}
// Maybe add randomly named field
final String addedField;
if (random.nextBoolean()) {
addedField = "extra" + random.nextInt(40);
doc.add(newField(addedField, "a random field", TextField.TYPE_STORED));
} else {
addedField = null;
}
if (random.nextBoolean()) {
if (random.nextBoolean()) {
// Add/update doc block:
final String packID;
final SubDocs delSubDocs;
if (toDeleteSubDocs.size() > 0 && random.nextBoolean()) {
delSubDocs = toDeleteSubDocs.get(random.nextInt(toDeleteSubDocs.size()));
assert !delSubDocs.deleted;
toDeleteSubDocs.remove(delSubDocs);
// Update doc block, replacing prior packID
packID = delSubDocs.packID;
} else {
delSubDocs = null;
// Add doc block, using new packID
packID = packCount.getAndIncrement() + "";
}
final Field packIDField = newField("packID", packID, StringField.TYPE_STORED);
final List<String> docIDs = new ArrayList<String>();
final SubDocs subDocs = new SubDocs(packID, docIDs);
final List<Document> docsList = new ArrayList<Document>();
allSubDocs.add(subDocs);
doc.add(packIDField);
docsList.add(_TestUtil.cloneDocument(doc));
docIDs.add(doc.get("docid"));
final int maxDocCount = _TestUtil.nextInt(random, 1, 10);
while(docsList.size() < maxDocCount) {
doc = docs.nextDoc();
if (doc == null) {
break;
}
docsList.add(_TestUtil.cloneDocument(doc));
docIDs.add(doc.get("docid"));
}
addCount.addAndGet(docsList.size());
final Term packIDTerm = new Term("packID", packID);
if (delSubDocs != null) {
delSubDocs.deleted = true;
delIDs.addAll(delSubDocs.subIDs);
delCount.addAndGet(delSubDocs.subIDs.size());
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": update pack packID=" + delSubDocs.packID + " count=" + docsList.size() + " docs=" + docIDs);
}
updateDocuments(packIDTerm, docsList);
} else {
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": add pack packID=" + packID + " count=" + docsList.size() + " docs=" + docIDs);
}
addDocuments(packIDTerm, docsList);
}
doc.removeField("packID");
if (random.nextInt(5) == 2) {
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": buffer del id:" + packID);
}
toDeleteSubDocs.add(subDocs);
}
} else {
// Add single doc
final String docid = doc.get("docid");
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": add doc docid:" + docid);
}
addDocument(new Term("docid", docid), doc);
addCount.getAndIncrement();
if (random.nextInt(5) == 3) {
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid"));
}
toDeleteIDs.add(docid);
}
}
} else {
// Update single doc, but we never re-use
// an ID so the delete will never
// actually happen:
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": update doc id:" + doc.get("docid"));
}
final String docid = doc.get("docid");
updateDocument(new Term("docid", docid), doc);
addCount.getAndIncrement();
if (random.nextInt(5) == 3) {
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid"));
}
toDeleteIDs.add(docid);
}
}
if (random.nextInt(30) == 17) {
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": apply " + toDeleteIDs.size() + " deletes");
}
for(String id : toDeleteIDs) {
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": del term=id:" + id);
}
deleteDocuments(new Term("docid", id));
}
final int count = delCount.addAndGet(toDeleteIDs.size());
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": tot " + count + " deletes");
}
delIDs.addAll(toDeleteIDs);
toDeleteIDs.clear();
for(SubDocs subDocs : toDeleteSubDocs) {
assert !subDocs.deleted;
delPackIDs.add(subDocs.packID);
deleteDocuments(new Term("packID", subDocs.packID));
subDocs.deleted = true;
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": del subs: " + subDocs.subIDs + " packID=" + subDocs.packID);
}
delIDs.addAll(subDocs.subIDs);
delCount.addAndGet(subDocs.subIDs.size());
}
toDeleteSubDocs.clear();
}
if (addedField != null) {
doc.removeField(addedField);
}
} catch (Throwable t) {
System.out.println(Thread.currentThread().getName() + ": hit exc");
t.printStackTrace();
failed.set(true);
throw new RuntimeException(t);
}
}
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": indexing done");
}
doAfterIndexingThreadDone();
}
};
threads[thread].setDaemon(true);
threads[thread].start();
}
return threads;
}
protected void runSearchThreads(final long stopTimeMS) throws Exception {
final int numThreads = _TestUtil.nextInt(random, 1, 5);
final Thread[] searchThreads = new Thread[numThreads];
final AtomicInteger totHits = new AtomicInteger();
// silly starting guess:
final AtomicInteger totTermCount = new AtomicInteger(100);
// TODO: we should enrich this to do more interesting searches
for(int thread=0;thread<searchThreads.length;thread++) {
searchThreads[thread] = new Thread() {
@Override
public void run() {
while (System.currentTimeMillis() < stopTimeMS) {
try {
final IndexSearcher s = getCurrentSearcher();
try {
if (s.getIndexReader().numDocs() > 0) {
smokeTestSearcher(s);
Fields fields = MultiFields.getFields(s.getIndexReader());
if (fields == null) {
continue;
}
Terms terms = fields.terms("body");
if (terms == null) {
continue;
}
TermsEnum termsEnum = terms.iterator();
int seenTermCount = 0;
int shift;
int trigger;
if (totTermCount.get() < 10) {
shift = 0;
trigger = 1;
} else {
trigger = totTermCount.get()/10;
shift = random.nextInt(trigger);
}
BytesRef term = termsEnum.next();
if (term == null) {
if (seenTermCount == 0) {
break;
}
totTermCount.set(seenTermCount);
seenTermCount = 0;
if (totTermCount.get() < 10) {
shift = 0;
trigger = 1;
} else {
trigger = totTermCount.get()/10;
//System.out.println("trigger " + trigger);
shift = random.nextInt(trigger);
}
termsEnum.seekCeil(new BytesRef(""));
continue;
}
seenTermCount++;
// search roughly 10 terms per pass through the terms enum:
if (trigger == 0) {
trigger = 1;
}
if ((seenTermCount + shift) % trigger == 0) {
//if (VERBOSE) {
//System.out.println(Thread.currentThread().getName() + " now search body:" + term.utf8ToString());
//}
totHits.addAndGet(runQuery(s, new TermQuery(new Term("body", term))));
}
//if (VERBOSE) {
//System.out.println(Thread.currentThread().getName() + ": search done");
//}
}
} finally {
releaseSearcher(s);
}
} catch (Throwable t) {
System.out.println(Thread.currentThread().getName() + ": hit exc");
failed.set(true);
t.printStackTrace(System.out);
throw new RuntimeException(t);
}
}
}
};
searchThreads[thread].setDaemon(true);
searchThreads[thread].start();
}
for(int thread=0;thread<searchThreads.length;thread++) {
searchThreads[thread].join();
}
if (VERBOSE) {
System.out.println("TEST: DONE search: totHits=" + totHits);
}
}
protected void doAfterWriter(ExecutorService es) throws Exception {
}
protected void doClose() throws Exception {
}
public void runTest(String testName) throws Exception {
failed.set(false);
addCount.set(0);
delCount.set(0);
packCount.set(0);
final long t0 = System.currentTimeMillis();
final String defaultCodec = CodecProvider.getDefault().getDefaultFieldCodec();
if (defaultCodec.equals("SimpleText") || defaultCodec.equals("Memory")) {
// these codecs are too slow for this stress test:
CodecProvider.getDefault().setDefaultFieldCodec("Standard");
}
final LineFileDocs docs = new LineFileDocs(random);
final File tempDir = _TestUtil.getTempDir(testName);
dir = newFSDirectory(tempDir);
((MockDirectoryWrapper) dir).setCheckIndexOnClose(false); // don't double-checkIndex, we do it ourselves.
final IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random));
if (LuceneTestCase.TEST_NIGHTLY) {
// newIWConfig makes smallish max seg size, which
// results in tons and tons of segments for this test
// when run nightly:
MergePolicy mp = conf.getMergePolicy();
if (mp instanceof TieredMergePolicy) {
((TieredMergePolicy) mp).setMaxMergedSegmentMB(5000.);
} else if (mp instanceof LogByteSizeMergePolicy) {
((LogByteSizeMergePolicy) mp).setMaxMergeMB(1000.);
} else if (mp instanceof LogMergePolicy) {
((LogMergePolicy) mp).setMaxMergeDocs(100000);
}
}
conf.setMergedSegmentWarmer(new IndexWriter.IndexReaderWarmer() {
@Override
public void warm(IndexReader reader) throws IOException {
if (VERBOSE) {
System.out.println("TEST: now warm merged reader=" + reader);
}
final int maxDoc = reader.maxDoc();
final Bits liveDocs = reader.getLiveDocs();
int sum = 0;
final int inc = Math.max(1, maxDoc/50);
for(int docID=0;docID<maxDoc;docID += inc) {
if (liveDocs == null || liveDocs.get(docID)) {
final Document doc = reader.document(docID);
sum += doc.getFields().size();
}
}
IndexSearcher searcher = newSearcher(reader);
sum += searcher.search(new TermQuery(new Term("body", "united")), 10).totalHits;
searcher.close();
if (VERBOSE) {
System.out.println("TEST: warm visited " + sum + " fields");
}
}
});
writer = new IndexWriter(dir, conf);
if (VERBOSE) {
writer.setInfoStream(System.out);
}
_TestUtil.reduceOpenFiles(writer);
final ExecutorService es = random.nextBoolean() ? null : Executors.newCachedThreadPool(new NamedThreadFactory(testName));
doAfterWriter(es);
final int NUM_INDEX_THREADS = _TestUtil.nextInt(random, 2, 4);
final int RUN_TIME_SEC = LuceneTestCase.TEST_NIGHTLY ? 300 : RANDOM_MULTIPLIER;
final Set<String> delIDs = Collections.synchronizedSet(new HashSet<String>());
final Set<String> delPackIDs = Collections.synchronizedSet(new HashSet<String>());
final List<SubDocs> allSubDocs = Collections.synchronizedList(new ArrayList<SubDocs>());
final long stopTime = System.currentTimeMillis() + RUN_TIME_SEC*1000;
final Thread[] indexThreads = launchIndexingThreads(docs, NUM_INDEX_THREADS, stopTime, delIDs, delPackIDs, allSubDocs);
if (VERBOSE) {
System.out.println("TEST: DONE start indexing threads [" + (System.currentTimeMillis()-t0) + " ms]");
}
// Let index build up a bit
Thread.sleep(100);
doSearching(es, stopTime);
if (VERBOSE) {
System.out.println("TEST: all searching done [" + (System.currentTimeMillis()-t0) + " ms]");
}
for(int thread=0;thread<indexThreads.length;thread++) {
indexThreads[thread].join();
}
if (VERBOSE) {
System.out.println("TEST: done join indexing threads [" + (System.currentTimeMillis()-t0) + " ms]; addCount=" + addCount + " delCount=" + delCount);
}
final IndexSearcher s = getFinalSearcher();
if (VERBOSE) {
System.out.println("TEST: finalSearcher=" + s);
}
boolean doFail = false;
// Verify: make sure delIDs are in fact deleted:
for(String id : delIDs) {
final TopDocs hits = s.search(new TermQuery(new Term("docid", id)), 1);
if (hits.totalHits != 0) {
System.out.println("doc id=" + id + " is supposed to be deleted, but got " + hits.totalHits + " hits; first docID=" + hits.scoreDocs[0].doc);
doFail = true;
}
}
// Verify: make sure delPackIDs are in fact deleted:
for(String id : delPackIDs) {
final TopDocs hits = s.search(new TermQuery(new Term("packID", id)), 1);
if (hits.totalHits != 0) {
System.out.println("packID=" + id + " is supposed to be deleted, but got " + hits.totalHits + " matches");
doFail = true;
}
}
// Verify: make sure each group of sub-docs are still in docID order:
for(SubDocs subDocs : allSubDocs) {
TopDocs hits = s.search(new TermQuery(new Term("packID", subDocs.packID)), 20);
if (!subDocs.deleted) {
// We sort by relevance, but the scores should be identical, so the sort falls back to docID order:
if (hits.totalHits != subDocs.subIDs.size()) {
System.out.println("packID=" + subDocs.packID + ": expected " + subDocs.subIDs.size() + " hits but got " + hits.totalHits);
doFail = true;
} else {
int lastDocID = -1;
int startDocID = -1;
for(ScoreDoc scoreDoc : hits.scoreDocs) {
final int docID = scoreDoc.doc;
if (lastDocID != -1) {
assertEquals(1+lastDocID, docID);
} else {
startDocID = docID;
}
lastDocID = docID;
final Document doc = s.doc(docID);
assertEquals(subDocs.packID, doc.get("packID"));
}
lastDocID = startDocID - 1;
for(String subID : subDocs.subIDs) {
hits = s.search(new TermQuery(new Term("docid", subID)), 1);
assertEquals(1, hits.totalHits);
final int docID = hits.scoreDocs[0].doc;
if (lastDocID != -1) {
assertEquals(1+lastDocID, docID);
}
lastDocID = docID;
}
}
} else {
// Pack was deleted -- make sure its docs are
// deleted. We can't verify packID is deleted
// because we can re-use packID for update:
for(String subID : subDocs.subIDs) {
assertEquals(0, s.search(new TermQuery(new Term("docid", subID)), 1).totalHits);
}
}
}
// Verify: make sure all not-deleted docs are in fact
// not deleted:
final int endID = Integer.parseInt(docs.nextDoc().get("docid"));
docs.close();
for(int id=0;id<endID;id++) {
String stringID = ""+id;
if (!delIDs.contains(stringID)) {
final TopDocs hits = s.search(new TermQuery(new Term("docid", stringID)), 1);
if (hits.totalHits != 1) {
System.out.println("doc id=" + stringID + " is not supposed to be deleted, but got hitCount=" + hits.totalHits);
doFail = true;
}
}
}
assertFalse(doFail);
assertEquals("index=" + writer.segString() + " addCount=" + addCount + " delCount=" + delCount, addCount.get() - delCount.get(), s.getIndexReader().numDocs());
releaseSearcher(s);
if (es != null) {
es.shutdown();
es.awaitTermination(1, TimeUnit.SECONDS);
}
writer.commit();
assertEquals("index=" + writer.segString() + " addCount=" + addCount + " delCount=" + delCount, addCount.get() - delCount.get(), writer.numDocs());
assertFalse(writer.anyNonBulkMerges);
doClose();
writer.close(false);
_TestUtil.checkIndex(dir);
dir.close();
_TestUtil.rmDir(tempDir);
if (VERBOSE) {
System.out.println("TEST: done [" + (System.currentTimeMillis()-t0) + " ms]");
}
}
private int runQuery(IndexSearcher s, Query q) throws Exception {
s.search(q, 10);
return s.search(q, null, 10, new Sort(new SortField("title", SortField.Type.STRING))).totalHits;
}
protected void smokeTestSearcher(IndexSearcher s) throws Exception {
runQuery(s, new TermQuery(new Term("body", "united")));
runQuery(s, new TermQuery(new Term("titleTokenized", "states")));
PhraseQuery pq = new PhraseQuery();
pq.add(new Term("body", "united"));
pq.add(new Term("body", "states"));
runQuery(s, pq);
}
}
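ThreadedIndexingAndSearchingTestCase leaves the searcher lifecycle to subclasses via the getCurrentSearcher()/releaseSearcher()/getFinalSearcher() hooks it calls above. For contrast with the fixed-searcher subclass further below, here is a minimal sketch, not part of this commit, of how a subclass could back those hooks with the new SearcherManager; the acquire()/release() pairing matches the ref-counting described for this change, but the constructor arguments and the maybeReopen() name are assumptions, not verified signatures.

// Sketch only: an alternative subclass backing the hooks with SearcherManager
// (assumed to live in org.apache.lucene.search, per this commit). Declared
// abstract so the remaining hooks (e.g. doSearching) need not be shown here.
public abstract class SearcherManagerBackedCase extends ThreadedIndexingAndSearchingTestCase {

  private SearcherManager mgr;

  @Override
  protected void doAfterWriter(ExecutorService es) throws Exception {
    // Assumed constructor shape; the real signature may differ:
    mgr = new SearcherManager(writer, true, null, es);
  }

  @Override
  protected IndexSearcher getCurrentSearcher() throws Exception {
    // acquire() increments the underlying reader's refCount, so this
    // searcher stays usable even if another thread swaps in a fresher one:
    return mgr.acquire();
  }

  @Override
  protected void releaseSearcher(IndexSearcher s) throws Exception {
    // release() decrements the refCount; the reader closes only after
    // every thread has released it:
    mgr.release(s);
  }

  @Override
  protected IndexSearcher getFinalSearcher() throws Exception {
    mgr.maybeReopen(); // assumed refresh call for the latest NRT reader
    return mgr.acquire();
  }
}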


@@ -38,6 +38,7 @@ import org.apache.lucene.document.BinaryField;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.FieldType;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexWriterConfig.OpenMode;
import org.apache.lucene.search.DocIdSetIterator;
@@ -1870,4 +1871,29 @@ public class TestIndexWriter extends LuceneTestCase {
w.close();
d.close();
}
public void testNRTReaderVersion() throws Exception {
Directory d = new MockDirectoryWrapper(random, new RAMDirectory());
IndexWriter w = new IndexWriter(d, new IndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)));
Document doc = new Document();
doc.add(newField("id", "0", StringField.TYPE_STORED));
w.addDocument(doc);
IndexReader r = w.getReader();
long version = r.getVersion();
r.close();
w.addDocument(doc);
r = w.getReader();
long version2 = r.getVersion();
r.close();
assertTrue(version2 > version);
w.deleteDocuments(new Term("id", "0"));
r = w.getReader();
w.close();
long version3 = r.getVersion();
r.close();
assertTrue(version3 > version2);
d.close();
}
}
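The new testNRTReaderVersion test pins down that IndexReader.getVersion() advances whenever the writer has changed the index. As a minimal sketch of how a caller could lean on that property to avoid swapping NRT readers needlessly; maybeSwap is a hypothetical helper for illustration, not an API from this commit:

// Hypothetical helper: return a fresh NRT reader only if the index changed,
// otherwise keep the reader we already have.
static IndexReader maybeSwap(IndexWriter writer, IndexReader current) throws IOException {
  final IndexReader newReader = IndexReader.open(writer, true);
  if (newReader.getVersion() == current.getVersion()) {
    newReader.close(); // nothing changed; drop the new reader
    return current;
  } else {
    current.close(); // assumes the caller holds the only reference
    return newReader;
  }
}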


@@ -17,329 +17,26 @@ package org.apache.lucene.index;
 * limitations under the License.
 */

[The old self-contained implementation removed by this commit — its extra imports, the private SubDocs class, and the roughly 300-line inline testNRTThreads body (indexing threads, search threads, end-of-run verification) — is a near-verbatim duplicate of the logic now factored into ThreadedIndexingAndSearchingTestCase above; only the new file content follows.]

import java.util.Set;
import java.util.concurrent.ExecutorService;

import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.MockDirectoryWrapper;

// TODO
//   - mix in optimize, addIndexes
//   - randomly mix in non-congruent docs

public class TestNRTThreads extends ThreadedIndexingAndSearchingTestCase {

  @Override
  protected void doSearching(ExecutorService es, long stopTime) throws Exception {

    boolean anyOpenDelFiles = false;

    IndexReader r = IndexReader.open(writer, true);

    while (System.currentTimeMillis() < stopTime && !failed.get()) {
      if (random.nextBoolean()) {
        if (VERBOSE) {
          System.out.println("TEST: now reopen r=" + r);
@@ -355,11 +52,11 @@ public class TestNRTThreads extends LuceneTestCase {
        }
        r.close();
        writer.commit();
        final Set<String> openDeletedFiles = ((MockDirectoryWrapper) dir).getOpenDeletedFiles();
        if (openDeletedFiles.size() > 0) {
          System.out.println("OBD files: " + openDeletedFiles);
        }
        anyOpenDelFiles |= openDeletedFiles.size() > 0;
        //assertEquals("open but deleted: " + openDeletedFiles, 0, openDeletedFiles.size());
        if (VERBOSE) {
          System.out.println("TEST: now open");
@@ -372,203 +69,52 @@ public class TestNRTThreads extends LuceneTestCase {
      //System.out.println("numDocs=" + r.numDocs() + " openDelFileCount=" + dir.openDeleteFileCount());

      if (r.numDocs() > 0) {
        fixedSearcher = new IndexSearcher(r, es);
        smokeTestSearcher(fixedSearcher);
        runSearchThreads(System.currentTimeMillis() + 500);
      }
    }
    r.close();

    //System.out.println("numDocs=" + r.numDocs() + " openDelFileCount=" + dir.openDeleteFileCount());
    final Set<String> openDeletedFiles = ((MockDirectoryWrapper) dir).getOpenDeletedFiles();
    if (openDeletedFiles.size() > 0) {
      System.out.println("OBD files: " + openDeletedFiles);
    }
    anyOpenDelFiles |= openDeletedFiles.size() > 0;

    assertFalse("saw non-zero open-but-deleted count", anyOpenDelFiles);
  }

  private IndexSearcher fixedSearcher;

  protected IndexSearcher getCurrentSearcher() throws Exception {
    return fixedSearcher;
  }

  @Override
  protected void releaseSearcher(IndexSearcher s) throws Exception {
    if (s != fixedSearcher) {
      // Final searcher:
      s.getIndexReader().close();
      s.close();
    }
  }

  @Override
  protected IndexSearcher getFinalSearcher() throws Exception {
    final IndexReader r2;
    if (random.nextBoolean()) {
      r2 = writer.getReader();
    } else {
      writer.commit();
      r2 = IndexReader.open(dir);
    }
    return newSearcher(r2);
  }

  public void testNRTThreads() throws Exception {
    runTest("TestNRTThreads");
  }
}
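The hook pairing that ThreadedIndexingAndSearchingTestCase enforces — exactly one releaseSearcher() for every getCurrentSearcher(), even when a search throws — is visible in runSearchThreads above. Reduced to its core, each search thread does the following; the names here are this test's own hooks, not a library API:

// The acquire/release discipline each search thread follows:
final IndexSearcher s = getCurrentSearcher(); // pin a point-in-time view
try {
  // Run queries; the underlying reader cannot be closed out from under us:
  totHits.addAndGet(runQuery(s, new TermQuery(new Term("body", "united"))));
} finally {
  releaseSearcher(s); // always release, so the reader's refCount can drop to zero
}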