From 3c91517add263dcc2cada194f9545b573b920daf Mon Sep 17 00:00:00 2001 From: Michael McCandless Date: Thu, 9 Apr 2009 17:17:46 +0000 Subject: [PATCH] LUCENE-1516: add near real-time search to IndexWriter git-svn-id: https://svn.apache.org/repos/asf/lucene/java/trunk@763737 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 7 + .../benchmark/byTask/feeds/LineDocMaker.java | 29 + .../byTask/tasks/NearRealtimeReaderTask.java | 134 +++ .../benchmark/byTask/tasks/PerfTask.java | 3 + .../benchmark/byTask/tasks/TaskSequence.java | 7 + .../benchmark/byTask/tasks/UpdateDocTask.java | 110 +++ .../benchmark/byTask/utils/Algorithm.java | 6 +- .../lucene/index/DirectoryIndexReader.java | 29 +- .../apache/lucene/index/DocumentsWriter.java | 22 +- .../apache/lucene/index/IndexFileDeleter.java | 16 +- .../org/apache/lucene/index/IndexReader.java | 5 + .../org/apache/lucene/index/IndexWriter.java | 570 ++++++++++--- .../org/apache/lucene/index/MergePolicy.java | 4 +- .../lucene/index/MultiSegmentReader.java | 65 +- .../index/ReadOnlyMultiSegmentReader.java | 6 +- .../org/apache/lucene/index/SegmentInfo.java | 10 + .../apache/lucene/index/SegmentMergeInfo.java | 25 +- .../apache/lucene/index/SegmentMerger.java | 6 +- .../apache/lucene/index/SegmentReader.java | 94 ++- .../lucene/TestSnapshotDeletionPolicy.java | 1 + .../lucene/index/TestIndexFileDeleter.java | 33 +- .../apache/lucene/index/TestIndexReader.java | 2 +- .../lucene/index/TestIndexReaderReopen.java | 2 +- .../lucene/index/TestIndexWriterReader.java | 793 ++++++++++++++++++ .../lucene/index/TestStressIndexing2.java | 94 ++- .../apache/lucene/store/MockRAMDirectory.java | 4 +- .../lucene/store/MockRAMOutputStream.java | 8 +- 27 files changed, 1885 insertions(+), 200 deletions(-) create mode 100644 contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/NearRealtimeReaderTask.java create mode 100644 contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/UpdateDocTask.java create mode 100644 src/test/org/apache/lucene/index/TestIndexWriterReader.java diff --git a/CHANGES.txt b/CHANGES.txt index 081d6752710..403c0b3084d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -216,6 +216,13 @@ New features 19. LUCENE-1586: Add IndexReader.getUniqueTermCount(). (Mike McCandless via Derek) +20. LUCENE-1516: Added "near real-time search" to IndexWriter, via a + new expert getReader() method. This method returns a reader that + searches the full index, including any uncommitted changes in the + current IndexWriter session. This should result in a faster + turnaround than the normal approach of commiting the changes and + then reopening a reader. (Jason Rutherglen via Mike McCandless) + Optimizations 1. 
LUCENE-1427: Fixed QueryWrapperFilter to not waste time computing diff --git a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/LineDocMaker.java b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/LineDocMaker.java index 0625b2542de..74df8483210 100644 --- a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/LineDocMaker.java +++ b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/LineDocMaker.java @@ -27,6 +27,8 @@ import java.io.BufferedReader; import java.io.IOException; import java.io.FileInputStream; import java.io.InputStreamReader; +import java.util.Random; + /** * A DocMaker reading one line at a time as a Document from @@ -38,6 +40,11 @@ import java.io.InputStreamReader; * Config properties: * docs.file=<path to the file%gt; * doc.reuse.fields=true|false (default true) + * doc.random.id.limit=N (default -1) -- create random + * docid in the range 0..N; this is useful + * with UpdateDoc to test updating random documents; if + * this is unspecified or -1, then docid is sequentially + * assigned */ public class LineDocMaker extends BasicDocMaker { @@ -50,6 +57,8 @@ public class LineDocMaker extends BasicDocMaker { private final DocState localDocState = new DocState(); private boolean doReuseFields = true; + private Random r; + private int numDocs; class DocState { Document doc; @@ -86,6 +95,11 @@ public class LineDocMaker extends BasicDocMaker { final static String SEP = WriteLineDocTask.SEP; + private int numDocsCreated; + private synchronized int incrNumDocsCreated() { + return numDocsCreated++; + } + public Document setFields(String line) { // title date body final String title, date, body; @@ -102,12 +116,22 @@ public class LineDocMaker extends BasicDocMaker { } else title = date = body = ""; + final String docID; + if (r != null) { + docID = "doc" + r.nextInt(numDocs); + } else { + docID = "doc" + incrNumDocsCreated(); + } + if (doReuseFields) { + idField.setValue(docID); titleField.setValue(title); dateField.setValue(date); bodyField.setValue(body); return doc; } else { + Field localIDField = new Field(BasicDocMaker.ID_FIELD, docID, Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS); + Field localTitleField = new Field(BasicDocMaker.TITLE_FIELD, title, storeVal, @@ -124,6 +148,7 @@ public class LineDocMaker extends BasicDocMaker { Field.Index.ANALYZED, termVecVal); Document localDoc = new Document(); + localDoc.add(localIDField); localDoc.add(localBodyField); localDoc.add(localTitleField); localDoc.add(localDateField); @@ -183,6 +208,10 @@ public class LineDocMaker extends BasicDocMaker { public void setConfig(Config config) { super.setConfig(config); doReuseFields = config.get("doc.reuse.fields", true); + numDocs = config.get("doc.random.id.limit", -1); + if (numDocs != -1) { + r = new Random(179); + } } synchronized void openFile() { diff --git a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/NearRealtimeReaderTask.java b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/NearRealtimeReaderTask.java new file mode 100644 index 00000000000..821526bc59f --- /dev/null +++ b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/NearRealtimeReaderTask.java @@ -0,0 +1,134 @@ +package org.apache.lucene.benchmark.byTask.tasks; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; + +import org.apache.lucene.benchmark.byTask.PerfRunData; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.TermQuery; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.TopFieldDocs; +import org.apache.lucene.index.Term; + +/** + * Spawns a BG thread that periodically (defaults to 3.0 + * seconds, but accepts param in seconds) wakes up and asks + * IndexWriter for a near real-time reader. Then runs a + * single query (body: 1) sorted by docdate, and prints + * time to reopen and time to run the search. + * + * NOTE: this is very experimental at this point, and + * subject to change. It's also not generally usable, eg + * you cannot change which query is executed. + */ +public class NearRealtimeReaderTask extends PerfTask { + + ReopenThread t; + float pauseSec = 3.0f; + + private static class ReopenThread extends Thread { + + final IndexWriter writer; + final int pauseMsec; + + public volatile boolean done; + + ReopenThread(IndexWriter writer, float pauseSec) { + this.writer = writer; + this.pauseMsec = (int) (1000*pauseSec); + setDaemon(true); + } + + public void run() { + + IndexReader reader = null; + + final Query query = new TermQuery(new Term("body", "1")); + final SortField sf = new SortField("docdate", SortField.LONG); + final Sort sort = new Sort(sf); + + try { + while(!done) { + final long t0 = System.currentTimeMillis(); + if (reader == null) { + reader = writer.getReader(); + } else { + final IndexReader newReader = reader.reopen(); + if (reader != newReader) { + reader.close(); + reader = newReader; + } + } + + final long t1 = System.currentTimeMillis(); + final TopFieldDocs hits = new IndexSearcher(reader).search(query, null, 10, sort); + final long t2 = System.currentTimeMillis(); + System.out.println("nrt: open " + (t1-t0) + " msec; search " + (t2-t1) + " msec, " + hits.totalHits + + " results; " + reader.numDocs() + " docs"); + + final long t4 = System.currentTimeMillis(); + final int delay = (int) (pauseMsec - (t4-t0)); + if (delay > 0) { + try { + Thread.sleep(delay); + } catch (InterruptedException ie) { + throw new RuntimeException(ie); + } + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + public NearRealtimeReaderTask(PerfRunData runData) { + super(runData); + } + + public int doLogic() throws IOException { + if (t == null) { + IndexWriter w = getRunData().getIndexWriter(); + t = new ReopenThread(w, pauseSec); + t.start(); + } + return 1; + } + + public void setParams(String params) { + super.setParams(params); + pauseSec = Float.parseFloat(params); + } + + public boolean supportsParams() { + return true; + } + + // Close the thread + public void close() throws InterruptedException { + if (t != null) { 
+ t.done = true; + t.join(); + } + } +} diff --git a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/PerfTask.java b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/PerfTask.java index a90e21316c4..dc6ab720af1 100644 --- a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/PerfTask.java +++ b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/PerfTask.java @@ -71,6 +71,9 @@ public abstract class PerfTask implements Cloneable { return super.clone(); } + public void close() throws Exception { + } + /** * Run the task, record statistics. * @return number of work items done by this task. diff --git a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java index 8e0b6caf10d..0d0104045a9 100644 --- a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java +++ b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java @@ -57,6 +57,13 @@ public class TaskSequence extends PerfTask { tasks = new ArrayList(); } + public void close() throws Exception { + initTasksArray(); + for(int i=0;iOther side effects: none. + *
Relevant properties: doc.add.log.step. + *
Takes optional param: document size. + */ +public class UpdateDocTask extends PerfTask { + + public UpdateDocTask(PerfRunData runData) { + super(runData); + } + + private int logStep = -1; + private int docSize = 0; + int count = 0; + + // volatile data passed between setup(), doLogic(), tearDown(). + private Document doc = null; + + /* + * (non-Javadoc) + * @see PerfTask#setup() + */ + public void setup() throws Exception { + super.setup(); + DocMaker docMaker = getRunData().getDocMaker(); + if (docSize > 0) { + doc = docMaker.makeDocument(docSize); + } else { + doc = docMaker.makeDocument(); + } + } + + /* (non-Javadoc) + * @see PerfTask#tearDown() + */ + public void tearDown() throws Exception { + log(++count); + doc = null; + super.tearDown(); + } + + public int doLogic() throws Exception { + final String docID = doc.get(BasicDocMaker.ID_FIELD); + if (docID == null) { + throw new IllegalStateException("document must define the docid field"); + } + getRunData().getIndexWriter().updateDocument(new Term(BasicDocMaker.ID_FIELD, docID), + doc); + return 1; + } + + private void log (int count) { + if (logStep<0) { + // init once per instance + logStep = getRunData().getConfig().get("doc.add.log.step",AddDocTask.DEFAULT_ADD_DOC_LOG_STEP); + } + if (logStep>0 && (count%logStep)==0) { + double seconds = (System.currentTimeMillis() - getRunData().getStartTimeMillis())/1000.0; + NumberFormat nf = NumberFormat.getInstance(); + nf.setMaximumFractionDigits(2); + System.out.println("--> "+nf.format(seconds) + " sec: " + Thread.currentThread().getName()+" processed (update) "+count+" docs"); + } + } + + /** + * Set the params (docSize only) + * @param params docSize, or 0 for no limit. + */ + public void setParams(String params) { + super.setParams(params); + docSize = (int) Float.parseFloat(params); + } + + /* (non-Javadoc) + * @see org.apache.lucene.benchmark.byTask.tasks.PerfTask#supportsParams() + */ + public boolean supportsParams() { + return true; + } + +} diff --git a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Algorithm.java b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Algorithm.java index cb2bba6bec0..296c87bcede 100644 --- a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Algorithm.java +++ b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Algorithm.java @@ -243,7 +243,11 @@ public class Algorithm { * @throws Exception */ public void execute() throws Exception { - sequence.runAndMaybeStats(true); + try { + sequence.runAndMaybeStats(true); + } finally { + sequence.close(); + } } /** diff --git a/src/java/org/apache/lucene/index/DirectoryIndexReader.java b/src/java/org/apache/lucene/index/DirectoryIndexReader.java index 54a6825d32e..03792e376fd 100644 --- a/src/java/org/apache/lucene/index/DirectoryIndexReader.java +++ b/src/java/org/apache/lucene/index/DirectoryIndexReader.java @@ -29,6 +29,7 @@ import java.util.Collections; import org.apache.lucene.store.Directory; import org.apache.lucene.store.Lock; import org.apache.lucene.store.LockObtainFailedException; +import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.FSDirectory; /** @@ -51,6 +52,7 @@ abstract class DirectoryIndexReader extends IndexReader implements Cloneable { * rollback is necessary */ private boolean rollbackHasChanges; private SegmentInfos rollbackSegmentInfos; + IndexWriter writer; protected boolean readOnly; @@ -183,10 +185,12 @@ abstract class DirectoryIndexReader extends IndexReader implements 
Cloneable { newReader.init(directory, clonedInfos, closeDirectory, openReadOnly); newReader.deletionPolicy = deletionPolicy; } - + newReader.writer = writer; // If we're cloning a non-readOnly reader, move the // writeLock (if there is one) to the new reader: if (!openReadOnly && writeLock != null) { + // In near real-time search, reader is always readonly + assert writer == null; newReader.writeLock = writeLock; writeLock = null; hasChanges = false; @@ -203,6 +207,29 @@ abstract class DirectoryIndexReader extends IndexReader implements Cloneable { assert commit == null || openReadOnly; + // If we were obtained by writer.getReader(), re-ask the + // writer to get a new reader. + if (writer != null) { + assert readOnly; + + if (!openReadOnly) { + throw new IllegalArgumentException("a reader obtained from IndexWriter.getReader() can only be reopened with openReadOnly=true (got false)"); + } + + if (commit != null) { + throw new IllegalArgumentException("a reader obtained from IndexWriter.getReader() cannot currently accept a commit"); + } + + if (!writer.isOpen(true)) { + throw new AlreadyClosedException("cannot reopen: the IndexWriter this reader was obtained from is now closed"); + } + + // TODO: right now we *always* make a new reader; in + // the future we could have write make some effort to + // detect that no changes have occurred + return writer.getReader(); + } + if (commit == null) { if (hasChanges) { // We have changes, which means we are not readOnly: diff --git a/src/java/org/apache/lucene/index/DocumentsWriter.java b/src/java/org/apache/lucene/index/DocumentsWriter.java index 6b7b440f154..f40f84888f9 100644 --- a/src/java/org/apache/lucene/index/DocumentsWriter.java +++ b/src/java/org/apache/lucene/index/DocumentsWriter.java @@ -915,25 +915,17 @@ final class DocumentsWriter { int docStart = 0; boolean any = false; for (int i = 0; i < infosEnd; i++) { - IndexReader reader = SegmentReader.get(infos.info(i), false); - boolean success = false; + + // Make sure we never attempt to apply deletes to + // segment in external dir + assert infos.info(i).dir == directory; + + SegmentReader reader = writer.readerPool.get(infos.info(i), false); try { any |= applyDeletes(reader, docStart); docStart += reader.maxDoc(); - success = true; } finally { - if (reader != null) { - try { - if (success) - reader.commit(); - } finally { - // Force reader to not have changes; if we hit - // an exception during commit, we don't want - // close to retry the commit: - reader.hasChanges = false; - reader.close(); - } - } + writer.readerPool.release(reader); } } diff --git a/src/java/org/apache/lucene/index/IndexFileDeleter.java b/src/java/org/apache/lucene/index/IndexFileDeleter.java index b1f680cc7d7..757cfe53df9 100644 --- a/src/java/org/apache/lucene/index/IndexFileDeleter.java +++ b/src/java/org/apache/lucene/index/IndexFileDeleter.java @@ -485,7 +485,7 @@ final class IndexFileDeleter { private RefCount getRefCount(String fileName) { RefCount rc; if (!refCounts.containsKey(fileName)) { - rc = new RefCount(); + rc = new RefCount(fileName); refCounts.put(fileName, rc); } else { rc = (RefCount) refCounts.get(fileName); @@ -543,14 +543,26 @@ final class IndexFileDeleter { */ final private static class RefCount { + // fileName used only for better assert error messages + final String fileName; + boolean initDone; + RefCount(String fileName) { + this.fileName = fileName; + } + int count; public int IncRef() { + if (!initDone) { + initDone = true; + } else { + assert count > 0: "RefCount is 0 pre-increment for 
file \"" + fileName + "\""; + } return ++count; } public int DecRef() { - assert count > 0; + assert count > 0: "RefCount is 0 pre-decrement for file \"" + fileName + "\""; return --count; } } diff --git a/src/java/org/apache/lucene/index/IndexReader.java b/src/java/org/apache/lucene/index/IndexReader.java index 3b3a9f9542e..9b3bb7c94fb 100644 --- a/src/java/org/apache/lucene/index/IndexReader.java +++ b/src/java/org/apache/lucene/index/IndexReader.java @@ -354,6 +354,11 @@ public abstract class IndexReader implements Cloneable { * Be sure to synchronize that code so that other threads, * if present, can never use reader after it has been * closed and before it's switched to newReader. + * + *

NOTE: If this reader is a near real-time + * reader (obtained from {@link IndexWriter#getReader()}, + * reopen() will simply call writer.getReader() again for + * you, though this may change in the future. * * @throws CorruptIndexException if the index is corrupt * @throws IOException if there is a low-level IO error diff --git a/src/java/org/apache/lucene/index/IndexWriter.java b/src/java/org/apache/lucene/index/IndexWriter.java index 6504e6c9b11..7c53937b2d8 100644 --- a/src/java/org/apache/lucene/index/IndexWriter.java +++ b/src/java/org/apache/lucene/index/IndexWriter.java @@ -27,7 +27,7 @@ import org.apache.lucene.store.FSDirectory; import org.apache.lucene.store.Lock; import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.store.AlreadyClosedException; -import org.apache.lucene.util.BitVector; +import org.apache.lucene.store.BufferedIndexInput; import org.apache.lucene.util.Constants; import java.io.File; @@ -41,6 +41,7 @@ import java.util.Set; import java.util.HashSet; import java.util.LinkedList; import java.util.Iterator; +import java.util.Map; /** An IndexWriter creates and maintains an index. @@ -367,8 +368,273 @@ public class IndexWriter { // TODO: use ReadWriteLock once we are on 5.0 private int readCount; // count of how many threads are holding read lock private Thread writeThread; // non-null if any thread holds write lock + final ReaderPool readerPool = new ReaderPool(); private int upgradeCount; + + // This is a "write once" variable (like the organic dye + // on a DVD-R that may or may not be heated by a laser and + // then cooled to permanently record the event): it's + // false, until getReader() is called for the first time, + // at which point it's switched to true and never changes + // back to false. Once this is true, we hold open and + // reuse SegmentReader instances internally for applying + // deletes, doing merges, and reopening near real-time + // readers. + private volatile boolean poolReaders; + + /** + * Expert: returns a readonly reader containing all + * current updates. Flush is called automatically. This + * provides "near real-time" searching, in that changes + * made during an IndexWriter session can be made + * available for searching without closing the writer. + * + *

It's near real-time because there is no hard + * guarantee on how quickly you can get a new reader after + * making changes with IndexWriter. You'll have to + * experiment in your situation to determine if it's + * fast enough. As this is a new and experimental + * feature, please report back on your findings so we can + * learn, improve and iterate.

+ * + *

The resulting reader supports {@link + * IndexReader#reopen}, but that call will simply forward + * back to this method (though this may change in the + * future).

+ * + *

The very first time this method is called, this + * writer instance will make every effort to pool the + * readers that it opens for doing merges, applying + * deletes, etc. This means additional resources (RAM, + * file descriptors, CPU time) will be consumed.

+ * + *

For lower latency on reopening a reader, you may + * want to call {@link #setMergedSegmentWarmer} to + * pre-warm a newly merged segment before it's committed + * to the index.

+ * + *

If an addIndexes* call is running in another thread, + * then this reader will only search those segments from + * the foreign index that have been successfully copied + * over, so far. + * + *

NOTE: Once the writer is closed, any + * outstanding readers may continue to be used. However, + * if you attempt to reopen any of those readers, you'll + * hit an {@link AlreadyClosedException}.

+ * + *

NOTE: This API is experimental and might + * change in incompatible ways in the next release.

+ * + * @return IndexReader that covers entire index plus all + * changes made so far by this IndexWriter instance + * + * @throws IOException + */ + public IndexReader getReader() throws IOException { + if (infoStream != null) { + message("flush at getReader"); + } + // Do this up front before flushing so that the readers + // obtained during this flush are pooled, the first time + // this method is called: + poolReaders = true; + + flush(true, true, true); + + // Prevent segmentInfos from changing while opening the + // reader; in theory we could do similar retry logic, + // just like we do when loading segments_N + synchronized(this) { + return new ReadOnlyMultiSegmentReader(this, segmentInfos); + } + } + + /** Holds shared SegmentReader instances. IndexWriter uses + * SegmentReaders for 1) applying deletes, 2) doing + * merges, 3) handing out a real-time reader. This pool + * reuses instances of the SegmentReaders in all these + * places if it is in "near real-time mode" (getReader() + * has been called on this instance). */ + + class ReaderPool { + + private final Map readerMap = new HashMap(); + + /** Forcefully clear changes for the specifed segments, + * and remove from the pool. This is called on succesful merge. */ + synchronized void clear(SegmentInfos infos) throws IOException { + if (infos == null) { + Iterator iter = readerMap.entrySet().iterator(); + while (iter.hasNext()) { + Map.Entry ent = (Map.Entry) iter.next(); + ((SegmentReader) ent.getValue()).hasChanges = false; + } + } else { + final int numSegments = infos.size(); + for(int i=0;i 0) @@ -415,6 +681,10 @@ public class IndexWriter { notifyAll(); } + synchronized final boolean isOpen(boolean includePendingClose) { + return !(closed || (includePendingClose && closing)); + } + /** * Used internally to throw an {@link * AlreadyClosedException} if this IndexWriter has been @@ -422,7 +692,7 @@ public class IndexWriter { * @throws AlreadyClosedException if this IndexWriter is */ protected synchronized final void ensureOpen(boolean includePendingClose) throws AlreadyClosedException { - if (closed || (includePendingClose && closing)) { + if (!isOpen(includePendingClose)) { throw new AlreadyClosedException("this IndexWriter is closed"); } } @@ -1795,6 +2065,7 @@ public class IndexWriter { message("at close: " + segString()); synchronized(this) { + readerPool.close(); docWriter = null; deleter.close(); } @@ -1851,6 +2122,10 @@ public class IndexWriter { if (useCompoundDocStore && docStoreSegment != null && docWriter.closedFiles().size() != 0) { // Now build compound doc store file + if (infoStream != null) { + message("create compound file " + docStoreSegment + "." 
+ IndexFileNames.COMPOUND_FILE_STORE_EXTENSION); + } + success = false; final int numSegments = segmentInfos.size(); @@ -2756,7 +3031,9 @@ public class IndexWriter { // First restore autoCommit in case we hit an exception below: autoCommit = localAutoCommit; - docWriter.setFlushedDocCount(localFlushedDocCount); + if (docWriter != null) { + docWriter.setFlushedDocCount(localFlushedDocCount); + } // Must finish merges before rolling back segmentInfos // so merges don't hit exceptions on trying to commit @@ -2912,6 +3189,9 @@ public class IndexWriter { deleter.refresh(); } + // Don't bother saving any changes in our segmentInfos + readerPool.clear(null); + lastCommitChangeCount = changeCount; success = true; @@ -3098,7 +3378,9 @@ public class IndexWriter { hitOOM = true; throw oom; } finally { - docWriter.resumeAllThreads(); + if (docWriter != null) { + docWriter.resumeAllThreads(); + } } } @@ -3239,7 +3521,9 @@ public class IndexWriter { hitOOM = true; throw oom; } finally { - docWriter.resumeAllThreads(); + if (docWriter != null) { + docWriter.resumeAllThreads(); + } } } @@ -3387,10 +3671,10 @@ public class IndexWriter { mergedName = newSegmentName(); merger = new SegmentMerger(this, mergedName, null); - IndexReader sReader = null; + SegmentReader sReader = null; synchronized(this) { if (segmentInfos.size() == 1) { // add existing index, if any - sReader = SegmentReader.get(true, segmentInfos.info(0)); + sReader = readerPool.get(segmentInfos.info(0), true); } } @@ -3405,11 +3689,6 @@ public class IndexWriter { int docCount = merger.merge(); // merge 'em - if(sReader != null) { - sReader.close(); - sReader = null; - } - synchronized(this) { segmentInfos.clear(); // pop old infos & add new info = new SegmentInfo(mergedName, docCount, directory, false, true, @@ -3424,7 +3703,7 @@ public class IndexWriter { } finally { if (sReader != null) { - sReader.close(); + readerPool.release(sReader); } } } finally { @@ -3485,7 +3764,9 @@ public class IndexWriter { hitOOM = true; throw oom; } finally { - docWriter.resumeAllThreads(); + if (docWriter != null) { + docWriter.resumeAllThreads(); + } } } @@ -3735,6 +4016,9 @@ public class IndexWriter { // stores when we flush flushDocStores |= autoCommit; String docStoreSegment = docWriter.getDocStoreSegment(); + + assert docStoreSegment != null || numDocs == 0; + if (docStoreSegment == null) flushDocStores = false; @@ -3876,7 +4160,7 @@ public class IndexWriter { int first = segmentInfos.indexOf(merge.segments.info(0)); if (first == -1) - throw new MergePolicy.MergeException("could not find segment " + merge.segments.info(0).name + " in current segments", directory); + throw new MergePolicy.MergeException("could not find segment " + merge.segments.info(0).name + " in current index " + segString(), directory); final int numSegments = segmentInfos.size(); @@ -3886,7 +4170,7 @@ public class IndexWriter { if (first + i >= numSegments || !segmentInfos.info(first+i).equals(info)) { if (segmentInfos.indexOf(info) == -1) - throw new MergePolicy.MergeException("MergePolicy selected a segment (" + info.name + ") that is not in the index", directory); + throw new MergePolicy.MergeException("MergePolicy selected a segment (" + info.name + ") that is not in the current index " + segString(), directory); else throw new MergePolicy.MergeException("MergePolicy selected non-contiguous segments to merge (" + merge.segString(directory) + " vs " + segString() + "), which IndexWriter (currently) cannot handle", directory); @@ -3905,11 +4189,10 @@ public class IndexWriter { * 
saves the resulting deletes file (incrementing the * delete generation for merge.info). If no deletes were * flushed, no new deletes file is saved. */ - synchronized private void commitMergedDeletes(MergePolicy.OneMerge merge) throws IOException { + synchronized private void commitMergedDeletes(MergePolicy.OneMerge merge, SegmentReader mergeReader) throws IOException { assert testPoint("startCommitMergeDeletes"); - final SegmentInfos sourceSegmentsClone = merge.segmentsClone; final SegmentInfos sourceSegments = merge.segments; if (infoStream != null) @@ -3917,21 +4200,15 @@ public class IndexWriter { // Carefully merge deletes that occurred after we // started merging: - - BitVector deletes = null; int docUpto = 0; int delCount = 0; - final int numSegmentsToMerge = sourceSegments.size(); - for(int i=0;i previousReader.numDeletedDocs()) { // This means this segment has had new deletes // committed since we started the merge, so we // must merge them: - if (deletes == null) - deletes = new BitVector(merge.info.docCount); - - BitVector currentDeletes = new BitVector(currentInfo.dir, currentInfo.getDelFileName()); for(int j=0;j= 0; } /* FIXME if we want to support non-contiguous segment merges */ - synchronized private boolean commitMerge(MergePolicy.OneMerge merge, SegmentMerger merger, int mergedDocCount) throws IOException { + synchronized private boolean commitMerge(MergePolicy.OneMerge merge, SegmentMerger merger, int mergedDocCount, SegmentReader mergedReader) throws IOException { assert testPoint("startCommitMerge"); @@ -4026,8 +4284,7 @@ public class IndexWriter { final int start = ensureContiguousMerge(merge); - commitMergedDeletes(merge); - + commitMergedDeletes(merge, mergedReader); docWriter.remapDeletes(segmentInfos, merger.getDocMaps(), merger.getDelCounts(), merge, mergedDocCount); // Simple optimization: if the doc store we are using @@ -4055,22 +4312,19 @@ public class IndexWriter { assert !segmentInfos.contains(merge.info); segmentInfos.add(start, merge.info); - // Must checkpoint before decrefing so any newly - // referenced files in the new merge.info are incref'd - // first: - checkpoint(); - - decrefMergeSegments(merge); + // If the merged segments had pending changes, clear + // them so that they don't bother writing them to + // disk, updating SegmentInfo, etc.: + readerPool.clear(merge.segments); if (merge.optimize) segmentsToOptimize.add(merge.info); return true; } - - private void decrefMergeSegments(MergePolicy.OneMerge merge) throws IOException { + + private synchronized void decrefMergeSegments(MergePolicy.OneMerge merge) throws IOException { assert merge.increfDone; merge.increfDone = false; - deleter.decRef(merge.segmentsClone); } final private void handleMergeException(Throwable t, MergePolicy.OneMerge merge) throws IOException { @@ -4322,15 +4576,8 @@ public class IndexWriter { if (infoStream != null) message("now flush at merge"); doFlush(true, false); - //flush(false, true, false); } - // We must take a full copy at this point so that we can - // properly merge deletes in commitMerge() - merge.segmentsClone = (SegmentInfos) merge.segments.clone(); - - deleter.incRef(merge.segmentsClone, false); - merge.increfDone = true; merge.mergeDocStores = mergeDocStores; @@ -4430,47 +4677,147 @@ public class IndexWriter { int mergedDocCount = 0; SegmentInfos sourceSegments = merge.segments; - SegmentInfos sourceSegmentsClone = merge.segmentsClone; final int numSegments = sourceSegments.size(); if (infoStream != null) message("merging " + merge.segString(directory)); 
merger = new SegmentMerger(this, mergedName, merge); + + merge.readers = new SegmentReader[numSegments]; + merge.readersClone = new SegmentReader[numSegments]; + + boolean mergeDocStores = false; + + final Set dss = new HashSet(); // This is try/finally to make sure merger's readers are // closed: + boolean success = false; try { int totDocCount = 0; for (int i = 0; i < numSegments; i++) { - SegmentInfo si = sourceSegmentsClone.info(i); - IndexReader reader = SegmentReader.get(true, si, MERGE_READ_BUFFER_SIZE, merge.mergeDocStores); // no need to set deleter (yet) - merger.add(reader); - totDocCount += reader.numDocs(); + + final SegmentInfo info = sourceSegments.info(i); + + // Hold onto the "live" reader; we will use this to + // commit merged deletes + SegmentReader reader = merge.readers[i] = readerPool.get(info, merge.mergeDocStores, + MERGE_READ_BUFFER_SIZE); + + // We clone the segment readers because other + // deletes may come in while we're merging so we + // need readers that will not change + SegmentReader clone = merge.readersClone[i] = (SegmentReader) reader.clone(true); + merger.add(clone); + + if (clone.hasDeletions()) { + mergeDocStores = true; + } + + if (info.getDocStoreOffset() != -1) { + dss.add(info.getDocStoreSegment()); + } + + totDocCount += clone.numDocs(); } + if (infoStream != null) { message("merge: total "+totDocCount+" docs"); } merge.checkAborted(directory); + // If deletions have arrived and it has now become + // necessary to merge doc stores, go and open them: + if (mergeDocStores && !merge.mergeDocStores) { + merge.mergeDocStores = true; + synchronized(this) { + if (dss.contains(docWriter.getDocStoreSegment())) { + if (infoStream != null) + message("now flush at mergeMiddle"); + doFlush(true, false); + } + } + + for(int i=0;iNOTE: This API is experimental and might + * change in incompatible ways in the next release.

+ * + *

NOTE: warm is called before any deletes have + * been carried over to the merged segment. */ + public static abstract class IndexReaderWarmer { + public abstract void warm(IndexReader reader) throws IOException; + } + + private IndexReaderWarmer mergedSegmentWarmer; + + /** Set the merged segment warmer. See {@link + * IndexReaderWarmer}. */ + public void setMergedSegmentWarmer(IndexReaderWarmer warmer) { + mergedSegmentWarmer = warmer; + } + // Used only by assert for testing. Current points: // startDoFlush // startCommitMerge diff --git a/src/java/org/apache/lucene/index/MergePolicy.java b/src/java/org/apache/lucene/index/MergePolicy.java index df3b9210051..7e75c5f960f 100644 --- a/src/java/org/apache/lucene/index/MergePolicy.java +++ b/src/java/org/apache/lucene/index/MergePolicy.java @@ -74,13 +74,13 @@ public abstract class MergePolicy { SegmentInfo info; // used by IndexWriter boolean mergeDocStores; // used by IndexWriter boolean optimize; // used by IndexWriter - SegmentInfos segmentsClone; // used by IndexWriter boolean increfDone; // used by IndexWriter boolean registerDone; // used by IndexWriter long mergeGen; // used by IndexWriter boolean isExternal; // used by IndexWriter int maxNumSegmentsOptimize; // used by IndexWriter - + SegmentReader[] readers; // used by IndexWriter + SegmentReader[] readersClone; // used by IndexWriter final SegmentInfos segments; final boolean useCompoundFile; boolean aborted; diff --git a/src/java/org/apache/lucene/index/MultiSegmentReader.java b/src/java/org/apache/lucene/index/MultiSegmentReader.java index a95791082eb..0d61359bad8 100644 --- a/src/java/org/apache/lucene/index/MultiSegmentReader.java +++ b/src/java/org/apache/lucene/index/MultiSegmentReader.java @@ -51,24 +51,73 @@ class MultiSegmentReader extends DirectoryIndexReader implements Cloneable { SegmentReader[] readers = new SegmentReader[sis.size()]; for (int i = sis.size()-1; i >= 0; i--) { + boolean success = false; try { readers[i] = SegmentReader.get(readOnly, sis.info(i)); - } catch (IOException e) { - // Close all readers we had opened: - for(i++;i=0;upto--) { + try { + readers[upto].close(); + } catch (Throwable ignore) { + // keep going - we want to clean up as much as possible + } + } + } + } + } + + this.writer = writer; + + if (upto < readers.length) { + // This means some segments were in a foreign Directory + SegmentReader[] newReaders = new SegmentReader[upto]; + System.arraycopy(readers, 0, newReaders, 0, upto); + readers = newReaders; + } + + initialize(readers); + } + /** This contructor is only used for {@link #reopen()} */ MultiSegmentReader(Directory directory, SegmentInfos infos, boolean closeDirectory, SegmentReader[] oldReaders, int[] oldStarts, Map oldNormsCache, boolean readOnly, boolean doClone) throws IOException { diff --git a/src/java/org/apache/lucene/index/ReadOnlyMultiSegmentReader.java b/src/java/org/apache/lucene/index/ReadOnlyMultiSegmentReader.java index b4e06140d19..302891cbabf 100644 --- a/src/java/org/apache/lucene/index/ReadOnlyMultiSegmentReader.java +++ b/src/java/org/apache/lucene/index/ReadOnlyMultiSegmentReader.java @@ -30,7 +30,11 @@ class ReadOnlyMultiSegmentReader extends MultiSegmentReader { ReadOnlyMultiSegmentReader(Directory directory, SegmentInfos infos, boolean closeDirectory, SegmentReader[] oldReaders, int[] oldStarts, Map oldNormsCache, boolean doClone) throws IOException { super(directory, infos, closeDirectory, oldReaders, oldStarts, oldNormsCache, true, doClone); } - + + ReadOnlyMultiSegmentReader(IndexWriter writer, 
SegmentInfos infos) throws IOException { + super(writer, infos); + } + protected void acquireWriteLock() { ReadOnlySegmentReader.noWrite(); } diff --git a/src/java/org/apache/lucene/index/SegmentInfo.java b/src/java/org/apache/lucene/index/SegmentInfo.java index 92fe3f604c3..d79ae243e38 100644 --- a/src/java/org/apache/lucene/index/SegmentInfo.java +++ b/src/java/org/apache/lucene/index/SegmentInfo.java @@ -79,6 +79,10 @@ final class SegmentInfo { private boolean hasProx; // True if this segment has any fields with omitTermFreqAndPositions==false + public String toString() { + return "si: "+dir.toString()+" "+name+" docCount: "+docCount+" delCount: "+delCount+" delFileName: "+getDelFileName(); + } + public SegmentInfo(String name, int docCount, Directory dir) { this.name = name; this.docCount = docCount; @@ -490,6 +494,12 @@ final class SegmentInfo { docStoreOffset = offset; clearFiles(); } + + void setDocStore(int offset, String segment, boolean isCompoundFile) { + docStoreOffset = offset; + docStoreSegment = segment; + docStoreIsCompoundFile = isCompoundFile; + } /** * Save this segment's info. diff --git a/src/java/org/apache/lucene/index/SegmentMergeInfo.java b/src/java/org/apache/lucene/index/SegmentMergeInfo.java index b9c43958a5c..cfedaace6a2 100644 --- a/src/java/org/apache/lucene/index/SegmentMergeInfo.java +++ b/src/java/org/apache/lucene/index/SegmentMergeInfo.java @@ -24,6 +24,7 @@ final class SegmentMergeInfo { int base; TermEnum termEnum; IndexReader reader; + int delCount; private TermPositions postings; // use getPositions() private int[] docMap; // use getDocMap() @@ -38,19 +39,21 @@ final class SegmentMergeInfo { // maps around deleted docs int[] getDocMap() { if (docMap == null) { - // build array which maps document numbers around deletions - if (reader.hasDeletions()) { - int maxDoc = reader.maxDoc(); - docMap = new int[maxDoc]; - int j = 0; - for (int i = 0; i < maxDoc; i++) { - if (reader.isDeleted(i)) - docMap[i] = -1; - else - docMap[i] = j++; + delCount = 0; + // build array which maps document numbers around deletions + if (reader.hasDeletions()) { + int maxDoc = reader.maxDoc(); + docMap = new int[maxDoc]; + int j = 0; + for (int i = 0; i < maxDoc; i++) { + if (reader.isDeleted(i)) { + delCount++; + docMap[i] = -1; + } else + docMap[i] = j++; + } } } - } return docMap; } diff --git a/src/java/org/apache/lucene/index/SegmentMerger.java b/src/java/org/apache/lucene/index/SegmentMerger.java index 949e965db44..3d30bfe0c65 100644 --- a/src/java/org/apache/lucene/index/SegmentMerger.java +++ b/src/java/org/apache/lucene/index/SegmentMerger.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; -import java.util.HashSet; import java.util.List; import org.apache.lucene.document.Document; @@ -514,8 +513,11 @@ final class SegmentMerger { docMaps[i] = docMap; delCounts[i] = smi.reader.maxDoc() - smi.reader.numDocs(); } - + base += reader.numDocs(); + + assert reader.numDocs() == reader.maxDoc() - smi.delCount; + if (smi.next()) queue.put(smi); // initialize queue else diff --git a/src/java/org/apache/lucene/index/SegmentReader.java b/src/java/org/apache/lucene/index/SegmentReader.java index 7e04a0c2b70..c33baa9e52c 100644 --- a/src/java/org/apache/lucene/index/SegmentReader.java +++ b/src/java/org/apache/lucene/index/SegmentReader.java @@ -469,6 +469,55 @@ class SegmentReader extends DirectoryIndexReader { return instance; } + synchronized void openDocStores() throws IOException { + if 
(fieldsReaderOrig == null) { + final Directory storeDir; + if (si.getDocStoreOffset() != -1) { + if (si.getDocStoreIsCompoundFile()) { + storeCFSReader = new CompoundFileReader(directory(), + si.getDocStoreSegment() + "." + IndexFileNames.COMPOUND_FILE_STORE_EXTENSION, + readBufferSize); + storeDir = storeCFSReader; + assert storeDir != null; + } else { + storeDir = directory(); + assert storeDir != null; + } + } else if (si.getUseCompoundFile()) { + // In some cases, we were originally opened when CFS + // was not used, but then we are asked to open doc + // stores after the segment has switched to CFS + if (cfsReader == null) { + cfsReader = new CompoundFileReader(directory(), segment + "." + IndexFileNames.COMPOUND_FILE_EXTENSION, readBufferSize); + } + storeDir = cfsReader; + assert storeDir != null; + } else { + storeDir = directory(); + assert storeDir != null; + } + + final String storesSegment; + if (si.getDocStoreOffset() != -1) { + storesSegment = si.getDocStoreSegment(); + } else { + storesSegment = segment; + } + + fieldsReaderOrig = new FieldsReader(storeDir, storesSegment, fieldInfos, readBufferSize, + si.getDocStoreOffset(), si.docCount); + + // Verify two sources of "maxDoc" agree: + if (si.getDocStoreOffset() == -1 && fieldsReaderOrig.size() != si.docCount) { + throw new CorruptIndexException("doc counts differ for segment " + si.name + ": fieldsReader shows " + fieldsReaderOrig.size() + " but segmentInfo shows " + si.docCount); + } + + if (fieldInfos.hasVectors()) { // open term vector files only as needed + termVectorsReaderOrig = new TermVectorsReader(storeDir, storesSegment, fieldInfos, readBufferSize, si.getDocStoreOffset(), si.docCount); + } + } + } + private void initialize(SegmentInfo si, int readBufferSize, boolean doOpenStores) throws CorruptIndexException, IOException { segment = si.name; this.si = si; @@ -484,23 +533,11 @@ class SegmentReader extends DirectoryIndexReader { cfsDir = cfsReader; } - final Directory storeDir; + fieldInfos = new FieldInfos(cfsDir, segment + ".fnm"); if (doOpenStores) { - if (si.getDocStoreOffset() != -1) { - if (si.getDocStoreIsCompoundFile()) { - storeCFSReader = new CompoundFileReader(directory(), si.getDocStoreSegment() + "." 
+ IndexFileNames.COMPOUND_FILE_STORE_EXTENSION, readBufferSize); - storeDir = storeCFSReader; - } else { - storeDir = directory(); - } - } else { - storeDir = cfsDir; - } - } else - storeDir = null; - - fieldInfos = new FieldInfos(cfsDir, segment + ".fnm"); + openDocStores(); + } boolean anyProx = false; final int numFields = fieldInfos.size(); @@ -508,23 +545,6 @@ class SegmentReader extends DirectoryIndexReader { if (!fieldInfos.fieldInfo(i).omitTermFreqAndPositions) anyProx = true; - final String fieldsSegment; - - if (si.getDocStoreOffset() != -1) - fieldsSegment = si.getDocStoreSegment(); - else - fieldsSegment = segment; - - if (doOpenStores) { - fieldsReaderOrig = new FieldsReader(storeDir, fieldsSegment, fieldInfos, readBufferSize, - si.getDocStoreOffset(), si.docCount); - - // Verify two sources of "maxDoc" agree: - if (si.getDocStoreOffset() == -1 && fieldsReaderOrig.size() != si.docCount) { - throw new CorruptIndexException("doc counts differ for segment " + si.name + ": fieldsReader shows " + fieldsReaderOrig.size() + " but segmentInfo shows " + si.docCount); - } - } - tis = new TermInfosReader(cfsDir, segment, fieldInfos, readBufferSize); loadDeletedDocs(); @@ -536,14 +556,6 @@ class SegmentReader extends DirectoryIndexReader { proxStream = cfsDir.openInput(segment + ".prx", readBufferSize); openNorms(cfsDir, readBufferSize); - if (doOpenStores && fieldInfos.hasVectors()) { // open term vector files only as needed - final String vectorsSegment; - if (si.getDocStoreOffset() != -1) - vectorsSegment = si.getDocStoreSegment(); - else - vectorsSegment = segment; - termVectorsReaderOrig = new TermVectorsReader(storeDir, vectorsSegment, fieldInfos, readBufferSize, si.getDocStoreOffset(), si.docCount); - } success = true; } finally { @@ -1210,7 +1222,7 @@ class SegmentReader extends DirectoryIndexReader { /** * Return the name of the segment this reader is reading. 
*/ - String getSegmentName() { + public String getSegmentName() { return segment; } diff --git a/src/test/org/apache/lucene/TestSnapshotDeletionPolicy.java b/src/test/org/apache/lucene/TestSnapshotDeletionPolicy.java index 85180255018..104f26f86be 100644 --- a/src/test/org/apache/lucene/TestSnapshotDeletionPolicy.java +++ b/src/test/org/apache/lucene/TestSnapshotDeletionPolicy.java @@ -61,6 +61,7 @@ public class TestSnapshotDeletionPolicy extends LuceneTestCase MockRAMDirectory dir2 = new MockRAMDirectory(); runTest(dir2); + dir2.close(); } public void testReuseAcrossWriters() throws Exception { diff --git a/src/test/org/apache/lucene/index/TestIndexFileDeleter.java b/src/test/org/apache/lucene/index/TestIndexFileDeleter.java index 9cc38cd7c0f..21e7b9f23d8 100644 --- a/src/test/org/apache/lucene/index/TestIndexFileDeleter.java +++ b/src/test/org/apache/lucene/index/TestIndexFileDeleter.java @@ -154,12 +154,41 @@ public class TestIndexFileDeleter extends LuceneTestCase Arrays.sort(files); Arrays.sort(files2); - + + Set dif = difFiles(files, files2); + if (!Arrays.equals(files, files2)) { - fail("IndexFileDeleter failed to delete unreferenced extra files: should have deleted " + (filesPre.length-files.length) + " files but only deleted " + (filesPre.length - files2.length) + "; expected files:\n " + asString(files) + "\n actual files:\n " + asString(files2)); + fail("IndexFileDeleter failed to delete unreferenced extra files: should have deleted " + (filesPre.length-files.length) + " files but only deleted " + (filesPre.length - files2.length) + "; expected files:\n " + asString(files) + "\n actual files:\n " + asString(files2)+"\ndif: "+dif); } } + private static Set difFiles(String[] files1, String[] files2) { + Set set1 = new HashSet(); + Set set2 = new HashSet(); + Set extra = new HashSet(); + for (int x=0; x < files1.length; x++) { + set1.add(files1[x]); + } + for (int x=0; x < files2.length; x++) { + set2.add(files2[x]); + } + Iterator i1 = set1.iterator(); + while (i1.hasNext()) { + Object o = i1.next(); + if (!set2.contains(o)) { + extra.add(o); + } + } + Iterator i2 = set2.iterator(); + while (i2.hasNext()) { + Object o = i2.next(); + if (!set1.contains(o)) { + extra.add(o); + } + } + return extra; + } + private String asString(String[] l) { String s = ""; for(int i=0;i 0); + final int count = warmer.warmCount; + + writer.addDocument(createDocument(17, "test", 4)); + writer.optimize(); + assertTrue(warmer.warmCount > count); + + writer.close(); + r1.close(); + dir1.close(); + } + + public void testAfterCommit() throws Exception { + Directory dir1 = new MockRAMDirectory(); + IndexWriter writer = new IndexWriter(dir1, new WhitespaceAnalyzer(), + IndexWriter.MaxFieldLength.LIMITED); + writer.setInfoStream(infoStream); + + // create the index + createIndexNoClose(false, "test", writer); + + // get a reader to put writer into near real-time mode + IndexReader r1 = writer.getReader(); + _TestUtil.checkIndex(dir1); + writer.commit(); + _TestUtil.checkIndex(dir1); + assertEquals(100, r1.numDocs()); + + for (int i = 0; i < 10; i++) { + writer.addDocument(createDocument(i, "test", 4)); + } + ((ConcurrentMergeScheduler) writer.getMergeScheduler()).sync(); + + IndexReader r2 = r1.reopen(); + if (r2 != r1) { + r1.close(); + r1 = r2; + } + assertEquals(110, r1.numDocs()); + writer.close(); + r1.close(); + dir1.close(); + } + + // Make sure reader remains usable even if IndexWriter closes + public void testAfterClose() throws Exception { + Directory dir1 = new MockRAMDirectory(); + IndexWriter 
writer = new IndexWriter(dir1, new WhitespaceAnalyzer(), + IndexWriter.MaxFieldLength.LIMITED); + writer.setInfoStream(infoStream); + + // create the index + createIndexNoClose(false, "test", writer); + + IndexReader r = writer.getReader(); + writer.close(); + + _TestUtil.checkIndex(dir1); + + // reader should remain usable even after IndexWriter is closed: + assertEquals(100, r.numDocs()); + Query q = new TermQuery(new Term("indexname", "test")); + assertEquals(100, new IndexSearcher(r).search(q, 10).totalHits); + + try { + r.reopen(); + fail("failed to hit AlreadyClosedException"); + } catch (AlreadyClosedException ace) { + // expected + } + r.close(); + dir1.close(); + } + + // Stress test reopen during addIndexes + public void testDuringAddIndexes() throws Exception { + Directory dir1 = new MockRAMDirectory(); + final IndexWriter writer = new IndexWriter(dir1, new WhitespaceAnalyzer(), + IndexWriter.MaxFieldLength.LIMITED); + writer.setInfoStream(infoStream); + writer.setMergeFactor(2); + + // create the index + createIndexNoClose(false, "test", writer); + writer.commit(); + + final Directory[] dirs = new Directory[10]; + for (int i=0;i<10;i++) { + dirs[i] = new MockRAMDirectory(dir1); + } + + IndexReader r = writer.getReader(); + + final int NUM_THREAD = 5; + final float SECONDS = 3; + + final long endTime = (long) (System.currentTimeMillis() + 1000.*SECONDS); + final List excs = Collections.synchronizedList(new ArrayList()); + + final Thread[] threads = new Thread[NUM_THREAD]; + for(int i=0;i= lastCount); + lastCount = count; + } + + for(int i=0;i 0); + + assertEquals(0, excs.size()); + writer.close(); + + _TestUtil.checkIndex(dir1); + r.close(); + dir1.close(); + } +} diff --git a/src/test/org/apache/lucene/index/TestStressIndexing2.java b/src/test/org/apache/lucene/index/TestStressIndexing2.java index 0ae023b8c8f..e7a94eade8a 100644 --- a/src/test/org/apache/lucene/index/TestStressIndexing2.java +++ b/src/test/org/apache/lucene/index/TestStressIndexing2.java @@ -51,7 +51,21 @@ public class TestStressIndexing2 extends LuceneTestCase { return true; } } - + + public void testRandomIWReader() throws Throwable { + r = newRandom(); + Directory dir = new MockRAMDirectory(); + + // TODO: verify equals using IW.getReader + DocsAndWriter dw = indexRandomIWReader(10, 100, 100, dir); + IndexReader r = dw.writer.getReader(); + dw.writer.commit(); + verifyEquals(r, dir, "id"); + r.close(); + dw.writer.close(); + dir.close(); + } + public void testRandom() throws Throwable { r = newRandom(); Directory dir1 = new MockRAMDirectory(); @@ -101,20 +115,69 @@ public class TestStressIndexing2 extends LuceneTestCase { // This test avoids using any extra synchronization in the multiple // indexing threads to test that IndexWriter does correctly synchronize // everything. 
+ + public static class DocsAndWriter { + Map docs; + IndexWriter writer; + } + + public DocsAndWriter indexRandomIWReader(int nThreads, int iterations, int range, Directory dir) throws IOException, InterruptedException { + Map docs = new HashMap(); + IndexWriter w = new MockIndexWriter(dir, autoCommit, new WhitespaceAnalyzer(), true); + w.setUseCompoundFile(false); + /*** + w.setMaxMergeDocs(Integer.MAX_VALUE); + w.setMaxFieldLength(10000); + w.setRAMBufferSizeMB(1); + w.setMergeFactor(10); + ***/ + + // force many merges + w.setMergeFactor(mergeFactor); + w.setRAMBufferSizeMB(.1); + w.setMaxBufferedDocs(maxBufferedDocs); + + threads = new IndexingThread[nThreads]; + for (int i=0; i dir.maxUsedSize) { dir.maxUsedSize = realUsage; } - throw new IOException("fake disk full at " + dir.getRecomputedActualSizeInBytes() + " bytes"); + throw new IOException("fake disk full at " + dir.getRecomputedActualSizeInBytes() + " bytes when writing " + name); } else { super.writeBytes(b, offset, len); }
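
The following is a minimal sketch (not part of the patch itself) of how the near real-time API introduced by this commit is typically used, assuming the 2.9-era IndexWriter/IndexReader calls that the tests above exercise (getReader, reopen, updateDocument, setMergedSegmentWarmer). The class name NearRealTimeExample, the field names and the query values are illustrative only.

import java.io.IOException;

import org.apache.lucene.analysis.WhitespaceAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;

public class NearRealTimeExample {
  public static void main(String[] args) throws Exception {
    Directory dir = new RAMDirectory();
    IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(),
                                         IndexWriter.MaxFieldLength.LIMITED);

    // Optional: pre-warm newly merged segments so NRT reopens stay fast
    // (setMergedSegmentWarmer / IndexReaderWarmer are added by this patch).
    writer.setMergedSegmentWarmer(new IndexWriter.IndexReaderWarmer() {
      public void warm(IndexReader segmentReader) throws IOException {
        new IndexSearcher(segmentReader).search(
            new TermQuery(new Term("body", "lucene")), 10);
      }
    });

    Document doc = new Document();
    doc.add(new Field("id", "1", Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS));
    doc.add(new Field("body", "near real-time search in lucene",
                      Field.Store.NO, Field.Index.ANALYZED));
    writer.addDocument(doc);

    // getReader() flushes pending changes and returns a read-only reader
    // that sees them without committing or closing the writer:
    IndexReader reader = writer.getReader();
    IndexSearcher searcher = new IndexSearcher(reader);
    int hits = searcher.search(new TermQuery(new Term("body", "lucene")), 10).totalHits;
    System.out.println("hits before commit: " + hits);

    // After further updates, reopen() on an NRT reader simply asks the
    // writer for a fresh reader again:
    writer.updateDocument(new Term("id", "1"), doc);
    IndexReader newReader = reader.reopen();
    if (newReader != reader) {
      reader.close();
      reader = newReader;
    }

    reader.close();
    writer.close();
    dir.close();
  }
}

Once getReader() has been called, the writer pools its SegmentReader instances (the new ReaderPool), so applying deletes, running merges, and subsequent NRT reopens reuse the same readers instead of reopening segment files each time.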