From 4d44198e0de2d536aa5d095e6a2988bae4cb53d4 Mon Sep 17 00:00:00 2001 From: Michael McCandless Date: Sun, 25 Sep 2011 15:18:13 +0000 Subject: [PATCH] LUCENE-3445: add SearcherManager to simplify handling of multiple search threads and reopening IndexSearcher git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1175413 13f79535-47bb-0310-9956-ffa450edef68 --- lucene/contrib/CHANGES.txt | 5 + .../org/apache/lucene/index/NRTManager.java | 92 ++- .../apache/lucene/search/SearcherManager.java | 201 +++++ .../apache/lucene/search/SearcherWarmer.java | 34 + .../apache/lucene/index/TestNRTManager.java | 770 ++++-------------- .../lucene/search/TestSearcherManager.java | 113 +++ .../ThreadedIndexingAndSearchingTestCase.java | 652 +++++++++++++++ .../apache/lucene/index/TestIndexWriter.java | 26 + .../apache/lucene/index/TestNRTThreads.java | 530 +----------- 9 files changed, 1281 insertions(+), 1142 deletions(-) create mode 100644 lucene/contrib/misc/src/java/org/apache/lucene/search/SearcherManager.java create mode 100644 lucene/contrib/misc/src/java/org/apache/lucene/search/SearcherWarmer.java create mode 100644 lucene/contrib/misc/src/test/org/apache/lucene/search/TestSearcherManager.java create mode 100644 lucene/src/test-framework/org/apache/lucene/index/ThreadedIndexingAndSearchingTestCase.java diff --git a/lucene/contrib/CHANGES.txt b/lucene/contrib/CHANGES.txt index b168f76de31..6a2617df9f1 100644 --- a/lucene/contrib/CHANGES.txt +++ b/lucene/contrib/CHANGES.txt @@ -95,6 +95,11 @@ New Features implements Iterable, and allows adding Filters without creating FilterClause. (Uwe Schindler) + * LUCENE-3445: Added SearcherManager, to manage sharing and reopening + IndexSearchers across multiple search threads. IndexReader's + refCount is used to safely close the reader only once all threads are done + using it. (Michael McCandless) + Bug Fixes * LUCENE-3417: DictionaryCompoundWordFilter did not properly add tokens from the diff --git a/lucene/contrib/misc/src/java/org/apache/lucene/index/NRTManager.java b/lucene/contrib/misc/src/java/org/apache/lucene/index/NRTManager.java index 9fb8d0e24e5..7385fa66848 100644 --- a/lucene/contrib/misc/src/java/org/apache/lucene/index/NRTManager.java +++ b/lucene/contrib/misc/src/java/org/apache/lucene/index/NRTManager.java @@ -19,16 +19,16 @@ package org.apache.lucene.index; import java.io.Closeable; import java.io.IOException; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.List; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.index.IndexReader; // javadocs -import org.apache.lucene.document.Document; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; +import org.apache.lucene.search.SearcherWarmer; import org.apache.lucene.util.ThreadInterruptedException; // TODO @@ -46,7 +46,7 @@ import org.apache.lucene.util.ThreadInterruptedException; * caller is waiting for a specific generation searcher.
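+ *
+ * 
A minimal usage sketch (here manager is an + * NRTManager instance and doc a document you have built; + * the id field is an arbitrary placeholder): + * + *
+ *    long gen = manager.updateDocument(new Term("id", "42"), doc);
+ *    // Later, obtain a searcher guaranteed to reflect that update:
+ *    IndexSearcher s = manager.get(gen, true);
+ *    try {
+ *      // run searches against s
+ *    } finally {
+ *      manager.release(s);
+ *    }
+ *  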

 * * @lucene.experimental -*/ + */ public class NRTManager implements Closeable { private final IndexWriter writer; @@ -54,36 +54,36 @@ public class NRTManager implements Closeable { private final AtomicLong indexingGen; private final AtomicLong searchingGen; private final AtomicLong noDeletesSearchingGen; + private final SearcherWarmer warmer; private final List waitingListeners = new CopyOnWriteArrayList(); private volatile IndexSearcher currentSearcher; private volatile IndexSearcher noDeletesCurrentSearcher; /** - * Create new NRTManager. Note that this installs a - * merged segment warmer on the provided IndexWriter's - * config. + * Create new NRTManager. * * @param writer IndexWriter to open near-real-time * readers - */ - public NRTManager(IndexWriter writer) throws IOException { - this(writer, null); - } - - /** - * Create new NRTManager. Note that this installs a - * merged segment warmer on the provided IndexWriter's - * config. - * - * @param writer IndexWriter to open near-real-time - * readers - * @param es ExecutorService to pass to the IndexSearcher - */ - public NRTManager(IndexWriter writer, ExecutorService es) throws IOException { + * @param es optional ExecutorService so different segments can + * be searched concurrently (see {@link + * IndexSearcher#IndexSearcher(IndexReader,ExecutorService)}). Pass null + * to search segments sequentially. + * @param warmer optional {@link SearcherWarmer}. Pass + * null if you don't require the searcher to be warmed + * before going live. If this is non-null then a + * merged segment warmer is installed on the + * provided IndexWriter's config. + * + *
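 + * For example, a minimal warmer sketch (the warm-up + * query is an arbitrary placeholder): + * + *
+ *    NRTManager manager = new NRTManager(writer, es,
+ *      new SearcherWarmer() {
+ *        public void warm(IndexSearcher s) throws IOException {
+ *          s.search(new TermQuery(new Term("body", "warmup")), 10);
+ *        }
+ *      });
+ *  
+ * + *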

NOTE: the provided {@link SearcherWarmer} is + * not invoked for the initial searcher; you should + * warm it yourself if necessary. + */ + public NRTManager(IndexWriter writer, ExecutorService es, SearcherWarmer warmer) throws IOException { this.writer = writer; this.es = es; + this.warmer = warmer; indexingGen = new AtomicLong(1); searchingGen = new AtomicLong(-1); noDeletesSearchingGen = new AtomicLong(-1); @@ -91,13 +91,15 @@ public class NRTManager implements Closeable { // Create initial reader: swapSearcher(new IndexSearcher(IndexReader.open(writer, true), es), 0, true); - writer.getConfig().setMergedSegmentWarmer( + if (this.warmer != null) { + writer.getConfig().setMergedSegmentWarmer( new IndexWriter.IndexReaderWarmer() { @Override public void warm(IndexReader reader) throws IOException { - NRTManager.this.warm(reader); + NRTManager.this.warmer.warm(new IndexSearcher(reader, NRTManager.this.es)); } }); + } } /** NRTManager invokes this interface to notify it when a @@ -120,25 +122,25 @@ public class NRTManager implements Closeable { waitingListeners.remove(l); } - public long updateDocument(Term t, Document d, Analyzer a) throws IOException { + public long updateDocument(Term t, Iterable d, Analyzer a) throws IOException { writer.updateDocument(t, d, a); // Return gen as of when indexing finished: return indexingGen.get(); } - public long updateDocument(Term t, Document d) throws IOException { + public long updateDocument(Term t, Iterable d) throws IOException { writer.updateDocument(t, d); // Return gen as of when indexing finished: return indexingGen.get(); } - public long updateDocuments(Term t, Iterable docs, Analyzer a) throws IOException { + public long updateDocuments(Term t, Iterable> docs, Analyzer a) throws IOException { writer.updateDocuments(t, docs, a); // Return gen as of when indexing finished: return indexingGen.get(); } - public long updateDocuments(Term t, Iterable docs) throws IOException { + public long updateDocuments(Term t, Iterable> docs) throws IOException { writer.updateDocuments(t, docs); // Return gen as of when indexing finished: return indexingGen.get(); @@ -156,25 +158,25 @@ public class NRTManager implements Closeable { return indexingGen.get(); } - public long addDocument(Document d, Analyzer a) throws IOException { + public long addDocument(Iterable d, Analyzer a) throws IOException { writer.addDocument(d, a); // Return gen as of when indexing finished: return indexingGen.get(); } - public long addDocuments(Iterable docs, Analyzer a) throws IOException { + public long addDocuments(Iterable> docs, Analyzer a) throws IOException { writer.addDocuments(docs, a); // Return gen as of when indexing finished: return indexingGen.get(); } - public long addDocument(Document d) throws IOException { + public long addDocument(Iterable d) throws IOException { writer.addDocument(d); // Return gen as of when indexing finished: return indexingGen.get(); } - public long addDocuments(Iterable docs) throws IOException { + public long addDocuments(Iterable> docs) throws IOException { writer.addDocuments(docs); // Return gen as of when indexing finished: return indexingGen.get(); @@ -262,7 +264,10 @@ public class NRTManager implements Closeable { } /** Release the searcher obtained from {@link - * #get()} or {@link #get(long)}. */ + * #get()} or {@link #get(long)}. + * + *

 NOTE: it's safe to call this after {@link + * #close}. */ public void release(IndexSearcher s) throws IOException { s.getIndexReader().decRef(); } @@ -304,23 +309,19 @@ public class NRTManager implements Closeable { final IndexSearcher startSearcher = noDeletesSearchingGen.get() > searchingGen.get() ? noDeletesCurrentSearcher : currentSearcher; final IndexReader nextReader = startSearcher.getIndexReader().reopen(writer, applyDeletes); - warm(nextReader); + final IndexSearcher nextSearcher = new IndexSearcher(nextReader, es); + if (warmer != null) { + warmer.warm(nextSearcher); + } // Transfer reference to swapSearcher: - swapSearcher(new IndexSearcher(nextReader, es), + swapSearcher(nextSearcher, newSearcherGen, applyDeletes); return true; } - /** Override this to warm the newly opened reader before - * it's swapped in. Note that this is called both for - * newly merged segments and for new top-level readers - * opened by #reopen. */ - protected void warm(IndexReader reader) throws IOException { - } - // Steals a reference from newSearcher: private synchronized void swapSearcher(IndexSearcher newSearcher, long newSearchingGen, boolean applyDeletes) throws IOException { //System.out.println(Thread.currentThread().getName() + ": swap searcher gen=" + newSearchingGen + " applyDeletes=" + applyDeletes); @@ -350,7 +351,12 @@ public class NRTManager implements Closeable { //System.out.println(Thread.currentThread().getName() + ": done"); } - /** NOTE: caller must separately close the writer. */ + /** Close this NRTManager to future searching. Any + * searches still in progress in other threads won't be + * affected, and they should still call {@link #release} + * after they are done. + * + *

NOTE: caller must separately close the writer. */ @Override public void close() throws IOException { swapSearcher(null, indexingGen.getAndIncrement(), true); diff --git a/lucene/contrib/misc/src/java/org/apache/lucene/search/SearcherManager.java b/lucene/contrib/misc/src/java/org/apache/lucene/search/SearcherManager.java new file mode 100644 index 00000000000..9fff8c9ddb1 --- /dev/null +++ b/lucene/contrib/misc/src/java/org/apache/lucene/search/SearcherManager.java @@ -0,0 +1,201 @@ +package org.apache.lucene.search; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.NRTManager; // javadocs +import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.store.Directory; + +/** Utility class to safely share {@link IndexSearcher} instances + * across multiple threads, while periodically reopening. + * This class ensures each IndexSearcher instance is not + * closed until it is no longer needed. + * + *

Use {@link #get} to obtain the current searcher, and + * {@link #release} to release it, like this: + * + *

+ *    IndexSearcher s = manager.get();
+ *    try {
+ *      // Do searching, doc retrieval, etc. with s
+ *    } finally {
+ *      manager.release(s);
+ *    }
+ *    // Do not use s after this!
+ *    s = null;
+ *  
+ * + *

 In addition you should periodically call {@link + * #maybeReopen}. While it's possible to call this just + * before running each query, this is discouraged since it + * penalizes the unlucky queries that do the reopen. It's + * better to use a separate background thread that + * periodically calls maybeReopen. Finally, be sure to + * call {@link #close} once you are done. + * + *
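 + * A minimal sketch of such a background thread (the one + * second period is an arbitrary choice; manager is a + * SearcherManager instance): + * + *
+ *    Thread reopenThread = new Thread() {
+ *      public void run() {
+ *        try {
+ *          while (true) {
+ *            Thread.sleep(1000);
+ *            manager.maybeReopen();
+ *          }
+ *        } catch (Exception e) {
+ *          // exit on interrupt during shutdown, or on IOException from reopen
+ *        }
+ *      }
+ *    };
+ *    reopenThread.setDaemon(true);
+ *    reopenThread.start();
+ *  
+ * + *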

 NOTE: if you have an {@link IndexWriter}, it's + * better to use {@link NRTManager} since that class pulls + * near-real-time readers from the IndexWriter. + * + * @lucene.experimental + */ + +public class SearcherManager implements Closeable { + + // Current searcher + private volatile IndexSearcher currentSearcher; + private final SearcherWarmer warmer; + private final AtomicBoolean reopening = new AtomicBoolean(); + private final ExecutorService es; + + /** Opens an initial searcher from the Directory. + * + * @param dir Directory to open the searcher from + * + * @param warmer optional {@link SearcherWarmer}. Pass + * null if you don't require the searcher to be warmed + * before going live. + * + *

 NOTE: the provided {@link SearcherWarmer} is + * not invoked for the initial searcher; you should + * warm it yourself if necessary. + */ + public SearcherManager(Directory dir, SearcherWarmer warmer) throws IOException { + this(dir, warmer, null); + } + + /** Opens an initial searcher from the Directory. + * + * @param dir Directory to open the searcher from + * + * @param warmer optional {@link SearcherWarmer}. Pass + * null if you don't require the searcher to be warmed + * before going live. + * + * @param es optional ExecutorService so different segments can + * be searched concurrently (see {@link + * IndexSearcher#IndexSearcher(IndexReader,ExecutorService)}). Pass null + * to search segments sequentially. + * + *

 NOTE: the provided {@link SearcherWarmer} is + * not invoked for the initial searcher; you should + * warm it yourself if necessary. + */ + public SearcherManager(Directory dir, SearcherWarmer warmer, ExecutorService es) throws IOException { + this.es = es; + currentSearcher = new IndexSearcher(IndexReader.open(dir), this.es); + this.warmer = warmer; + } + + /** You must call this periodically to perform a + * reopen. This calls {@link IndexReader#reopen} on the + * underlying reader, and if that returns a new reader, + * it's warmed (if you provided a {@link SearcherWarmer}) + * and then swapped into production. + * + *

 Threads: it's fine for more than one thread to + * call this at once. Only the first thread will attempt + * the reopen; subsequent threads will see that another + * thread is already handling the reopen and will return + * immediately, without waiting for it to complete.

+ * + *

This method returns true if a new reader was in + * fact opened.

+ */ + public boolean maybeReopen() + throws IOException { + + if (currentSearcher == null) { + throw new AlreadyClosedException("this SearcherManager is closed"); + } + + // Ensure only 1 thread does reopen at once; other + // threads just return immediately: + if (!reopening.getAndSet(true)) { + try { + IndexReader newReader = currentSearcher.getIndexReader().reopen(); + if (newReader != currentSearcher.getIndexReader()) { + IndexSearcher newSearcher = new IndexSearcher(newReader, es); + if (warmer != null) { + warmer.warm(newSearcher); + } + swapSearcher(newSearcher); + return true; + } else { + return false; + } + } finally { + reopening.set(false); + } + } else { + return false; + } + } + + /** Obtain the current IndexSearcher. You must match + * every call to get with one call to {@link #release}; + * it's best to do so in a finally clause. */ + public IndexSearcher get() { + IndexSearcher toReturn = currentSearcher; + if (toReturn == null) { + throw new AlreadyClosedException("this SearcherManager is closed"); + } + toReturn.getIndexReader().incRef(); + return toReturn; + } + + /** Release the searcher previously obtained with {@link + * #get}. + * + *

 NOTE: it's safe to call this after {@link + * #close}. */ + public void release(IndexSearcher searcher) + throws IOException { + searcher.getIndexReader().decRef(); + } + + // Replaces old searcher with new one + private void swapSearcher(IndexSearcher newSearcher) + throws IOException { + IndexSearcher oldSearcher = currentSearcher; + if (oldSearcher == null) { + throw new AlreadyClosedException("this SearcherManager is closed"); + } + currentSearcher = newSearcher; + release(oldSearcher); + } + + /** Close this SearcherManager to future searching. Any + * searches still in progress in other threads won't be + * affected, and they should still call {@link #release} + * after they are done. */ + @Override + public void close() throws IOException { + swapSearcher(null); + } +} diff --git a/lucene/contrib/misc/src/java/org/apache/lucene/search/SearcherWarmer.java b/lucene/contrib/misc/src/java/org/apache/lucene/search/SearcherWarmer.java new file mode 100644 index 00000000000..52f0ec427c7 --- /dev/null +++ b/lucene/contrib/misc/src/java/org/apache/lucene/search/SearcherWarmer.java @@ -0,0 +1,34 @@ +package org.apache.lucene.search; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; + +import org.apache.lucene.index.NRTManager; // javadocs + +/** Pass an implementation of this to {@link NRTManager} or + * {@link SearcherManager} to warm a new {@link + * IndexSearcher} before it's put into production. + * + * @lucene.experimental */ + +public interface SearcherWarmer { + // TODO: can we somehow merge this w/ IW's + // IndexReaderWarmer.... should IW switch to this? + public void warm(IndexSearcher s) throws IOException; +} diff --git a/lucene/contrib/misc/src/test/org/apache/lucene/index/TestNRTManager.java b/lucene/contrib/misc/src/test/org/apache/lucene/index/TestNRTManager.java index 5a7c73ff0a7..76c00eaee08 100644 --- a/lucene/contrib/misc/src/test/org/apache/lucene/index/TestNRTManager.java +++ b/lucene/contrib/misc/src/test/org/apache/lucene/index/TestNRTManager.java @@ -17,155 +17,160 @@ package org.apache.lucene.index; * limitations under the License.
*/ -import java.io.File; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; import java.util.List; -import java.util.Set; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.lucene.analysis.MockAnalyzer; -import org.apache.lucene.document.Document; -import org.apache.lucene.document.Field; -import org.apache.lucene.document.StringField; -import org.apache.lucene.document.TextField; -import org.apache.lucene.index.codecs.CodecProvider; import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.PhraseQuery; -import org.apache.lucene.search.Query; -import org.apache.lucene.search.ScoreDoc; -import org.apache.lucene.search.Sort; -import org.apache.lucene.search.SortField; +import org.apache.lucene.search.SearcherWarmer; import org.apache.lucene.search.TermQuery; -import org.apache.lucene.search.TopDocs; import org.apache.lucene.store.Directory; -import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.store.NRTCachingDirectory; -import org.apache.lucene.util.Bits; -import org.apache.lucene.util.BytesRef; -import org.apache.lucene.util.LineFileDocs; -import org.apache.lucene.util.LuceneTestCase; -import org.apache.lucene.util.NamedThreadFactory; -import org.apache.lucene.util._TestUtil; -import org.junit.Test; -// TODO -// - mix in optimize, addIndexes -// - randomoly mix in non-congruent docs +public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase { -// NOTE: This is a copy of TestNRTThreads, but swapping in -// NRTManager for adding/updating/searching + private final ThreadLocal lastGens = new ThreadLocal(); + private boolean warmCalled; -public class TestNRTManager extends LuceneTestCase { - - private static class SubDocs { - public final String packID; - public final List subIDs; - public boolean deleted; - - public SubDocs(String packID, List subIDs) { - this.packID = packID; - this.subIDs = subIDs; - } - } - - // TODO: is there a pre-existing way to do this!!! 
- private Document cloneDoc(Document doc1) { - final Document doc2 = new Document(); - for(IndexableField f : doc1) { - Field field1 = (Field) f; - - Field field2 = new Field(field1.name(), - ((Field) f).fieldType(), - field1.stringValue()); - doc2.add(field2); - } - - return doc2; - } - - @Test public void testNRTManager() throws Exception { + runTest("TestNRTManager"); + } - final long t0 = System.currentTimeMillis(); - - if (CodecProvider.getDefault().getDefaultFieldCodec().equals("SimpleText")) { - // no - CodecProvider.getDefault().setDefaultFieldCodec("Standard"); + @Override + protected IndexSearcher getFinalSearcher() throws Exception { + if (VERBOSE) { + System.out.println("TEST: finalSearcher maxGen=" + maxGen); } + return nrt.get(maxGen, true); + } - final LineFileDocs docs = new LineFileDocs(random); - final File tempDir = _TestUtil.getTempDir("nrtopenfiles"); - final MockDirectoryWrapper _dir = newFSDirectory(tempDir); - _dir.setCheckIndexOnClose(false); // don't double-checkIndex, we do it ourselves - Directory dir = _dir; - final IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)).setOpenMode(IndexWriterConfig.OpenMode.CREATE); - - if (LuceneTestCase.TEST_NIGHTLY) { - // newIWConfig makes smallish max seg size, which - // results in tons and tons of segments for this test - // when run nightly: - MergePolicy mp = conf.getMergePolicy(); - if (mp instanceof TieredMergePolicy) { - ((TieredMergePolicy) mp).setMaxMergedSegmentMB(5000.); - } else if (mp instanceof LogByteSizeMergePolicy) { - ((LogByteSizeMergePolicy) mp).setMaxMergeMB(1000.); - } else if (mp instanceof LogMergePolicy) { - ((LogMergePolicy) mp).setMaxMergeDocs(100000); - } - } - - conf.setMergedSegmentWarmer(new IndexWriter.IndexReaderWarmer() { - @Override - public void warm(IndexReader reader) throws IOException { - if (VERBOSE) { - System.out.println("TEST: now warm merged reader=" + reader); - } - final int maxDoc = reader.maxDoc(); - final Bits liveDocs = reader.getLiveDocs(); - int sum = 0; - final int inc = Math.max(1, maxDoc/50); - for(int docID=0;docID> docs) throws Exception { + final long gen = nrt.updateDocuments(id, docs); + + // Randomly verify the update "took": + if (random.nextInt(20) == 2) { + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id); + } + final IndexSearcher s = nrt.get(gen, true); + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); + } + try { + assertEquals(docs.size(), s.search(new TermQuery(id), 10).totalHits); + } finally { + nrt.release(s); + } } - final IndexWriter writer = new IndexWriter(dir, conf); - - if (VERBOSE) { - writer.setInfoStream(System.out); + lastGens.set(gen); + } + + @Override + protected void addDocuments(Term id, List> docs) throws Exception { + final long gen = nrt.addDocuments(docs); + // Randomly verify the add "took": + if (random.nextInt(20) == 2) { + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id); + } + final IndexSearcher s = nrt.get(gen, false); + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); + } + try { + assertEquals(docs.size(), s.search(new TermQuery(id), 10).totalHits); + } finally { + nrt.release(s); + } } - _TestUtil.reduceOpenFiles(writer); - //System.out.println("TEST: conf=" + writer.getConfig()); + lastGens.set(gen); + } - final ExecutorService es = random.nextBoolean() ? 
 null : Executors.newCachedThreadPool(new NamedThreadFactory("NRT search threads")); + @Override + protected void addDocument(Term id, Iterable doc) throws Exception { + final long gen = nrt.addDocument(doc); + // Randomly verify the add "took": + if (random.nextInt(20) == 2) { + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id); + } + final IndexSearcher s = nrt.get(gen, false); + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); + } + try { + assertEquals(1, s.search(new TermQuery(id), 10).totalHits); + } finally { + nrt.release(s); + } + } + lastGens.set(gen); + } + + @Override + protected void updateDocument(Term id, Iterable doc) throws Exception { + final long gen = nrt.updateDocument(id, doc); + // Randomly verify the update "took": + if (random.nextInt(20) == 2) { + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id); + } + final IndexSearcher s = nrt.get(gen, true); + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); + } + try { + assertEquals(1, s.search(new TermQuery(id), 10).totalHits); + } finally { + nrt.release(s); + } + } + lastGens.set(gen); + } + + @Override + protected void deleteDocuments(Term id) throws Exception { + final long gen = nrt.deleteDocuments(id); + // randomly verify the delete "took": + if (random.nextInt(20) == 7) { + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": nrt: verify del " + id); + } + final IndexSearcher s = nrt.get(gen, true); + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); + } + try { + assertEquals(0, s.search(new TermQuery(id), 10).totalHits); + } finally { + nrt.release(s); + } + } + lastGens.set(gen); + } + + private NRTManager nrt; + private NRTManagerReopenThread nrtThread; + + @Override + protected void doAfterWriter(ExecutorService es) throws Exception { final double minReopenSec = 0.01 + 0.05 * random.nextDouble(); final double maxReopenSec = minReopenSec * (1.0 + 10 * random.nextDouble()); @@ -173,506 +178,57 @@ public class TestNRTManager extends LuceneTestCase { System.out.println("TEST: make NRTManager maxReopenSec=" + maxReopenSec + " minReopenSec=" + minReopenSec); } - final NRTManager nrt = new NRTManager(writer, es); - final NRTManagerReopenThread nrtThread = new NRTManagerReopenThread(nrt, maxReopenSec, minReopenSec); + nrt = new NRTManager(writer, es, + new SearcherWarmer() { + @Override + public void warm(IndexSearcher s) throws IOException { + TestNRTManager.this.warmCalled = true; + s.search(new TermQuery(new Term("body", "united")), 10); + } + }); + nrtThread = new NRTManagerReopenThread(nrt, maxReopenSec, minReopenSec); nrtThread.setName("NRT Reopen Thread"); nrtThread.setPriority(Math.min(Thread.currentThread().getPriority()+2, Thread.MAX_PRIORITY)); nrtThread.setDaemon(true); nrtThread.start(); + } - final int NUM_INDEX_THREADS = _TestUtil.nextInt(random, 1, 3); - final int NUM_SEARCH_THREADS = _TestUtil.nextInt(random, 1, 3); - //final int NUM_INDEX_THREADS = 1; - //final int NUM_SEARCH_THREADS = 1; - if (VERBOSE) { - System.out.println("TEST: " + NUM_INDEX_THREADS + " index threads; " + NUM_SEARCH_THREADS + " search threads"); + @Override + protected void doAfterIndexingThreadDone() { + Long gen = lastGens.get(); + if (gen != null) { + addMaxGen(gen); } + } - final int RUN_TIME_SEC = LuceneTestCase.TEST_NIGHTLY ? 
300 : RANDOM_MULTIPLIER; + private long maxGen = -1; - final AtomicBoolean failed = new AtomicBoolean(); - final AtomicInteger addCount = new AtomicInteger(); - final AtomicInteger delCount = new AtomicInteger(); - final AtomicInteger packCount = new AtomicInteger(); - final List lastGens = new ArrayList(); + private synchronized void addMaxGen(long gen) { + maxGen = Math.max(gen, maxGen); + } - final Set delIDs = Collections.synchronizedSet(new HashSet()); - final List allSubDocs = Collections.synchronizedList(new ArrayList()); + @Override + protected void doSearching(ExecutorService es, long stopTime) throws Exception { + runSearchThreads(stopTime); + } - final long stopTime = System.currentTimeMillis() + RUN_TIME_SEC*1000; - Thread[] threads = new Thread[NUM_INDEX_THREADS]; - for(int thread=0;thread toDeleteIDs = new ArrayList(); - final List toDeleteSubDocs = new ArrayList(); + @Override + protected IndexSearcher getCurrentSearcher() throws Exception { + return nrt.get(random.nextBoolean()); + } - long gen = 0; - while(System.currentTimeMillis() < stopTime && !failed.get()) { - - //System.out.println(Thread.currentThread().getName() + ": cycle"); - try { - // Occassional longish pause if running - // nightly - if (LuceneTestCase.TEST_NIGHTLY && random.nextInt(6) == 3) { - if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": now long sleep"); - } - Thread.sleep(_TestUtil.nextInt(random, 50, 500)); - } - - // Rate limit ingest rate: - Thread.sleep(_TestUtil.nextInt(random, 1, 10)); - if (VERBOSE) { - System.out.println(Thread.currentThread() + ": done sleep"); - } - - Document doc = docs.nextDoc(); - if (doc == null) { - break; - } - final String addedField; - if (random.nextBoolean()) { - addedField = "extra" + random.nextInt(10); - doc.add(new TextField(addedField, "a random field")); - } else { - addedField = null; - } - if (random.nextBoolean()) { - - if (random.nextBoolean()) { - // Add a pack of adjacent sub-docs - final String packID; - final SubDocs delSubDocs; - if (toDeleteSubDocs.size() > 0 && random.nextBoolean()) { - delSubDocs = toDeleteSubDocs.get(random.nextInt(toDeleteSubDocs.size())); - assert !delSubDocs.deleted; - toDeleteSubDocs.remove(delSubDocs); - // reuse prior packID - packID = delSubDocs.packID; - } else { - delSubDocs = null; - // make new packID - packID = packCount.getAndIncrement() + ""; - } - - final Field packIDField = newField("packID", packID, StringField.TYPE_STORED); - final List docIDs = new ArrayList(); - final SubDocs subDocs = new SubDocs(packID, docIDs); - final List docsList = new ArrayList(); - - allSubDocs.add(subDocs); - doc.add(packIDField); - docsList.add(cloneDoc(doc)); - docIDs.add(doc.get("docid")); - - final int maxDocCount = _TestUtil.nextInt(random, 1, 10); - while(docsList.size() < maxDocCount) { - doc = docs.nextDoc(); - if (doc == null) { - break; - } - docsList.add(cloneDoc(doc)); - docIDs.add(doc.get("docid")); - } - addCount.addAndGet(docsList.size()); - - if (delSubDocs != null) { - delSubDocs.deleted = true; - delIDs.addAll(delSubDocs.subIDs); - delCount.addAndGet(delSubDocs.subIDs.size()); - if (VERBOSE) { - System.out.println("TEST: update pack packID=" + delSubDocs.packID + " count=" + docsList.size() + " docs=" + docIDs); - } - gen = nrt.updateDocuments(new Term("packID", delSubDocs.packID), docsList); - /* - // non-atomic: - nrt.deleteDocuments(new Term("packID", delSubDocs.packID)); - for(Document subDoc : docsList) { - nrt.addDocument(subDoc); - } - */ - } else { - if (VERBOSE) { - 
System.out.println("TEST: add pack packID=" + packID + " count=" + docsList.size() + " docs=" + docIDs); - } - gen = nrt.addDocuments(docsList); - - /* - // non-atomic: - for(Document subDoc : docsList) { - nrt.addDocument(subDoc); - } - */ - } - doc.removeField("packID"); - - if (random.nextInt(5) == 2) { - if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": buffer del id:" + packID); - } - toDeleteSubDocs.add(subDocs); - } - - // randomly verify the add/update "took": - if (random.nextInt(20) == 2) { - final boolean applyDeletes = delSubDocs != null; - final IndexSearcher s = nrt.get(gen, applyDeletes); - try { - assertEquals(docsList.size(), s.search(new TermQuery(new Term("packID", packID)), 10).totalHits); - } finally { - nrt.release(s); - } - } - - } else { - if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": add doc docid:" + doc.get("docid")); - } - - gen = nrt.addDocument(doc); - addCount.getAndIncrement(); - - // randomly verify the add "took": - if (random.nextInt(20) == 2) { - //System.out.println(Thread.currentThread().getName() + ": verify"); - final IndexSearcher s = nrt.get(gen, false); - //System.out.println(Thread.currentThread().getName() + ": got s=" + s); - try { - assertEquals(1, s.search(new TermQuery(new Term("docid", doc.get("docid"))), 10).totalHits); - } finally { - nrt.release(s); - } - //System.out.println(Thread.currentThread().getName() + ": done verify"); - } - - if (random.nextInt(5) == 3) { - if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid")); - } - toDeleteIDs.add(doc.get("docid")); - } - } - } else { - // we use update but it never replaces a - // prior doc - if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": update doc id:" + doc.get("docid")); - } - gen = nrt.updateDocument(new Term("docid", doc.get("docid")), doc); - addCount.getAndIncrement(); - - // randomly verify the add "took": - if (random.nextInt(20) == 2) { - final IndexSearcher s = nrt.get(gen, true); - try { - assertEquals(1, s.search(new TermQuery(new Term("docid", doc.get("docid"))), 10).totalHits); - } finally { - nrt.release(s); - } - } - - if (random.nextInt(5) == 3) { - if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid")); - } - toDeleteIDs.add(doc.get("docid")); - } - } - - if (random.nextInt(30) == 17) { - if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": apply " + toDeleteIDs.size() + " deletes"); - } - for(String id : toDeleteIDs) { - if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": del term=id:" + id); - } - gen = nrt.deleteDocuments(new Term("docid", id)); - - // randomly verify the delete "took": - if (random.nextInt(20) == 7) { - final IndexSearcher s = nrt.get(gen, true); - try { - assertEquals(0, s.search(new TermQuery(new Term("docid", id)), 10).totalHits); - } finally { - nrt.release(s); - } - } - } - - final int count = delCount.addAndGet(toDeleteIDs.size()); - if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": tot " + count + " deletes"); - } - delIDs.addAll(toDeleteIDs); - toDeleteIDs.clear(); - - for(SubDocs subDocs : toDeleteSubDocs) { - assertTrue(!subDocs.deleted); - gen = nrt.deleteDocuments(new Term("packID", subDocs.packID)); - subDocs.deleted = true; - if (VERBOSE) { - System.out.println(" del subs: " + subDocs.subIDs + " packID=" + subDocs.packID); - } - delIDs.addAll(subDocs.subIDs); - 
delCount.addAndGet(subDocs.subIDs.size()); - - // randomly verify the delete "took": - if (random.nextInt(20) == 7) { - final IndexSearcher s = nrt.get(gen, true); - try { - assertEquals(0, s.search(new TermQuery(new Term("packID", subDocs.packID)), 1).totalHits); - } finally { - nrt.release(s); - } - } - } - toDeleteSubDocs.clear(); - } - if (addedField != null) { - doc.removeField(addedField); - } - } catch (Throwable t) { - System.out.println(Thread.currentThread().getName() + ": FAILED: hit exc"); - t.printStackTrace(); - failed.set(true); - throw new RuntimeException(t); - } - } - - lastGens.add(gen); - if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": indexing done"); - } - } - }; - threads[thread].setDaemon(true); - threads[thread].start(); - } - - if (VERBOSE) { - System.out.println("TEST: DONE start indexing threads [" + (System.currentTimeMillis()-t0) + " ms]"); - } - - // let index build up a bit - Thread.sleep(100); - - // silly starting guess: - final AtomicInteger totTermCount = new AtomicInteger(100); - - // run search threads - final Thread[] searchThreads = new Thread[NUM_SEARCH_THREADS]; - final AtomicInteger totHits = new AtomicInteger(); - - if (VERBOSE) { - System.out.println("TEST: start search threads"); - } - - for(int thread=0;thread 0) { - Fields fields = MultiFields.getFields(s.getIndexReader()); - if (fields == null) { - continue; - } - Terms terms = fields.terms("body"); - if (terms == null) { - continue; - } - - TermsEnum termsEnum = terms.iterator(); - int seenTermCount = 0; - int shift; - int trigger; - if (totTermCount.get() < 10) { - shift = 0; - trigger = 1; - } else { - trigger = totTermCount.get()/10; - shift = random.nextInt(trigger); - } - - while(System.currentTimeMillis() < stopTime) { - BytesRef term = termsEnum.next(); - if (term == null) { - if (seenTermCount == 0) { - break; - } - totTermCount.set(seenTermCount); - seenTermCount = 0; - if (totTermCount.get() < 10) { - shift = 0; - trigger = 1; - } else { - trigger = totTermCount.get()/10; - //System.out.println("trigger " + trigger); - shift = random.nextInt(trigger); - } - termsEnum.seekCeil(new BytesRef("")); - continue; - } - seenTermCount++; - // search 10 terms - if (trigger == 0) { - trigger = 1; - } - if ((seenTermCount + shift) % trigger == 0) { - //if (VERBOSE) { - //System.out.println(Thread.currentThread().getName() + " now search body:" + term.utf8ToString()); - //} - totHits.addAndGet(runQuery(s, new TermQuery(new Term("body", term)))); - } - } - if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": search done"); - } - } - } finally { - nrt.release(s); - } - } catch (Throwable t) { - System.out.println(Thread.currentThread().getName() + ": FAILED: hit exc"); - failed.set(true); - t.printStackTrace(System.out); - throw new RuntimeException(t); - } - } - } - }; - searchThreads[thread].setDaemon(true); - searchThreads[thread].start(); - } - - if (VERBOSE) { - System.out.println("TEST: now join"); - } - for(int thread=0;thread subIDs; + public boolean deleted; + + public SubDocs(String packID, List subIDs) { + this.packID = packID; + this.subIDs = subIDs; + } + } + + // Called per-search + protected abstract IndexSearcher getCurrentSearcher() throws Exception; + + protected abstract IndexSearcher getFinalSearcher() throws Exception; + + protected void releaseSearcher(IndexSearcher s) throws Exception { + } + + // Called once to run searching + protected abstract void doSearching(ExecutorService es, long stopTime) throws Exception; + + 
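 + // A minimal subclass sketch (hypothetical names; see + // TestNRTManager and TestSearcherManager in this change for + // full implementations): + // + // public class TestMyManager extends ThreadedIndexingAndSearchingTestCase { + // private SearcherManager mgr; + // protected void doAfterWriter(ExecutorService es) throws Exception { + // writer.commit(); // SearcherManager needs a commit to open from + // mgr = new SearcherManager(dir, null, es); + // } + // protected IndexSearcher getCurrentSearcher() throws Exception { + // return mgr.get(); + // } + // protected void releaseSearcher(IndexSearcher s) throws Exception { + // mgr.release(s); + // } + // protected IndexSearcher getFinalSearcher() throws Exception { + // mgr.maybeReopen(); + // return mgr.get(); + // } + // protected void doSearching(ExecutorService es, long stopTime) throws Exception { + // runSearchThreads(stopTime); + // } + // public void testStress() throws Exception { + // runTest("TestMyManager"); + // } + // } + 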
protected Directory getDirectory(Directory in) { + return in; + } + + protected void updateDocuments(Term id, List> docs) throws Exception { + writer.updateDocuments(id, docs); + } + + protected void addDocuments(Term id, List> docs) throws Exception { + writer.addDocuments(docs); + } + + protected void addDocument(Term id, Iterable doc) throws Exception { + writer.addDocument(doc); + } + + protected void updateDocument(Term term, Iterable doc) throws Exception { + writer.updateDocument(term, doc); + } + + protected void deleteDocuments(Term term) throws Exception { + writer.deleteDocuments(term); + } + + protected void doAfterIndexingThreadDone() { + } + + private Thread[] launchIndexingThreads(final LineFileDocs docs, + int numThreads, + final long stopTime, + final Set delIDs, + final Set delPackIDs, + final List allSubDocs) + throws Exception { + final Thread[] threads = new Thread[numThreads]; + for(int thread=0;thread toDeleteIDs = new ArrayList(); + final List toDeleteSubDocs = new ArrayList(); + while(System.currentTimeMillis() < stopTime && !failed.get()) { + try { + + // Occasional longish pause if running + // nightly + if (LuceneTestCase.TEST_NIGHTLY && random.nextInt(6) == 3) { + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": now long sleep"); + } + Thread.sleep(_TestUtil.nextInt(random, 50, 500)); + } + + // Rate limit ingest rate: + if (random.nextInt(7) == 5) { + Thread.sleep(_TestUtil.nextInt(random, 1, 10)); + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": done sleep"); + } + } + + Document doc = docs.nextDoc(); + if (doc == null) { + break; + } + + // Maybe add randomly named field + final String addedField; + if (random.nextBoolean()) { + addedField = "extra" + random.nextInt(40); + doc.add(newField(addedField, "a random field", TextField.TYPE_STORED)); + } else { + addedField = null; + } + + if (random.nextBoolean()) { + + if (random.nextBoolean()) { + // Add/update doc block: + final String packID; + final SubDocs delSubDocs; + if (toDeleteSubDocs.size() > 0 && random.nextBoolean()) { + delSubDocs = toDeleteSubDocs.get(random.nextInt(toDeleteSubDocs.size())); + assert !delSubDocs.deleted; + toDeleteSubDocs.remove(delSubDocs); + // Update doc block, replacing prior packID + packID = delSubDocs.packID; + } else { + delSubDocs = null; + // Add doc block, using new packID + packID = packCount.getAndIncrement() + ""; + } + + final Field packIDField = newField("packID", packID, StringField.TYPE_STORED); + final List docIDs = new ArrayList(); + final SubDocs subDocs = new SubDocs(packID, docIDs); + final List docsList = new ArrayList(); + + allSubDocs.add(subDocs); + doc.add(packIDField); + docsList.add(_TestUtil.cloneDocument(doc)); + docIDs.add(doc.get("docid")); + + final int maxDocCount = _TestUtil.nextInt(random, 1, 10); + while(docsList.size() < maxDocCount) { + doc = docs.nextDoc(); + if (doc == null) { + break; + } + docsList.add(_TestUtil.cloneDocument(doc)); + docIDs.add(doc.get("docid")); + } + addCount.addAndGet(docsList.size()); + + final Term packIDTerm = new Term("packID", packID); + + if (delSubDocs != null) { + delSubDocs.deleted = true; + delIDs.addAll(delSubDocs.subIDs); + delCount.addAndGet(delSubDocs.subIDs.size()); + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": update pack packID=" + delSubDocs.packID + " count=" + docsList.size() + " docs=" + docIDs); + } + updateDocuments(packIDTerm, docsList); + } else { + if (VERBOSE) { + 
System.out.println(Thread.currentThread().getName() + ": add pack packID=" + packID + " count=" + docsList.size() + " docs=" + docIDs); + } + addDocuments(packIDTerm, docsList); + } + doc.removeField("packID"); + + if (random.nextInt(5) == 2) { + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": buffer del id:" + packID); + } + toDeleteSubDocs.add(subDocs); + } + + } else { + // Add single doc + final String docid = doc.get("docid"); + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": add doc docid:" + docid); + } + addDocument(new Term("docid", docid), doc); + addCount.getAndIncrement(); + + if (random.nextInt(5) == 3) { + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid")); + } + toDeleteIDs.add(docid); + } + } + } else { + + // Update single doc, but we never re-use + // and ID so the delete will never + // actually happen: + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": update doc id:" + doc.get("docid")); + } + final String docid = doc.get("docid"); + updateDocument(new Term("docid", docid), doc); + addCount.getAndIncrement(); + + if (random.nextInt(5) == 3) { + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid")); + } + toDeleteIDs.add(docid); + } + } + + if (random.nextInt(30) == 17) { + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": apply " + toDeleteIDs.size() + " deletes"); + } + for(String id : toDeleteIDs) { + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": del term=id:" + id); + } + deleteDocuments(new Term("docid", id)); + } + final int count = delCount.addAndGet(toDeleteIDs.size()); + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": tot " + count + " deletes"); + } + delIDs.addAll(toDeleteIDs); + toDeleteIDs.clear(); + + for(SubDocs subDocs : toDeleteSubDocs) { + assert !subDocs.deleted; + delPackIDs.add(subDocs.packID); + deleteDocuments(new Term("packID", subDocs.packID)); + subDocs.deleted = true; + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": del subs: " + subDocs.subIDs + " packID=" + subDocs.packID); + } + delIDs.addAll(subDocs.subIDs); + delCount.addAndGet(subDocs.subIDs.size()); + } + toDeleteSubDocs.clear(); + } + if (addedField != null) { + doc.removeField(addedField); + } + } catch (Throwable t) { + System.out.println(Thread.currentThread().getName() + ": hit exc"); + t.printStackTrace(); + failed.set(true); + throw new RuntimeException(t); + } + } + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": indexing done"); + } + + doAfterIndexingThreadDone(); + } + }; + threads[thread].setDaemon(true); + threads[thread].start(); + } + + return threads; + } + + protected void runSearchThreads(final long stopTimeMS) throws Exception { + final int numThreads = _TestUtil.nextInt(random, 1, 5); + final Thread[] searchThreads = new Thread[numThreads]; + final AtomicInteger totHits = new AtomicInteger(); + + // silly starting guess: + final AtomicInteger totTermCount = new AtomicInteger(100); + + // TODO: we should enrich this to do more interesting searches + for(int thread=0;thread 0) { + smokeTestSearcher(s); + Fields fields = MultiFields.getFields(s.getIndexReader()); + if (fields == null) { + continue; + } + Terms terms = fields.terms("body"); + if (terms == null) { + continue; + } + TermsEnum termsEnum = terms.iterator(); + int seenTermCount = 0; + int 
shift; + int trigger; + if (totTermCount.get() < 10) { + shift = 0; + trigger = 1; + } else { + trigger = totTermCount.get()/10; + shift = random.nextInt(trigger); + } + BytesRef term = termsEnum.next(); + if (term == null) { + if (seenTermCount == 0) { + break; + } + totTermCount.set(seenTermCount); + seenTermCount = 0; + if (totTermCount.get() < 10) { + shift = 0; + trigger = 1; + } else { + trigger = totTermCount.get()/10; + //System.out.println("trigger " + trigger); + shift = random.nextInt(trigger); + } + termsEnum.seekCeil(new BytesRef("")); + continue; + } + seenTermCount++; + // search 10 terms + if (trigger == 0) { + trigger = 1; + } + if ((seenTermCount + shift) % trigger == 0) { + //if (VERBOSE) { + //System.out.println(Thread.currentThread().getName() + " now search body:" + term.utf8ToString()); + //} + totHits.addAndGet(runQuery(s, new TermQuery(new Term("body", term)))); + } + //if (VERBOSE) { + //System.out.println(Thread.currentThread().getName() + ": search done"); + //} + } + } finally { + releaseSearcher(s); + } + } catch (Throwable t) { + System.out.println(Thread.currentThread().getName() + ": hit exc"); + failed.set(true); + t.printStackTrace(System.out); + throw new RuntimeException(t); + } + } + } + }; + searchThreads[thread].setDaemon(true); + searchThreads[thread].start(); + } + + for(int thread=0;thread delIDs = Collections.synchronizedSet(new HashSet()); + final Set delPackIDs = Collections.synchronizedSet(new HashSet()); + final List allSubDocs = Collections.synchronizedList(new ArrayList()); + + final long stopTime = System.currentTimeMillis() + RUN_TIME_SEC*1000; + + final Thread[] indexThreads = launchIndexingThreads(docs, NUM_INDEX_THREADS, stopTime, delIDs, delPackIDs, allSubDocs); + + if (VERBOSE) { + System.out.println("TEST: DONE start indexing threads [" + (System.currentTimeMillis()-t0) + " ms]"); + } + + // Let index build up a bit + Thread.sleep(100); + + doSearching(es, stopTime); + + if (VERBOSE) { + System.out.println("TEST: all searching done [" + (System.currentTimeMillis()-t0) + " ms]"); + } + + for(int thread=0;thread version); + + w.deleteDocuments(new Term("id", "0")); + r = w.getReader(); + w.close(); + long version3 = r.getVersion(); + r.close(); + assert(version3 > version2); + d.close(); + } } diff --git a/lucene/src/test/org/apache/lucene/index/TestNRTThreads.java b/lucene/src/test/org/apache/lucene/index/TestNRTThreads.java index 2a49e3925a5..975156f90c2 100644 --- a/lucene/src/test/org/apache/lucene/index/TestNRTThreads.java +++ b/lucene/src/test/org/apache/lucene/index/TestNRTThreads.java @@ -17,329 +17,26 @@ package org.apache.lucene.index; * limitations under the License. 
 */ -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; import java.util.Set; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.lucene.analysis.MockAnalyzer; -import org.apache.lucene.document.Document; -import org.apache.lucene.document.Field; -import org.apache.lucene.document.StringField; -import org.apache.lucene.document.TextField; -import org.apache.lucene.index.codecs.CodecProvider; import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.PhraseQuery; -import org.apache.lucene.search.Query; -import org.apache.lucene.search.ScoreDoc; -import org.apache.lucene.search.Sort; -import org.apache.lucene.search.SortField; -import org.apache.lucene.search.TermQuery; -import org.apache.lucene.search.TopDocs; import org.apache.lucene.store.MockDirectoryWrapper; -import org.apache.lucene.util.Bits; -import org.apache.lucene.util.BytesRef; -import org.apache.lucene.util.LineFileDocs; -import org.apache.lucene.util.LuceneTestCase; -import org.apache.lucene.util.NamedThreadFactory; -import org.apache.lucene.util._TestUtil; -import org.junit.Test; // TODO // - mix in optimize, addIndexes // - randomly mix in non-congruent docs -public class TestNRTThreads extends LuceneTestCase { +public class TestNRTThreads extends ThreadedIndexingAndSearchingTestCase { + + @Override + protected void doSearching(ExecutorService es, long stopTime) throws Exception { - private static class SubDocs { - public final String packID; - public final List subIDs; - public boolean deleted; - - public SubDocs(String packID, List subIDs) { - this.packID = packID; - this.subIDs = subIDs; - } - } - - @Test - public void testNRTThreads() throws Exception { - - final long t0 = System.currentTimeMillis(); - - final String defaultCodec = CodecProvider.getDefault().getDefaultFieldCodec(); - if (defaultCodec.equals("SimpleText") || defaultCodec.equals("Memory")) { - // no - CodecProvider.getDefault().setDefaultFieldCodec("Standard"); - } - - final LineFileDocs docs = new LineFileDocs(random); - final File tempDir = _TestUtil.getTempDir("nrtopenfiles"); - final MockDirectoryWrapper dir = newFSDirectory(tempDir); - dir.setCheckIndexOnClose(false); // don't double-checkIndex, we do it ourselves. 
- final IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)); - - if (LuceneTestCase.TEST_NIGHTLY) { - // newIWConfig makes smallish max seg size, which - // results in tons and tons of segments for this test - // when run nightly: - MergePolicy mp = conf.getMergePolicy(); - if (mp instanceof TieredMergePolicy) { - ((TieredMergePolicy) mp).setMaxMergedSegmentMB(5000.); - } else if (mp instanceof LogByteSizeMergePolicy) { - ((LogByteSizeMergePolicy) mp).setMaxMergeMB(1000.); - } else if (mp instanceof LogMergePolicy) { - ((LogMergePolicy) mp).setMaxMergeDocs(100000); - } - } - - conf.setMergedSegmentWarmer(new IndexWriter.IndexReaderWarmer() { - @Override - public void warm(IndexReader reader) throws IOException { - if (VERBOSE) { - System.out.println("TEST: now warm merged reader=" + reader); - } - final int maxDoc = reader.maxDoc(); - final Bits liveDocs = reader.getLiveDocs(); - int sum = 0; - final int inc = Math.max(1, maxDoc/50); - for(int docID=0;docID delIDs = Collections.synchronizedSet(new HashSet()); - final List allSubDocs = Collections.synchronizedList(new ArrayList()); - - final long stopTime = System.currentTimeMillis() + RUN_TIME_SEC*1000; - Thread[] threads = new Thread[NUM_INDEX_THREADS]; - for(int thread=0;thread toDeleteIDs = new ArrayList(); - final List toDeleteSubDocs = new ArrayList(); - while(System.currentTimeMillis() < stopTime && !failed.get()) { - try { - Document doc = docs.nextDoc(); - if (doc == null) { - break; - } - final String addedField; - if (random.nextBoolean()) { - addedField = "extra" + random.nextInt(10); - doc.add(new TextField(addedField, "a random field")); - } else { - addedField = null; - } - if (random.nextBoolean()) { - if (VERBOSE) { - //System.out.println(Thread.currentThread().getName() + ": add doc id:" + doc.get("docid")); - } - - if (random.nextBoolean()) { - // Add a pack of adjacent sub-docs - final String packID; - final SubDocs delSubDocs; - if (toDeleteSubDocs.size() > 0 && random.nextBoolean()) { - delSubDocs = toDeleteSubDocs.get(random.nextInt(toDeleteSubDocs.size())); - assert !delSubDocs.deleted; - toDeleteSubDocs.remove(delSubDocs); - // reuse prior packID - packID = delSubDocs.packID; - } else { - delSubDocs = null; - // make new packID - packID = packCount.getAndIncrement() + ""; - } - - final Field packIDField = newField("packID", packID, StringField.TYPE_STORED); - final List docIDs = new ArrayList(); - final SubDocs subDocs = new SubDocs(packID, docIDs); - final List docsList = new ArrayList(); - - allSubDocs.add(subDocs); - doc.add(packIDField); - docsList.add(_TestUtil.cloneDocument(doc)); - docIDs.add(doc.get("docid")); - - final int maxDocCount = _TestUtil.nextInt(random, 1, 10); - while(docsList.size() < maxDocCount) { - doc = docs.nextDoc(); - if (doc == null) { - break; - } - docsList.add(_TestUtil.cloneDocument(doc)); - docIDs.add(doc.get("docid")); - } - addCount.addAndGet(docsList.size()); - - if (delSubDocs != null) { - delSubDocs.deleted = true; - delIDs.addAll(delSubDocs.subIDs); - delCount.addAndGet(delSubDocs.subIDs.size()); - if (VERBOSE) { - System.out.println("TEST: update pack packID=" + delSubDocs.packID + " count=" + docsList.size() + " docs=" + docIDs); - } - writer.updateDocuments(new Term("packID", delSubDocs.packID), docsList); - /* - // non-atomic: - writer.deleteDocuments(new Term("packID", delSubDocs.packID)); - for(Document subDoc : docsList) { - writer.addDocument(subDoc); - } - */ - } else { - if (VERBOSE) { - System.out.println("TEST: add pack 
packID=" + packID + " count=" + docsList.size() + " docs=" + docIDs); - } - writer.addDocuments(docsList); - - /* - // non-atomic: - for(Document subDoc : docsList) { - writer.addDocument(subDoc); - } - */ - } - doc.removeField("packID"); - - if (random.nextInt(5) == 2) { - if (VERBOSE) { - //System.out.println(Thread.currentThread().getName() + ": buffer del id:" + packID); - } - toDeleteSubDocs.add(subDocs); - } - - } else { - writer.addDocument(doc); - addCount.getAndIncrement(); - - if (random.nextInt(5) == 3) { - if (VERBOSE) { - //System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid")); - } - toDeleteIDs.add(doc.get("docid")); - } - } - } else { - // we use update but it never replaces a - // prior doc - if (VERBOSE) { - //System.out.println(Thread.currentThread().getName() + ": update doc id:" + doc.get("docid")); - } - writer.updateDocument(new Term("docid", doc.get("docid")), doc); - addCount.getAndIncrement(); - - if (random.nextInt(5) == 3) { - if (VERBOSE) { - //System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid")); - } - toDeleteIDs.add(doc.get("docid")); - } - } - - if (random.nextInt(30) == 17) { - if (VERBOSE) { - //System.out.println(Thread.currentThread().getName() + ": apply " + toDeleteIDs.size() + " deletes"); - } - for(String id : toDeleteIDs) { - if (VERBOSE) { - //System.out.println(Thread.currentThread().getName() + ": del term=id:" + id); - } - writer.deleteDocuments(new Term("docid", id)); - } - final int count = delCount.addAndGet(toDeleteIDs.size()); - if (VERBOSE) { - //System.out.println(Thread.currentThread().getName() + ": tot " + count + " deletes"); - } - delIDs.addAll(toDeleteIDs); - toDeleteIDs.clear(); - - for(SubDocs subDocs : toDeleteSubDocs) { - assert !subDocs.deleted; - writer.deleteDocuments(new Term("packID", subDocs.packID)); - subDocs.deleted = true; - if (VERBOSE) { - System.out.println(" del subs: " + subDocs.subIDs + " packID=" + subDocs.packID); - } - delIDs.addAll(subDocs.subIDs); - delCount.addAndGet(subDocs.subIDs.size()); - } - toDeleteSubDocs.clear(); - } - if (addedField != null) { - doc.removeField(addedField); - } - } catch (Throwable t) { - System.out.println(Thread.currentThread().getName() + ": hit exc"); - t.printStackTrace(); - failed.set(true); - throw new RuntimeException(t); - } - } - if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": indexing done"); - } - } - }; - threads[thread].setDaemon(true); - threads[thread].start(); - } - - if (VERBOSE) { - System.out.println("TEST: DONE start indexing threads [" + (System.currentTimeMillis()-t0) + " ms]"); - } - - // let index build up a bit - Thread.sleep(100); + boolean anyOpenDelFiles = false; IndexReader r = IndexReader.open(writer, true); - boolean any = false; - // silly starting guess: - final AtomicInteger totTermCount = new AtomicInteger(100); - - final ExecutorService es = Executors.newCachedThreadPool(new NamedThreadFactory("NRT search threads")); - - while(System.currentTimeMillis() < stopTime && !failed.get()) { + while (System.currentTimeMillis() < stopTime && !failed.get()) { if (random.nextBoolean()) { if (VERBOSE) { System.out.println("TEST: now reopen r=" + r); @@ -355,11 +52,11 @@ public class TestNRTThreads extends LuceneTestCase { } r.close(); writer.commit(); - final Set openDeletedFiles = dir.getOpenDeletedFiles(); + final Set openDeletedFiles = ((MockDirectoryWrapper) dir).getOpenDeletedFiles(); if (openDeletedFiles.size() > 0) { System.out.println("OBD files: 
" + openDeletedFiles); } - any |= openDeletedFiles.size() > 0; + anyOpenDelFiles |= openDeletedFiles.size() > 0; //assertEquals("open but deleted: " + openDeletedFiles, 0, openDeletedFiles.size()); if (VERBOSE) { System.out.println("TEST: now open"); @@ -372,203 +69,52 @@ public class TestNRTThreads extends LuceneTestCase { //System.out.println("numDocs=" + r.numDocs() + " //openDelFileCount=" + dir.openDeleteFileCount()); - smokeTestReader(r); - if (r.numDocs() > 0) { - - final IndexSearcher s = new IndexSearcher(r, es); - - // run search threads - final long searchStopTime = System.currentTimeMillis() + 500; - final Thread[] searchThreads = new Thread[NUM_SEARCH_THREADS]; - final AtomicInteger totHits = new AtomicInteger(); - for(int thread=0;thread 0; + anyOpenDelFiles |= openDeletedFiles.size() > 0; - assertFalse("saw non-zero open-but-deleted count", any); - if (VERBOSE) { - System.out.println("TEST: now join"); - } - for(int thread=0;thread