LUCENE-1516: add near real-time search to IndexWriter

git-svn-id: https://svn.apache.org/repos/asf/lucene/java/trunk@763737 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael McCandless 2009-04-09 17:17:46 +00:00
parent bb8b2e1462
commit 3c91517add
27 changed files with 1885 additions and 200 deletions

View File

@ -216,6 +216,13 @@ New features
19. LUCENE-1586: Add IndexReader.getUniqueTermCount(). (Mike 19. LUCENE-1586: Add IndexReader.getUniqueTermCount(). (Mike
McCandless via Derek) McCandless via Derek)
20. LUCENE-1516: Added "near real-time search" to IndexWriter, via a
new expert getReader() method. This method returns a reader that
searches the full index, including any uncommitted changes in the
current IndexWriter session. This should result in a faster
turnaround than the normal approach of commiting the changes and
then reopening a reader. (Jason Rutherglen via Mike McCandless)
Optimizations Optimizations
1. LUCENE-1427: Fixed QueryWrapperFilter to not waste time computing 1. LUCENE-1427: Fixed QueryWrapperFilter to not waste time computing

View File

@ -27,6 +27,8 @@ import java.io.BufferedReader;
import java.io.IOException; import java.io.IOException;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.util.Random;
/** /**
* A DocMaker reading one line at a time as a Document from * A DocMaker reading one line at a time as a Document from
@ -38,6 +40,11 @@ import java.io.InputStreamReader;
* Config properties: * Config properties:
* docs.file=<path to the file%gt; * docs.file=<path to the file%gt;
* doc.reuse.fields=true|false (default true) * doc.reuse.fields=true|false (default true)
* doc.random.id.limit=N (default -1) -- create random
* docid in the range 0..N; this is useful
* with UpdateDoc to test updating random documents; if
* this is unspecified or -1, then docid is sequentially
* assigned
*/ */
public class LineDocMaker extends BasicDocMaker { public class LineDocMaker extends BasicDocMaker {
@ -50,6 +57,8 @@ public class LineDocMaker extends BasicDocMaker {
private final DocState localDocState = new DocState(); private final DocState localDocState = new DocState();
private boolean doReuseFields = true; private boolean doReuseFields = true;
private Random r;
private int numDocs;
class DocState { class DocState {
Document doc; Document doc;
@ -86,6 +95,11 @@ public class LineDocMaker extends BasicDocMaker {
final static String SEP = WriteLineDocTask.SEP; final static String SEP = WriteLineDocTask.SEP;
private int numDocsCreated;
private synchronized int incrNumDocsCreated() {
return numDocsCreated++;
}
public Document setFields(String line) { public Document setFields(String line) {
// title <TAB> date <TAB> body <NEWLINE> // title <TAB> date <TAB> body <NEWLINE>
final String title, date, body; final String title, date, body;
@ -102,12 +116,22 @@ public class LineDocMaker extends BasicDocMaker {
} else } else
title = date = body = ""; title = date = body = "";
final String docID;
if (r != null) {
docID = "doc" + r.nextInt(numDocs);
} else {
docID = "doc" + incrNumDocsCreated();
}
if (doReuseFields) { if (doReuseFields) {
idField.setValue(docID);
titleField.setValue(title); titleField.setValue(title);
dateField.setValue(date); dateField.setValue(date);
bodyField.setValue(body); bodyField.setValue(body);
return doc; return doc;
} else { } else {
Field localIDField = new Field(BasicDocMaker.ID_FIELD, docID, Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS);
Field localTitleField = new Field(BasicDocMaker.TITLE_FIELD, Field localTitleField = new Field(BasicDocMaker.TITLE_FIELD,
title, title,
storeVal, storeVal,
@ -124,6 +148,7 @@ public class LineDocMaker extends BasicDocMaker {
Field.Index.ANALYZED, Field.Index.ANALYZED,
termVecVal); termVecVal);
Document localDoc = new Document(); Document localDoc = new Document();
localDoc.add(localIDField);
localDoc.add(localBodyField); localDoc.add(localBodyField);
localDoc.add(localTitleField); localDoc.add(localTitleField);
localDoc.add(localDateField); localDoc.add(localDateField);
@ -183,6 +208,10 @@ public class LineDocMaker extends BasicDocMaker {
public void setConfig(Config config) { public void setConfig(Config config) {
super.setConfig(config); super.setConfig(config);
doReuseFields = config.get("doc.reuse.fields", true); doReuseFields = config.get("doc.reuse.fields", true);
numDocs = config.get("doc.random.id.limit", -1);
if (numDocs != -1) {
r = new Random(179);
}
} }
synchronized void openFile() { synchronized void openFile() {

View File

@ -0,0 +1,134 @@
package org.apache.lucene.benchmark.byTask.tasks;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import org.apache.lucene.benchmark.byTask.PerfRunData;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TopFieldDocs;
import org.apache.lucene.index.Term;
/**
* Spawns a BG thread that periodically (defaults to 3.0
* seconds, but accepts param in seconds) wakes up and asks
* IndexWriter for a near real-time reader. Then runs a
* single query (body: 1) sorted by docdate, and prints
* time to reopen and time to run the search.
*
* <b>NOTE</b>: this is very experimental at this point, and
* subject to change. It's also not generally usable, eg
* you cannot change which query is executed.
*/
public class NearRealtimeReaderTask extends PerfTask {
ReopenThread t;
float pauseSec = 3.0f;
private static class ReopenThread extends Thread {
final IndexWriter writer;
final int pauseMsec;
public volatile boolean done;
ReopenThread(IndexWriter writer, float pauseSec) {
this.writer = writer;
this.pauseMsec = (int) (1000*pauseSec);
setDaemon(true);
}
public void run() {
IndexReader reader = null;
final Query query = new TermQuery(new Term("body", "1"));
final SortField sf = new SortField("docdate", SortField.LONG);
final Sort sort = new Sort(sf);
try {
while(!done) {
final long t0 = System.currentTimeMillis();
if (reader == null) {
reader = writer.getReader();
} else {
final IndexReader newReader = reader.reopen();
if (reader != newReader) {
reader.close();
reader = newReader;
}
}
final long t1 = System.currentTimeMillis();
final TopFieldDocs hits = new IndexSearcher(reader).search(query, null, 10, sort);
final long t2 = System.currentTimeMillis();
System.out.println("nrt: open " + (t1-t0) + " msec; search " + (t2-t1) + " msec, " + hits.totalHits +
" results; " + reader.numDocs() + " docs");
final long t4 = System.currentTimeMillis();
final int delay = (int) (pauseMsec - (t4-t0));
if (delay > 0) {
try {
Thread.sleep(delay);
} catch (InterruptedException ie) {
throw new RuntimeException(ie);
}
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
public NearRealtimeReaderTask(PerfRunData runData) {
super(runData);
}
public int doLogic() throws IOException {
if (t == null) {
IndexWriter w = getRunData().getIndexWriter();
t = new ReopenThread(w, pauseSec);
t.start();
}
return 1;
}
public void setParams(String params) {
super.setParams(params);
pauseSec = Float.parseFloat(params);
}
public boolean supportsParams() {
return true;
}
// Close the thread
public void close() throws InterruptedException {
if (t != null) {
t.done = true;
t.join();
}
}
}

View File

@ -71,6 +71,9 @@ public abstract class PerfTask implements Cloneable {
return super.clone(); return super.clone();
} }
public void close() throws Exception {
}
/** /**
* Run the task, record statistics. * Run the task, record statistics.
* @return number of work items done by this task. * @return number of work items done by this task.

View File

@ -57,6 +57,13 @@ public class TaskSequence extends PerfTask {
tasks = new ArrayList(); tasks = new ArrayList();
} }
public void close() throws Exception {
initTasksArray();
for(int i=0;i<tasksArray.length;i++) {
tasksArray[i].close();
}
}
private void initTasksArray() { private void initTasksArray() {
if (tasksArray == null) { if (tasksArray == null) {
final int numTasks = tasks.size(); final int numTasks = tasks.size();

View File

@ -0,0 +1,110 @@
package org.apache.lucene.benchmark.byTask.tasks;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.benchmark.byTask.PerfRunData;
import org.apache.lucene.benchmark.byTask.feeds.DocMaker;
import org.apache.lucene.benchmark.byTask.feeds.BasicDocMaker;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.Term;
import java.text.NumberFormat;
/**
* Update a document, using IndexWriter.updateDocument,
* optionally with of a certain size.
* <br>Other side effects: none.
* <br>Relevant properties: <code>doc.add.log.step</code>.
* <br>Takes optional param: document size.
*/
public class UpdateDocTask extends PerfTask {
public UpdateDocTask(PerfRunData runData) {
super(runData);
}
private int logStep = -1;
private int docSize = 0;
int count = 0;
// volatile data passed between setup(), doLogic(), tearDown().
private Document doc = null;
/*
* (non-Javadoc)
* @see PerfTask#setup()
*/
public void setup() throws Exception {
super.setup();
DocMaker docMaker = getRunData().getDocMaker();
if (docSize > 0) {
doc = docMaker.makeDocument(docSize);
} else {
doc = docMaker.makeDocument();
}
}
/* (non-Javadoc)
* @see PerfTask#tearDown()
*/
public void tearDown() throws Exception {
log(++count);
doc = null;
super.tearDown();
}
public int doLogic() throws Exception {
final String docID = doc.get(BasicDocMaker.ID_FIELD);
if (docID == null) {
throw new IllegalStateException("document must define the docid field");
}
getRunData().getIndexWriter().updateDocument(new Term(BasicDocMaker.ID_FIELD, docID),
doc);
return 1;
}
private void log (int count) {
if (logStep<0) {
// init once per instance
logStep = getRunData().getConfig().get("doc.add.log.step",AddDocTask.DEFAULT_ADD_DOC_LOG_STEP);
}
if (logStep>0 && (count%logStep)==0) {
double seconds = (System.currentTimeMillis() - getRunData().getStartTimeMillis())/1000.0;
NumberFormat nf = NumberFormat.getInstance();
nf.setMaximumFractionDigits(2);
System.out.println("--> "+nf.format(seconds) + " sec: " + Thread.currentThread().getName()+" processed (update) "+count+" docs");
}
}
/**
* Set the params (docSize only)
* @param params docSize, or 0 for no limit.
*/
public void setParams(String params) {
super.setParams(params);
docSize = (int) Float.parseFloat(params);
}
/* (non-Javadoc)
* @see org.apache.lucene.benchmark.byTask.tasks.PerfTask#supportsParams()
*/
public boolean supportsParams() {
return true;
}
}

View File

@ -243,7 +243,11 @@ public class Algorithm {
* @throws Exception * @throws Exception
*/ */
public void execute() throws Exception { public void execute() throws Exception {
try {
sequence.runAndMaybeStats(true); sequence.runAndMaybeStats(true);
} finally {
sequence.close();
}
} }
/** /**

View File

@ -29,6 +29,7 @@ import java.util.Collections;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.store.Lock; import org.apache.lucene.store.Lock;
import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.FSDirectory; import org.apache.lucene.store.FSDirectory;
/** /**
@ -51,6 +52,7 @@ abstract class DirectoryIndexReader extends IndexReader implements Cloneable {
* rollback is necessary */ * rollback is necessary */
private boolean rollbackHasChanges; private boolean rollbackHasChanges;
private SegmentInfos rollbackSegmentInfos; private SegmentInfos rollbackSegmentInfos;
IndexWriter writer;
protected boolean readOnly; protected boolean readOnly;
@ -183,10 +185,12 @@ abstract class DirectoryIndexReader extends IndexReader implements Cloneable {
newReader.init(directory, clonedInfos, closeDirectory, openReadOnly); newReader.init(directory, clonedInfos, closeDirectory, openReadOnly);
newReader.deletionPolicy = deletionPolicy; newReader.deletionPolicy = deletionPolicy;
} }
newReader.writer = writer;
// If we're cloning a non-readOnly reader, move the // If we're cloning a non-readOnly reader, move the
// writeLock (if there is one) to the new reader: // writeLock (if there is one) to the new reader:
if (!openReadOnly && writeLock != null) { if (!openReadOnly && writeLock != null) {
// In near real-time search, reader is always readonly
assert writer == null;
newReader.writeLock = writeLock; newReader.writeLock = writeLock;
writeLock = null; writeLock = null;
hasChanges = false; hasChanges = false;
@ -203,6 +207,29 @@ abstract class DirectoryIndexReader extends IndexReader implements Cloneable {
assert commit == null || openReadOnly; assert commit == null || openReadOnly;
// If we were obtained by writer.getReader(), re-ask the
// writer to get a new reader.
if (writer != null) {
assert readOnly;
if (!openReadOnly) {
throw new IllegalArgumentException("a reader obtained from IndexWriter.getReader() can only be reopened with openReadOnly=true (got false)");
}
if (commit != null) {
throw new IllegalArgumentException("a reader obtained from IndexWriter.getReader() cannot currently accept a commit");
}
if (!writer.isOpen(true)) {
throw new AlreadyClosedException("cannot reopen: the IndexWriter this reader was obtained from is now closed");
}
// TODO: right now we *always* make a new reader; in
// the future we could have write make some effort to
// detect that no changes have occurred
return writer.getReader();
}
if (commit == null) { if (commit == null) {
if (hasChanges) { if (hasChanges) {
// We have changes, which means we are not readOnly: // We have changes, which means we are not readOnly:

View File

@ -915,25 +915,17 @@ final class DocumentsWriter {
int docStart = 0; int docStart = 0;
boolean any = false; boolean any = false;
for (int i = 0; i < infosEnd; i++) { for (int i = 0; i < infosEnd; i++) {
IndexReader reader = SegmentReader.get(infos.info(i), false);
boolean success = false; // Make sure we never attempt to apply deletes to
// segment in external dir
assert infos.info(i).dir == directory;
SegmentReader reader = writer.readerPool.get(infos.info(i), false);
try { try {
any |= applyDeletes(reader, docStart); any |= applyDeletes(reader, docStart);
docStart += reader.maxDoc(); docStart += reader.maxDoc();
success = true;
} finally { } finally {
if (reader != null) { writer.readerPool.release(reader);
try {
if (success)
reader.commit();
} finally {
// Force reader to not have changes; if we hit
// an exception during commit, we don't want
// close to retry the commit:
reader.hasChanges = false;
reader.close();
}
}
} }
} }

View File

@ -485,7 +485,7 @@ final class IndexFileDeleter {
private RefCount getRefCount(String fileName) { private RefCount getRefCount(String fileName) {
RefCount rc; RefCount rc;
if (!refCounts.containsKey(fileName)) { if (!refCounts.containsKey(fileName)) {
rc = new RefCount(); rc = new RefCount(fileName);
refCounts.put(fileName, rc); refCounts.put(fileName, rc);
} else { } else {
rc = (RefCount) refCounts.get(fileName); rc = (RefCount) refCounts.get(fileName);
@ -543,14 +543,26 @@ final class IndexFileDeleter {
*/ */
final private static class RefCount { final private static class RefCount {
// fileName used only for better assert error messages
final String fileName;
boolean initDone;
RefCount(String fileName) {
this.fileName = fileName;
}
int count; int count;
public int IncRef() { public int IncRef() {
if (!initDone) {
initDone = true;
} else {
assert count > 0: "RefCount is 0 pre-increment for file \"" + fileName + "\"";
}
return ++count; return ++count;
} }
public int DecRef() { public int DecRef() {
assert count > 0; assert count > 0: "RefCount is 0 pre-decrement for file \"" + fileName + "\"";
return --count; return --count;
} }
} }

View File

@ -355,6 +355,11 @@ public abstract class IndexReader implements Cloneable {
* if present, can never use reader after it has been * if present, can never use reader after it has been
* closed and before it's switched to newReader. * closed and before it's switched to newReader.
* *
* <p><b>NOTE</b>: If this reader is a near real-time
* reader (obtained from {@link IndexWriter#getReader()},
* reopen() will simply call writer.getReader() again for
* you, though this may change in the future.
*
* @throws CorruptIndexException if the index is corrupt * @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error * @throws IOException if there is a low-level IO error
*/ */

View File

@ -27,7 +27,7 @@ import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.Lock; import org.apache.lucene.store.Lock;
import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.BitVector; import org.apache.lucene.store.BufferedIndexInput;
import org.apache.lucene.util.Constants; import org.apache.lucene.util.Constants;
import java.io.File; import java.io.File;
@ -41,6 +41,7 @@ import java.util.Set;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map;
/** /**
An <code>IndexWriter</code> creates and maintains an index. An <code>IndexWriter</code> creates and maintains an index.
@ -367,8 +368,273 @@ public class IndexWriter {
// TODO: use ReadWriteLock once we are on 5.0 // TODO: use ReadWriteLock once we are on 5.0
private int readCount; // count of how many threads are holding read lock private int readCount; // count of how many threads are holding read lock
private Thread writeThread; // non-null if any thread holds write lock private Thread writeThread; // non-null if any thread holds write lock
final ReaderPool readerPool = new ReaderPool();
private int upgradeCount; private int upgradeCount;
// This is a "write once" variable (like the organic dye
// on a DVD-R that may or may not be heated by a laser and
// then cooled to permanently record the event): it's
// false, until getReader() is called for the first time,
// at which point it's switched to true and never changes
// back to false. Once this is true, we hold open and
// reuse SegmentReader instances internally for applying
// deletes, doing merges, and reopening near real-time
// readers.
private volatile boolean poolReaders;
/**
* Expert: returns a readonly reader containing all
* current updates. Flush is called automatically. This
* provides "near real-time" searching, in that changes
* made during an IndexWriter session can be made
* available for searching without closing the writer.
*
* <p>It's near real-time because there is no hard
* guarantee on how quickly you can get a new reader after
* making changes with IndexWriter. You'll have to
* experiment in your situation to determine if it's
* faster enough. As this is a new and experimental
* feature, please report back on your findings so we can
* learn, improve and iterate.</p>
*
* <p>The resulting reader suppports {@link
* IndexReader#reopen}, but that call will simply forward
* back to this method (though this may change in the
* future).</p>
*
* <p>The very first time this method is called, this
* writer instance will make every effort to pool the
* readers that it opens for doing merges, applying
* deletes, etc. This means additional resources (RAM,
* file descriptors, CPU time) will be consumed.</p>
*
* <p>For lower latency on reopening a reader, you may
* want to call {@link #setMergedSegmentWarmer} to
* pre-warm a newly merged segment before it's committed
* to the index.</p>
*
* <p>If an addIndexes* call is running in another thread,
* then this reader will only search those segments from
* the foreign index that have been successfully copied
* over, so far</p>.
*
* <p><b>NOTE</b>: Once the writer is closed, any
* outstanding readers may continue to be used. However,
* if you attempt to reopen any of those readers, you'll
* hit an {@link AlreadyClosedException}.</p>
*
* <p><b>NOTE:</b> This API is experimental and might
* change in incompatible ways in the next release.</p>
*
* @return IndexReader that covers entire index plus all
* changes made so far by this IndexWriter instance
*
* @throws IOException
*/
public IndexReader getReader() throws IOException {
if (infoStream != null) {
message("flush at getReader");
}
// Do this up front before flushing so that the readers
// obtained during this flush are pooled, the first time
// this method is called:
poolReaders = true;
flush(true, true, true);
// Prevent segmentInfos from changing while opening the
// reader; in theory we could do similar retry logic,
// just like we do when loading segments_N
synchronized(this) {
return new ReadOnlyMultiSegmentReader(this, segmentInfos);
}
}
/** Holds shared SegmentReader instances. IndexWriter uses
* SegmentReaders for 1) applying deletes, 2) doing
* merges, 3) handing out a real-time reader. This pool
* reuses instances of the SegmentReaders in all these
* places if it is in "near real-time mode" (getReader()
* has been called on this instance). */
class ReaderPool {
private final Map readerMap = new HashMap();
/** Forcefully clear changes for the specifed segments,
* and remove from the pool. This is called on succesful merge. */
synchronized void clear(SegmentInfos infos) throws IOException {
if (infos == null) {
Iterator iter = readerMap.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry ent = (Map.Entry) iter.next();
((SegmentReader) ent.getValue()).hasChanges = false;
}
} else {
final int numSegments = infos.size();
for(int i=0;i<numSegments;i++) {
final SegmentInfo info = infos.info(i);
if (readerMap.containsKey(info)) {
((SegmentReader) readerMap.get(info)).hasChanges = false;
}
}
}
}
// used only by asserts
synchronized boolean infoIsLive(SegmentInfo info) {
int idx = segmentInfos.indexOf(info);
assert idx != -1;
assert segmentInfos.get(idx) == info;
return true;
}
synchronized SegmentInfo mapToLive(SegmentInfo info) {
int idx = segmentInfos.indexOf(info);
if (idx != -1) {
info = (SegmentInfo) segmentInfos.get(idx);
}
return info;
}
synchronized void release(SegmentReader sr) throws IOException {
release(sr, false);
}
synchronized void release(SegmentReader sr, boolean drop) throws IOException {
final boolean pooled = readerMap.containsKey(sr.getSegmentInfo());
assert !pooled | readerMap.get(sr.getSegmentInfo()) == sr;
// Drop caller's ref
sr.decRef();
if (pooled && (drop || (!poolReaders && sr.getRefCount() == 1))) {
// We are the last ref to this reader; since we're
// not pooling readers, we release it:
readerMap.remove(sr.getSegmentInfo());
// TODO: java 5
// assert !sr.hasChanges || Thread.holdsLock(IndexWriter.this);
// Drop our ref -- this will commit any pending
// changes to the dir
boolean success = false;
try {
sr.close();
success = true;
} finally {
if (!success && sr.hasChanges) {
// Abandon the changes & retry closing:
sr.hasChanges = false;
try {
sr.close();
} catch (Throwable ignore) {
// Keep throwing original exception
}
}
}
}
}
/** Remove all our references to readers, and commits
* any pending changes. */
public synchronized void close() throws IOException {
Iterator iter = readerMap.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry ent = (Map.Entry) iter.next();
SegmentReader sr = (SegmentReader) ent.getValue();
if (sr.hasChanges) {
assert infoIsLive(sr.getSegmentInfo());
sr.startCommit();
boolean success = false;
try {
sr.doCommit(null);
success = true;
} finally {
if (!success) {
sr.rollbackCommit();
}
}
}
iter.remove();
// NOTE: it is allowed that this decRef does not
// actually close the SR; this can happen when a
// near real-time reader is kept open after the
// IndexWriter instance is closed
sr.decRef();
}
}
public synchronized void commit() throws IOException {
Iterator iter = readerMap.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry ent = (Map.Entry) iter.next();
SegmentReader sr = (SegmentReader) ent.getValue();
if (sr.hasChanges) {
assert infoIsLive(sr.getSegmentInfo());
sr.startCommit();
boolean success = false;
try {
sr.doCommit(null);
success = true;
} finally {
if (!success) {
sr.rollbackCommit();
}
}
}
}
}
// Returns a ref to a clone. NOTE: this clone is not
// enrolled in the pool, so you should simply close()
// it when you're done (ie, do not call release()).
public synchronized SegmentReader getReadOnlyClone(SegmentInfo info, boolean doOpenStores) throws IOException {
SegmentReader sr = get(info, doOpenStores);
try {
return (SegmentReader) sr.clone(true);
} finally {
sr.decRef();
}
}
// Returns a ref
public synchronized SegmentReader get(SegmentInfo info, boolean doOpenStores) throws IOException {
return get(info, doOpenStores, BufferedIndexInput.BUFFER_SIZE);
}
public synchronized SegmentReader get(SegmentInfo info, boolean doOpenStores, int readBufferSize) throws IOException {
if (poolReaders) {
readBufferSize = BufferedIndexInput.BUFFER_SIZE;
}
SegmentReader sr = (SegmentReader) readerMap.get(info);
if (sr == null) {
// TODO: we may want to avoid doing this while
// synchronized
// Returns a ref, which we xfer to readerMap:
sr = SegmentReader.get(info, readBufferSize, doOpenStores);
sr.writer = IndexWriter.this;
readerMap.put(info, sr);
} else if (doOpenStores) {
sr.openDocStores();
}
// Return a ref to our caller
sr.incRef();
return sr;
}
}
synchronized void acquireWrite() { synchronized void acquireWrite() {
assert writeThread != Thread.currentThread(); assert writeThread != Thread.currentThread();
while(writeThread != null || readCount > 0) while(writeThread != null || readCount > 0)
@ -415,6 +681,10 @@ public class IndexWriter {
notifyAll(); notifyAll();
} }
synchronized final boolean isOpen(boolean includePendingClose) {
return !(closed || (includePendingClose && closing));
}
/** /**
* Used internally to throw an {@link * Used internally to throw an {@link
* AlreadyClosedException} if this IndexWriter has been * AlreadyClosedException} if this IndexWriter has been
@ -422,7 +692,7 @@ public class IndexWriter {
* @throws AlreadyClosedException if this IndexWriter is * @throws AlreadyClosedException if this IndexWriter is
*/ */
protected synchronized final void ensureOpen(boolean includePendingClose) throws AlreadyClosedException { protected synchronized final void ensureOpen(boolean includePendingClose) throws AlreadyClosedException {
if (closed || (includePendingClose && closing)) { if (!isOpen(includePendingClose)) {
throw new AlreadyClosedException("this IndexWriter is closed"); throw new AlreadyClosedException("this IndexWriter is closed");
} }
} }
@ -1795,6 +2065,7 @@ public class IndexWriter {
message("at close: " + segString()); message("at close: " + segString());
synchronized(this) { synchronized(this) {
readerPool.close();
docWriter = null; docWriter = null;
deleter.close(); deleter.close();
} }
@ -1851,6 +2122,10 @@ public class IndexWriter {
if (useCompoundDocStore && docStoreSegment != null && docWriter.closedFiles().size() != 0) { if (useCompoundDocStore && docStoreSegment != null && docWriter.closedFiles().size() != 0) {
// Now build compound doc store file // Now build compound doc store file
if (infoStream != null) {
message("create compound file " + docStoreSegment + "." + IndexFileNames.COMPOUND_FILE_STORE_EXTENSION);
}
success = false; success = false;
final int numSegments = segmentInfos.size(); final int numSegments = segmentInfos.size();
@ -2756,7 +3031,9 @@ public class IndexWriter {
// First restore autoCommit in case we hit an exception below: // First restore autoCommit in case we hit an exception below:
autoCommit = localAutoCommit; autoCommit = localAutoCommit;
if (docWriter != null) {
docWriter.setFlushedDocCount(localFlushedDocCount); docWriter.setFlushedDocCount(localFlushedDocCount);
}
// Must finish merges before rolling back segmentInfos // Must finish merges before rolling back segmentInfos
// so merges don't hit exceptions on trying to commit // so merges don't hit exceptions on trying to commit
@ -2912,6 +3189,9 @@ public class IndexWriter {
deleter.refresh(); deleter.refresh();
} }
// Don't bother saving any changes in our segmentInfos
readerPool.clear(null);
lastCommitChangeCount = changeCount; lastCommitChangeCount = changeCount;
success = true; success = true;
@ -3098,9 +3378,11 @@ public class IndexWriter {
hitOOM = true; hitOOM = true;
throw oom; throw oom;
} finally { } finally {
if (docWriter != null) {
docWriter.resumeAllThreads(); docWriter.resumeAllThreads();
} }
} }
}
private synchronized void resetMergeExceptions() { private synchronized void resetMergeExceptions() {
mergeExceptions = new ArrayList(); mergeExceptions = new ArrayList();
@ -3239,9 +3521,11 @@ public class IndexWriter {
hitOOM = true; hitOOM = true;
throw oom; throw oom;
} finally { } finally {
if (docWriter != null) {
docWriter.resumeAllThreads(); docWriter.resumeAllThreads();
} }
} }
}
private boolean hasExternalSegments() { private boolean hasExternalSegments() {
return segmentInfos.hasExternalSegments(directory); return segmentInfos.hasExternalSegments(directory);
@ -3387,10 +3671,10 @@ public class IndexWriter {
mergedName = newSegmentName(); mergedName = newSegmentName();
merger = new SegmentMerger(this, mergedName, null); merger = new SegmentMerger(this, mergedName, null);
IndexReader sReader = null; SegmentReader sReader = null;
synchronized(this) { synchronized(this) {
if (segmentInfos.size() == 1) { // add existing index, if any if (segmentInfos.size() == 1) { // add existing index, if any
sReader = SegmentReader.get(true, segmentInfos.info(0)); sReader = readerPool.get(segmentInfos.info(0), true);
} }
} }
@ -3405,11 +3689,6 @@ public class IndexWriter {
int docCount = merger.merge(); // merge 'em int docCount = merger.merge(); // merge 'em
if(sReader != null) {
sReader.close();
sReader = null;
}
synchronized(this) { synchronized(this) {
segmentInfos.clear(); // pop old infos & add new segmentInfos.clear(); // pop old infos & add new
info = new SegmentInfo(mergedName, docCount, directory, false, true, info = new SegmentInfo(mergedName, docCount, directory, false, true,
@ -3424,7 +3703,7 @@ public class IndexWriter {
} finally { } finally {
if (sReader != null) { if (sReader != null) {
sReader.close(); readerPool.release(sReader);
} }
} }
} finally { } finally {
@ -3485,9 +3764,11 @@ public class IndexWriter {
hitOOM = true; hitOOM = true;
throw oom; throw oom;
} finally { } finally {
if (docWriter != null) {
docWriter.resumeAllThreads(); docWriter.resumeAllThreads();
} }
} }
}
// This is called after pending added and deleted // This is called after pending added and deleted
// documents have been flushed to the Directory but before // documents have been flushed to the Directory but before
@ -3735,6 +4016,9 @@ public class IndexWriter {
// stores when we flush // stores when we flush
flushDocStores |= autoCommit; flushDocStores |= autoCommit;
String docStoreSegment = docWriter.getDocStoreSegment(); String docStoreSegment = docWriter.getDocStoreSegment();
assert docStoreSegment != null || numDocs == 0;
if (docStoreSegment == null) if (docStoreSegment == null)
flushDocStores = false; flushDocStores = false;
@ -3876,7 +4160,7 @@ public class IndexWriter {
int first = segmentInfos.indexOf(merge.segments.info(0)); int first = segmentInfos.indexOf(merge.segments.info(0));
if (first == -1) if (first == -1)
throw new MergePolicy.MergeException("could not find segment " + merge.segments.info(0).name + " in current segments", directory); throw new MergePolicy.MergeException("could not find segment " + merge.segments.info(0).name + " in current index " + segString(), directory);
final int numSegments = segmentInfos.size(); final int numSegments = segmentInfos.size();
@ -3886,7 +4170,7 @@ public class IndexWriter {
if (first + i >= numSegments || !segmentInfos.info(first+i).equals(info)) { if (first + i >= numSegments || !segmentInfos.info(first+i).equals(info)) {
if (segmentInfos.indexOf(info) == -1) if (segmentInfos.indexOf(info) == -1)
throw new MergePolicy.MergeException("MergePolicy selected a segment (" + info.name + ") that is not in the index", directory); throw new MergePolicy.MergeException("MergePolicy selected a segment (" + info.name + ") that is not in the current index " + segString(), directory);
else else
throw new MergePolicy.MergeException("MergePolicy selected non-contiguous segments to merge (" + merge.segString(directory) + " vs " + segString() + "), which IndexWriter (currently) cannot handle", throw new MergePolicy.MergeException("MergePolicy selected non-contiguous segments to merge (" + merge.segString(directory) + " vs " + segString() + "), which IndexWriter (currently) cannot handle",
directory); directory);
@ -3905,11 +4189,10 @@ public class IndexWriter {
* saves the resulting deletes file (incrementing the * saves the resulting deletes file (incrementing the
* delete generation for merge.info). If no deletes were * delete generation for merge.info). If no deletes were
* flushed, no new deletes file is saved. */ * flushed, no new deletes file is saved. */
synchronized private void commitMergedDeletes(MergePolicy.OneMerge merge) throws IOException { synchronized private void commitMergedDeletes(MergePolicy.OneMerge merge, SegmentReader mergeReader) throws IOException {
assert testPoint("startCommitMergeDeletes"); assert testPoint("startCommitMergeDeletes");
final SegmentInfos sourceSegmentsClone = merge.segmentsClone;
final SegmentInfos sourceSegments = merge.segments; final SegmentInfos sourceSegments = merge.segments;
if (infoStream != null) if (infoStream != null)
@ -3917,21 +4200,15 @@ public class IndexWriter {
// Carefully merge deletes that occurred after we // Carefully merge deletes that occurred after we
// started merging: // started merging:
BitVector deletes = null;
int docUpto = 0; int docUpto = 0;
int delCount = 0; int delCount = 0;
final int numSegmentsToMerge = sourceSegments.size(); for(int i=0; i < sourceSegments.size(); i++) {
for(int i=0;i<numSegmentsToMerge;i++) { SegmentInfo info = sourceSegments.info(i);
final SegmentInfo previousInfo = sourceSegmentsClone.info(i); int docCount = info.docCount;
final SegmentInfo currentInfo = sourceSegments.info(i); SegmentReader previousReader = merge.readersClone[i];
SegmentReader currentReader = merge.readers[i];
assert currentInfo.docCount == previousInfo.docCount; if (previousReader.hasDeletions()) {
final int docCount = currentInfo.docCount;
if (previousInfo.hasDeletions()) {
// There were deletes on this segment when the merge // There were deletes on this segment when the merge
// started. The merge has collapsed away those // started. The merge has collapsed away those
@ -3940,65 +4217,46 @@ public class IndexWriter {
// newly flushed deletes but mapping them to the new // newly flushed deletes but mapping them to the new
// docIDs. // docIDs.
assert currentInfo.hasDeletions(); if (currentReader.numDeletedDocs() > previousReader.numDeletedDocs()) {
// Load deletes present @ start of merge, for this segment:
BitVector previousDeletes = new BitVector(previousInfo.dir, previousInfo.getDelFileName());
if (!currentInfo.getDelFileName().equals(previousInfo.getDelFileName())) {
// This means this segment has had new deletes // This means this segment has had new deletes
// committed since we started the merge, so we // committed since we started the merge, so we
// must merge them: // must merge them:
if (deletes == null)
deletes = new BitVector(merge.info.docCount);
BitVector currentDeletes = new BitVector(currentInfo.dir, currentInfo.getDelFileName());
for(int j=0;j<docCount;j++) { for(int j=0;j<docCount;j++) {
if (previousDeletes.get(j)) if (previousReader.isDeleted(j))
assert currentDeletes.get(j); assert currentReader.isDeleted(j);
else { else {
if (currentDeletes.get(j)) { if (currentReader.isDeleted(j)) {
deletes.set(docUpto); mergeReader.doDelete(docUpto);
delCount++; delCount++;
} }
docUpto++; docUpto++;
} }
} }
} else } else {
docUpto += docCount - previousDeletes.count(); docUpto += docCount - previousReader.numDeletedDocs();
}
} else if (currentInfo.hasDeletions()) { } else if (currentReader.hasDeletions()) {
// This segment had no deletes before but now it // This segment had no deletes before but now it
// does: // does:
if (deletes == null) for(int j=0; j<docCount; j++) {
deletes = new BitVector(merge.info.docCount); if (currentReader.isDeleted(j)) {
BitVector currentDeletes = new BitVector(directory, currentInfo.getDelFileName()); mergeReader.doDelete(docUpto);
for(int j=0;j<docCount;j++) {
if (currentDeletes.get(j)) {
deletes.set(docUpto);
delCount++; delCount++;
} }
docUpto++; docUpto++;
} }
} else } else
// No deletes before or after // No deletes before or after
docUpto += currentInfo.docCount; docUpto += info.docCount;
} }
if (deletes != null) { assert mergeReader.numDeletedDocs() == delCount;
merge.info.advanceDelGen();
if (infoStream != null) mergeReader.hasChanges = delCount >= 0;
message("commit merge deletes to " + merge.info.getDelFileName());
deletes.write(directory, merge.info.getDelFileName());
merge.info.setDelCount(delCount);
assert delCount == deletes.count();
}
} }
/* FIXME if we want to support non-contiguous segment merges */ /* FIXME if we want to support non-contiguous segment merges */
synchronized private boolean commitMerge(MergePolicy.OneMerge merge, SegmentMerger merger, int mergedDocCount) throws IOException { synchronized private boolean commitMerge(MergePolicy.OneMerge merge, SegmentMerger merger, int mergedDocCount, SegmentReader mergedReader) throws IOException {
assert testPoint("startCommitMerge"); assert testPoint("startCommitMerge");
@ -4026,8 +4284,7 @@ public class IndexWriter {
final int start = ensureContiguousMerge(merge); final int start = ensureContiguousMerge(merge);
commitMergedDeletes(merge); commitMergedDeletes(merge, mergedReader);
docWriter.remapDeletes(segmentInfos, merger.getDocMaps(), merger.getDelCounts(), merge, mergedDocCount); docWriter.remapDeletes(segmentInfos, merger.getDocMaps(), merger.getDelCounts(), merge, mergedDocCount);
// Simple optimization: if the doc store we are using // Simple optimization: if the doc store we are using
@ -4055,22 +4312,19 @@ public class IndexWriter {
assert !segmentInfos.contains(merge.info); assert !segmentInfos.contains(merge.info);
segmentInfos.add(start, merge.info); segmentInfos.add(start, merge.info);
// Must checkpoint before decrefing so any newly // If the merged segments had pending changes, clear
// referenced files in the new merge.info are incref'd // them so that they don't bother writing them to
// first: // disk, updating SegmentInfo, etc.:
checkpoint(); readerPool.clear(merge.segments);
decrefMergeSegments(merge);
if (merge.optimize) if (merge.optimize)
segmentsToOptimize.add(merge.info); segmentsToOptimize.add(merge.info);
return true; return true;
} }
private void decrefMergeSegments(MergePolicy.OneMerge merge) throws IOException { private synchronized void decrefMergeSegments(MergePolicy.OneMerge merge) throws IOException {
assert merge.increfDone; assert merge.increfDone;
merge.increfDone = false; merge.increfDone = false;
deleter.decRef(merge.segmentsClone);
} }
final private void handleMergeException(Throwable t, MergePolicy.OneMerge merge) throws IOException { final private void handleMergeException(Throwable t, MergePolicy.OneMerge merge) throws IOException {
@ -4322,15 +4576,8 @@ public class IndexWriter {
if (infoStream != null) if (infoStream != null)
message("now flush at merge"); message("now flush at merge");
doFlush(true, false); doFlush(true, false);
//flush(false, true, false);
} }
// We must take a full copy at this point so that we can
// properly merge deletes in commitMerge()
merge.segmentsClone = (SegmentInfos) merge.segments.clone();
deleter.incRef(merge.segmentsClone, false);
merge.increfDone = true; merge.increfDone = true;
merge.mergeDocStores = mergeDocStores; merge.mergeDocStores = mergeDocStores;
@ -4430,7 +4677,6 @@ public class IndexWriter {
int mergedDocCount = 0; int mergedDocCount = 0;
SegmentInfos sourceSegments = merge.segments; SegmentInfos sourceSegments = merge.segments;
SegmentInfos sourceSegmentsClone = merge.segmentsClone;
final int numSegments = sourceSegments.size(); final int numSegments = sourceSegments.size();
if (infoStream != null) if (infoStream != null)
@ -4438,39 +4684,140 @@ public class IndexWriter {
merger = new SegmentMerger(this, mergedName, merge); merger = new SegmentMerger(this, mergedName, merge);
merge.readers = new SegmentReader[numSegments];
merge.readersClone = new SegmentReader[numSegments];
boolean mergeDocStores = false;
final Set dss = new HashSet();
// This is try/finally to make sure merger's readers are // This is try/finally to make sure merger's readers are
// closed: // closed:
boolean success = false;
try { try {
int totDocCount = 0; int totDocCount = 0;
for (int i = 0; i < numSegments; i++) { for (int i = 0; i < numSegments; i++) {
SegmentInfo si = sourceSegmentsClone.info(i);
IndexReader reader = SegmentReader.get(true, si, MERGE_READ_BUFFER_SIZE, merge.mergeDocStores); // no need to set deleter (yet) final SegmentInfo info = sourceSegments.info(i);
merger.add(reader);
totDocCount += reader.numDocs(); // Hold onto the "live" reader; we will use this to
// commit merged deletes
SegmentReader reader = merge.readers[i] = readerPool.get(info, merge.mergeDocStores,
MERGE_READ_BUFFER_SIZE);
// We clone the segment readers because other
// deletes may come in while we're merging so we
// need readers that will not change
SegmentReader clone = merge.readersClone[i] = (SegmentReader) reader.clone(true);
merger.add(clone);
if (clone.hasDeletions()) {
mergeDocStores = true;
} }
if (info.getDocStoreOffset() != -1) {
dss.add(info.getDocStoreSegment());
}
totDocCount += clone.numDocs();
}
if (infoStream != null) { if (infoStream != null) {
message("merge: total "+totDocCount+" docs"); message("merge: total "+totDocCount+" docs");
} }
merge.checkAborted(directory); merge.checkAborted(directory);
// If deletions have arrived and it has now become
// necessary to merge doc stores, go and open them:
if (mergeDocStores && !merge.mergeDocStores) {
merge.mergeDocStores = true;
synchronized(this) {
if (dss.contains(docWriter.getDocStoreSegment())) {
if (infoStream != null)
message("now flush at mergeMiddle");
doFlush(true, false);
}
}
for(int i=0;i<numSegments;i++) {
merge.readersClone[i].openDocStores();
}
// Clear DSS
synchronized(this) {
merge.info.setDocStore(-1, null, false);
}
}
// This is where all the work happens: // This is where all the work happens:
mergedDocCount = merge.info.docCount = merger.merge(merge.mergeDocStores); mergedDocCount = merge.info.docCount = merger.merge(merge.mergeDocStores);
assert mergedDocCount == totDocCount; assert mergedDocCount == totDocCount;
// TODO: in the non-realtime case, we may want to only
// keep deletes (it's costly to open entire reader
// when we just need deletes)
final SegmentReader mergedReader = readerPool.get(merge.info, false);
try {
if (poolReaders && mergedSegmentWarmer != null) {
mergedSegmentWarmer.warm(mergedReader);
}
if (!commitMerge(merge, merger, mergedDocCount, mergedReader))
// commitMerge will return false if this merge was aborted
return 0;
} finally { } finally {
// close readers before we attempt to delete synchronized(this) {
// now-obsolete segments readerPool.release(mergedReader);
if (merger != null) {
merger.closeReaders();
} }
} }
if (!commitMerge(merge, merger, mergedDocCount)) success = true;
// commitMerge will return false if this merge was aborted } finally {
return 0; synchronized(this) {
if (!success) {
// Suppress any new exceptions so we throw the
// original cause
for (int i=0;i<numSegments;i++) {
if (merge.readers[i] != null) {
try {
readerPool.release(merge.readers[i], true);
} catch (Throwable t) {
}
}
if (merge.readersClone[i] != null) {
try {
merge.readersClone[i].close();
} catch (Throwable t) {
}
// This was a private clone and we had the only reference
assert merge.readersClone[i].getRefCount() == 0;
}
}
} else {
for (int i=0;i<numSegments;i++) {
if (merge.readers[i] != null) {
readerPool.release(merge.readers[i], true);
}
if (merge.readersClone[i] != null) {
merge.readersClone[i].close();
// This was a private clone and we had the only reference
assert merge.readersClone[i].getRefCount() == 0;
}
}
}
}
}
// Must checkpoint before decrefing so any newly
// referenced files in the new merge.info are incref'd
// first:
checkpoint();
decrefMergeSegments(merge);
if (merge.useCompoundFile) { if (merge.useCompoundFile) {
@ -4484,7 +4831,7 @@ public class IndexWriter {
commit(size); commit(size);
} }
boolean success = false; success = false;
final String compoundFileName = mergedName + "." + IndexFileNames.COMPOUND_FILE_EXTENSION; final String compoundFileName = mergedName + "." + IndexFileNames.COMPOUND_FILE_EXTENSION;
try { try {
@ -4795,6 +5142,8 @@ public class IndexWriter {
if (infoStream != null) if (infoStream != null)
message("startCommit index=" + segString(segmentInfos) + " changeCount=" + changeCount); message("startCommit index=" + segString(segmentInfos) + " changeCount=" + changeCount);
readerPool.commit();
toSync = (SegmentInfos) segmentInfos.clone(); toSync = (SegmentInfos) segmentInfos.clone();
if (commitUserData != null) if (commitUserData != null)
@ -5012,6 +5361,31 @@ public class IndexWriter {
= new MaxFieldLength("LIMITED", DEFAULT_MAX_FIELD_LENGTH); = new MaxFieldLength("LIMITED", DEFAULT_MAX_FIELD_LENGTH);
} }
/** If {@link #getReader} has been called (ie, this writer
* is in near real-time mode), then after a merge
* completes, this class can be invoked to warm the
* reader on the newly merged segment, before the merge
* commits. This is not required for near real-time
* search, but will reduce search latency on opening a
* new near real-time reader after a merge completes.
*
* <p><b>NOTE:</b> This API is experimental and might
* change in incompatible ways in the next release.</p>
*
* <p><b>NOTE</b>: warm is called before any deletes have
* been carried over to the merged segment. */
public static abstract class IndexReaderWarmer {
public abstract void warm(IndexReader reader) throws IOException;
}
private IndexReaderWarmer mergedSegmentWarmer;
/** Set the merged segment warmer. See {@link
* IndexReaderWarmer}. */
public void setMergedSegmentWarmer(IndexReaderWarmer warmer) {
mergedSegmentWarmer = warmer;
}
// Used only by assert for testing. Current points: // Used only by assert for testing. Current points:
// startDoFlush // startDoFlush
// startCommitMerge // startCommitMerge

View File

@ -74,13 +74,13 @@ public abstract class MergePolicy {
SegmentInfo info; // used by IndexWriter SegmentInfo info; // used by IndexWriter
boolean mergeDocStores; // used by IndexWriter boolean mergeDocStores; // used by IndexWriter
boolean optimize; // used by IndexWriter boolean optimize; // used by IndexWriter
SegmentInfos segmentsClone; // used by IndexWriter
boolean increfDone; // used by IndexWriter boolean increfDone; // used by IndexWriter
boolean registerDone; // used by IndexWriter boolean registerDone; // used by IndexWriter
long mergeGen; // used by IndexWriter long mergeGen; // used by IndexWriter
boolean isExternal; // used by IndexWriter boolean isExternal; // used by IndexWriter
int maxNumSegmentsOptimize; // used by IndexWriter int maxNumSegmentsOptimize; // used by IndexWriter
SegmentReader[] readers; // used by IndexWriter
SegmentReader[] readersClone; // used by IndexWriter
final SegmentInfos segments; final SegmentInfos segments;
final boolean useCompoundFile; final boolean useCompoundFile;
boolean aborted; boolean aborted;

View File

@ -51,20 +51,69 @@ class MultiSegmentReader extends DirectoryIndexReader implements Cloneable {
SegmentReader[] readers = new SegmentReader[sis.size()]; SegmentReader[] readers = new SegmentReader[sis.size()];
for (int i = sis.size()-1; i >= 0; i--) { for (int i = sis.size()-1; i >= 0; i--) {
boolean success = false;
try { try {
readers[i] = SegmentReader.get(readOnly, sis.info(i)); readers[i] = SegmentReader.get(readOnly, sis.info(i));
} catch (IOException e) { success = true;
} finally {
if (!success) {
// Close all readers we had opened: // Close all readers we had opened:
for(i++;i<sis.size();i++) { for(i++;i<sis.size();i++) {
try { try {
readers[i].close(); readers[i].close();
} catch (IOException ignore) { } catch (Throwable ignore) {
// keep going - we want to clean up as much as possible // keep going - we want to clean up as much as possible
} }
} }
throw e;
} }
} }
}
initialize(readers);
}
// Used by near real-time search
MultiSegmentReader(IndexWriter writer, SegmentInfos infos) throws IOException {
super(writer.getDirectory(), infos, false, true);
// IndexWriter synchronizes externally before calling
// us, which ensures infos will not change; so there's
// no need to process segments in reverse order
final int numSegments = infos.size();
SegmentReader[] readers = new SegmentReader[numSegments];
final Directory dir = writer.getDirectory();
int upto = 0;
for (int i=0;i<numSegments;i++) {
boolean success = false;
try {
final SegmentInfo info = infos.info(upto);
if (info.dir == dir) {
readers[upto++] = writer.readerPool.getReadOnlyClone(info, true);
}
success = true;
} finally {
if (!success) {
// Close all readers we had opened:
for(upto--;upto>=0;upto--) {
try {
readers[upto].close();
} catch (Throwable ignore) {
// keep going - we want to clean up as much as possible
}
}
}
}
}
this.writer = writer;
if (upto < readers.length) {
// This means some segments were in a foreign Directory
SegmentReader[] newReaders = new SegmentReader[upto];
System.arraycopy(readers, 0, newReaders, 0, upto);
readers = newReaders;
}
initialize(readers); initialize(readers);
} }

View File

@ -31,6 +31,10 @@ class ReadOnlyMultiSegmentReader extends MultiSegmentReader {
super(directory, infos, closeDirectory, oldReaders, oldStarts, oldNormsCache, true, doClone); super(directory, infos, closeDirectory, oldReaders, oldStarts, oldNormsCache, true, doClone);
} }
ReadOnlyMultiSegmentReader(IndexWriter writer, SegmentInfos infos) throws IOException {
super(writer, infos);
}
protected void acquireWriteLock() { protected void acquireWriteLock() {
ReadOnlySegmentReader.noWrite(); ReadOnlySegmentReader.noWrite();
} }

View File

@ -79,6 +79,10 @@ final class SegmentInfo {
private boolean hasProx; // True if this segment has any fields with omitTermFreqAndPositions==false private boolean hasProx; // True if this segment has any fields with omitTermFreqAndPositions==false
public String toString() {
return "si: "+dir.toString()+" "+name+" docCount: "+docCount+" delCount: "+delCount+" delFileName: "+getDelFileName();
}
public SegmentInfo(String name, int docCount, Directory dir) { public SegmentInfo(String name, int docCount, Directory dir) {
this.name = name; this.name = name;
this.docCount = docCount; this.docCount = docCount;
@ -491,6 +495,12 @@ final class SegmentInfo {
clearFiles(); clearFiles();
} }
void setDocStore(int offset, String segment, boolean isCompoundFile) {
docStoreOffset = offset;
docStoreSegment = segment;
docStoreIsCompoundFile = isCompoundFile;
}
/** /**
* Save this segment's info. * Save this segment's info.
*/ */

View File

@ -24,6 +24,7 @@ final class SegmentMergeInfo {
int base; int base;
TermEnum termEnum; TermEnum termEnum;
IndexReader reader; IndexReader reader;
int delCount;
private TermPositions postings; // use getPositions() private TermPositions postings; // use getPositions()
private int[] docMap; // use getDocMap() private int[] docMap; // use getDocMap()
@ -38,15 +39,17 @@ final class SegmentMergeInfo {
// maps around deleted docs // maps around deleted docs
int[] getDocMap() { int[] getDocMap() {
if (docMap == null) { if (docMap == null) {
delCount = 0;
// build array which maps document numbers around deletions // build array which maps document numbers around deletions
if (reader.hasDeletions()) { if (reader.hasDeletions()) {
int maxDoc = reader.maxDoc(); int maxDoc = reader.maxDoc();
docMap = new int[maxDoc]; docMap = new int[maxDoc];
int j = 0; int j = 0;
for (int i = 0; i < maxDoc; i++) { for (int i = 0; i < maxDoc; i++) {
if (reader.isDeleted(i)) if (reader.isDeleted(i)) {
delCount++;
docMap[i] = -1; docMap[i] = -1;
else } else
docMap[i] = j++; docMap[i] = j++;
} }
} }

View File

@ -21,7 +21,6 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import java.util.HashSet;
import java.util.List; import java.util.List;
import org.apache.lucene.document.Document; import org.apache.lucene.document.Document;
@ -516,6 +515,9 @@ final class SegmentMerger {
} }
base += reader.numDocs(); base += reader.numDocs();
assert reader.numDocs() == reader.maxDoc() - smi.delCount;
if (smi.next()) if (smi.next())
queue.put(smi); // initialize queue queue.put(smi); // initialize queue
else else

View File

@ -469,6 +469,55 @@ class SegmentReader extends DirectoryIndexReader {
return instance; return instance;
} }
synchronized void openDocStores() throws IOException {
if (fieldsReaderOrig == null) {
final Directory storeDir;
if (si.getDocStoreOffset() != -1) {
if (si.getDocStoreIsCompoundFile()) {
storeCFSReader = new CompoundFileReader(directory(),
si.getDocStoreSegment() + "." + IndexFileNames.COMPOUND_FILE_STORE_EXTENSION,
readBufferSize);
storeDir = storeCFSReader;
assert storeDir != null;
} else {
storeDir = directory();
assert storeDir != null;
}
} else if (si.getUseCompoundFile()) {
// In some cases, we were originally opened when CFS
// was not used, but then we are asked to open doc
// stores after the segment has switched to CFS
if (cfsReader == null) {
cfsReader = new CompoundFileReader(directory(), segment + "." + IndexFileNames.COMPOUND_FILE_EXTENSION, readBufferSize);
}
storeDir = cfsReader;
assert storeDir != null;
} else {
storeDir = directory();
assert storeDir != null;
}
final String storesSegment;
if (si.getDocStoreOffset() != -1) {
storesSegment = si.getDocStoreSegment();
} else {
storesSegment = segment;
}
fieldsReaderOrig = new FieldsReader(storeDir, storesSegment, fieldInfos, readBufferSize,
si.getDocStoreOffset(), si.docCount);
// Verify two sources of "maxDoc" agree:
if (si.getDocStoreOffset() == -1 && fieldsReaderOrig.size() != si.docCount) {
throw new CorruptIndexException("doc counts differ for segment " + si.name + ": fieldsReader shows " + fieldsReaderOrig.size() + " but segmentInfo shows " + si.docCount);
}
if (fieldInfos.hasVectors()) { // open term vector files only as needed
termVectorsReaderOrig = new TermVectorsReader(storeDir, storesSegment, fieldInfos, readBufferSize, si.getDocStoreOffset(), si.docCount);
}
}
}
private void initialize(SegmentInfo si, int readBufferSize, boolean doOpenStores) throws CorruptIndexException, IOException { private void initialize(SegmentInfo si, int readBufferSize, boolean doOpenStores) throws CorruptIndexException, IOException {
segment = si.name; segment = si.name;
this.si = si; this.si = si;
@ -484,23 +533,11 @@ class SegmentReader extends DirectoryIndexReader {
cfsDir = cfsReader; cfsDir = cfsReader;
} }
final Directory storeDir; fieldInfos = new FieldInfos(cfsDir, segment + ".fnm");
if (doOpenStores) { if (doOpenStores) {
if (si.getDocStoreOffset() != -1) { openDocStores();
if (si.getDocStoreIsCompoundFile()) {
storeCFSReader = new CompoundFileReader(directory(), si.getDocStoreSegment() + "." + IndexFileNames.COMPOUND_FILE_STORE_EXTENSION, readBufferSize);
storeDir = storeCFSReader;
} else {
storeDir = directory();
} }
} else {
storeDir = cfsDir;
}
} else
storeDir = null;
fieldInfos = new FieldInfos(cfsDir, segment + ".fnm");
boolean anyProx = false; boolean anyProx = false;
final int numFields = fieldInfos.size(); final int numFields = fieldInfos.size();
@ -508,23 +545,6 @@ class SegmentReader extends DirectoryIndexReader {
if (!fieldInfos.fieldInfo(i).omitTermFreqAndPositions) if (!fieldInfos.fieldInfo(i).omitTermFreqAndPositions)
anyProx = true; anyProx = true;
final String fieldsSegment;
if (si.getDocStoreOffset() != -1)
fieldsSegment = si.getDocStoreSegment();
else
fieldsSegment = segment;
if (doOpenStores) {
fieldsReaderOrig = new FieldsReader(storeDir, fieldsSegment, fieldInfos, readBufferSize,
si.getDocStoreOffset(), si.docCount);
// Verify two sources of "maxDoc" agree:
if (si.getDocStoreOffset() == -1 && fieldsReaderOrig.size() != si.docCount) {
throw new CorruptIndexException("doc counts differ for segment " + si.name + ": fieldsReader shows " + fieldsReaderOrig.size() + " but segmentInfo shows " + si.docCount);
}
}
tis = new TermInfosReader(cfsDir, segment, fieldInfos, readBufferSize); tis = new TermInfosReader(cfsDir, segment, fieldInfos, readBufferSize);
loadDeletedDocs(); loadDeletedDocs();
@ -536,14 +556,6 @@ class SegmentReader extends DirectoryIndexReader {
proxStream = cfsDir.openInput(segment + ".prx", readBufferSize); proxStream = cfsDir.openInput(segment + ".prx", readBufferSize);
openNorms(cfsDir, readBufferSize); openNorms(cfsDir, readBufferSize);
if (doOpenStores && fieldInfos.hasVectors()) { // open term vector files only as needed
final String vectorsSegment;
if (si.getDocStoreOffset() != -1)
vectorsSegment = si.getDocStoreSegment();
else
vectorsSegment = segment;
termVectorsReaderOrig = new TermVectorsReader(storeDir, vectorsSegment, fieldInfos, readBufferSize, si.getDocStoreOffset(), si.docCount);
}
success = true; success = true;
} finally { } finally {
@ -1210,7 +1222,7 @@ class SegmentReader extends DirectoryIndexReader {
/** /**
* Return the name of the segment this reader is reading. * Return the name of the segment this reader is reading.
*/ */
String getSegmentName() { public String getSegmentName() {
return segment; return segment;
} }

View File

@ -61,6 +61,7 @@ public class TestSnapshotDeletionPolicy extends LuceneTestCase
MockRAMDirectory dir2 = new MockRAMDirectory(); MockRAMDirectory dir2 = new MockRAMDirectory();
runTest(dir2); runTest(dir2);
dir2.close();
} }
public void testReuseAcrossWriters() throws Exception { public void testReuseAcrossWriters() throws Exception {

View File

@ -155,11 +155,40 @@ public class TestIndexFileDeleter extends LuceneTestCase
Arrays.sort(files); Arrays.sort(files);
Arrays.sort(files2); Arrays.sort(files2);
Set dif = difFiles(files, files2);
if (!Arrays.equals(files, files2)) { if (!Arrays.equals(files, files2)) {
fail("IndexFileDeleter failed to delete unreferenced extra files: should have deleted " + (filesPre.length-files.length) + " files but only deleted " + (filesPre.length - files2.length) + "; expected files:\n " + asString(files) + "\n actual files:\n " + asString(files2)); fail("IndexFileDeleter failed to delete unreferenced extra files: should have deleted " + (filesPre.length-files.length) + " files but only deleted " + (filesPre.length - files2.length) + "; expected files:\n " + asString(files) + "\n actual files:\n " + asString(files2)+"\ndif: "+dif);
} }
} }
private static Set difFiles(String[] files1, String[] files2) {
Set set1 = new HashSet();
Set set2 = new HashSet();
Set extra = new HashSet();
for (int x=0; x < files1.length; x++) {
set1.add(files1[x]);
}
for (int x=0; x < files2.length; x++) {
set2.add(files2[x]);
}
Iterator i1 = set1.iterator();
while (i1.hasNext()) {
Object o = i1.next();
if (!set2.contains(o)) {
extra.add(o);
}
}
Iterator i2 = set2.iterator();
while (i2.hasNext()) {
Object o = i2.next();
if (!set1.contains(o)) {
extra.add(o);
}
}
return extra;
}
private String asString(String[] l) { private String asString(String[] l) {
String s = ""; String s = "";
for(int i=0;i<l.length;i++) { for(int i=0;i<l.length;i++) {

View File

@ -1131,7 +1131,7 @@ public class TestIndexReader extends LuceneTestCase
// IllegalStateException because above out-of-bounds // IllegalStateException because above out-of-bounds
// deleteDocument corrupted the index: // deleteDocument corrupted the index:
writer.optimize(); writer.optimize();
writer.close();
if (!gotException) { if (!gotException) {
fail("delete of out-of-bounds doc number failed to hit exception"); fail("delete of out-of-bounds doc number failed to hit exception");
} }

View File

@ -949,7 +949,7 @@ public class TestIndexReaderReopen extends LuceneTestCase {
r.close(); r.close();
} }
private static Document createDocument(int n, int numFields) { public static Document createDocument(int n, int numFields) {
StringBuffer sb = new StringBuffer(); StringBuffer sb = new StringBuffer();
Document doc = new Document(); Document doc = new Document();
sb.append("a"); sb.append("a");

View File

@ -0,0 +1,793 @@
package org.apache.lucene.index;
/**
* Copyright 2004 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.analysis.WhitespaceAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Field.Index;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.Field.TermVector;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MockRAMDirectory;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util._TestUtil;
public class TestIndexWriterReader extends LuceneTestCase {
static PrintStream infoStream;
public static int count(Term t, IndexReader r) throws IOException {
int count = 0;
TermDocs td = r.termDocs(t);
while (td.next()) {
td.doc();
count++;
}
td.close();
return count;
}
public void testUpdateDocument() throws Exception {
boolean optimize = true;
Directory dir1 = new MockRAMDirectory();
IndexWriter writer = new IndexWriter(dir1, new WhitespaceAnalyzer(),
IndexWriter.MaxFieldLength.LIMITED);
// create the index
createIndexNoClose(!optimize, "index1", writer);
// writer.flush(false, true, true);
// get a reader
IndexReader r1 = writer.getReader();
String id10 = r1.document(10).getField("id").stringValue();
Document newDoc = r1.document(10);
newDoc.removeField("id");
newDoc.add(new Field("id", Integer.toString(8000), Store.YES, Index.NOT_ANALYZED));
writer.updateDocument(new Term("id", id10), newDoc);
IndexReader r2 = writer.getReader();
assertEquals(0, count(new Term("id", id10), r2));
assertEquals(1, count(new Term("id", Integer.toString(8000)), r2));
r1.close();
r2.close();
writer.close();
IndexReader r3 = IndexReader.open(dir1);
assertEquals(0, count(new Term("id", id10), r3));
assertEquals(1, count(new Term("id", Integer.toString(8000)), r3));
r3.close();
dir1.close();
}
/**
* Test using IW.addIndexes
*
* @throws Exception
*/
public void testAddIndexes() throws Exception {
boolean optimize = false;
Directory dir1 = new MockRAMDirectory();
IndexWriter writer = new IndexWriter(dir1, new WhitespaceAnalyzer(),
IndexWriter.MaxFieldLength.LIMITED);
writer.setInfoStream(infoStream);
// create the index
createIndexNoClose(!optimize, "index1", writer);
writer.flush(false, true, true);
// create a 2nd index
Directory dir2 = new MockRAMDirectory();
IndexWriter writer2 = new IndexWriter(dir2, new WhitespaceAnalyzer(),
IndexWriter.MaxFieldLength.LIMITED);
writer2.setInfoStream(infoStream);
createIndexNoClose(!optimize, "index2", writer2);
writer2.close();
writer.addIndexesNoOptimize(new Directory[] { dir2 });
IndexReader r1 = writer.getReader();
assertEquals(200, r1.maxDoc());
int index2df = r1.docFreq(new Term("indexname", "index2"));
assertEquals(100, index2df);
// verify the docs are from different indexes
Document doc5 = r1.document(5);
assertEquals("index1", doc5.get("indexname"));
Document doc150 = r1.document(150);
assertEquals("index2", doc150.get("indexname"));
r1.close();
writer.close();
dir1.close();
}
public void testAddIndexes2() throws Exception {
boolean optimize = false;
Directory dir1 = new MockRAMDirectory();
IndexWriter writer = new IndexWriter(dir1, new WhitespaceAnalyzer(),
IndexWriter.MaxFieldLength.LIMITED);
writer.setInfoStream(infoStream);
// create a 2nd index
Directory dir2 = new MockRAMDirectory();
IndexWriter writer2 = new IndexWriter(dir2, new WhitespaceAnalyzer(),
IndexWriter.MaxFieldLength.LIMITED);
writer2.setInfoStream(infoStream);
createIndexNoClose(!optimize, "index2", writer2);
writer2.close();
writer.addIndexesNoOptimize(new Directory[] { dir2 });
writer.addIndexesNoOptimize(new Directory[] { dir2 });
writer.addIndexesNoOptimize(new Directory[] { dir2 });
writer.addIndexesNoOptimize(new Directory[] { dir2 });
writer.addIndexesNoOptimize(new Directory[] { dir2 });
IndexReader r1 = writer.getReader();
assertEquals(500, r1.maxDoc());
r1.close();
writer.close();
dir1.close();
}
/**
* Deletes using IW.deleteDocuments
*
* @throws Exception
*/
public void testDeleteFromIndexWriter() throws Exception {
boolean optimize = true;
Directory dir1 = new MockRAMDirectory();
IndexWriter writer = new IndexWriter(dir1, new WhitespaceAnalyzer(),
IndexWriter.MaxFieldLength.LIMITED);
writer.setInfoStream(infoStream);
// create the index
createIndexNoClose(!optimize, "index1", writer);
writer.flush(false, true, true);
// get a reader
IndexReader r1 = writer.getReader();
String id10 = r1.document(10).getField("id").stringValue();
// deleted IW docs should not show up in the next getReader
writer.deleteDocuments(new Term("id", id10));
IndexReader r2 = writer.getReader();
assertEquals(1, count(new Term("id", id10), r1));
assertEquals(0, count(new Term("id", id10), r2));
String id50 = r1.document(50).getField("id").stringValue();
assertEquals(1, count(new Term("id", id50), r1));
writer.deleteDocuments(new Term("id", id50));
IndexReader r3 = writer.getReader();
assertEquals(0, count(new Term("id", id10), r3));
assertEquals(0, count(new Term("id", id50), r3));
String id75 = r1.document(75).getField("id").stringValue();
writer.deleteDocuments(new TermQuery(new Term("id", id75)));
IndexReader r4 = writer.getReader();
assertEquals(1, count(new Term("id", id75), r3));
assertEquals(0, count(new Term("id", id75), r4));
r1.close();
r2.close();
r3.close();
r4.close();
writer.close();
// reopen the writer to verify the delete made it to the directory
writer = new IndexWriter(dir1, new WhitespaceAnalyzer(),
IndexWriter.MaxFieldLength.LIMITED);
writer.setInfoStream(infoStream);
IndexReader w2r1 = writer.getReader();
assertEquals(0, count(new Term("id", id10), w2r1));
w2r1.close();
writer.close();
dir1.close();
}
public void testAddIndexesAndDoDeletesThreads() throws Throwable {
final int numIter = 5;
int numDirs = 3;
Directory mainDir = new MockRAMDirectory();
IndexWriter mainWriter = new IndexWriter(mainDir, new WhitespaceAnalyzer(),
IndexWriter.MaxFieldLength.LIMITED);
mainWriter.setInfoStream(infoStream);
AddDirectoriesThreads addDirThreads = new AddDirectoriesThreads(numIter, mainWriter);
addDirThreads.launchThreads(numDirs);
addDirThreads.joinThreads();
//assertEquals(100 + numDirs * (3 * numIter / 4) * addDirThreads.NUM_THREADS
// * addDirThreads.NUM_INIT_DOCS, addDirThreads.mainWriter.numDocs());
assertEquals(addDirThreads.count.intValue(), addDirThreads.mainWriter.numDocs());
addDirThreads.close(true);
assertTrue(addDirThreads.failures.size() == 0);
_TestUtil.checkIndex(mainDir);
IndexReader reader = IndexReader.open(mainDir);
assertEquals(addDirThreads.count.intValue(), reader.numDocs());
//assertEquals(100 + numDirs * (3 * numIter / 4) * addDirThreads.NUM_THREADS
// * addDirThreads.NUM_INIT_DOCS, reader.numDocs());
reader.close();
addDirThreads.closeDir();
mainDir.close();
}
private class DeleteThreads {
final static int NUM_THREADS = 5;
final Thread[] threads = new Thread[NUM_THREADS];
IndexWriter mainWriter;
AtomicInteger delCount = new AtomicInteger();
List deletedTerms = new ArrayList();
LinkedList toDeleteTerms = new LinkedList();
Random random;
final List failures = new ArrayList();
public DeleteThreads(IndexWriter mainWriter) throws IOException {
this.mainWriter = mainWriter;
IndexReader reader = mainWriter.getReader();
int maxDoc = reader.maxDoc();
random = newRandom();
int iter = random.nextInt(maxDoc);
for (int x=0; x < iter; x++) {
int doc = random.nextInt(iter);
String id = reader.document(doc).get("id");
toDeleteTerms.add(new Term("id", id));
}
}
Term getDeleteTerm() {
synchronized (toDeleteTerms) {
return (Term)toDeleteTerms.removeFirst();
}
}
void launchThreads(final int numIter) {
for (int i = 0; i < NUM_THREADS; i++) {
threads[i] = new Thread() {
public void run() {
try {
Term term = getDeleteTerm();
mainWriter.deleteDocuments(term);
synchronized (deletedTerms) {
deletedTerms.add(term);
}
} catch (Throwable t) {
handle(t);
}
}
};
}
}
void handle(Throwable t) {
t.printStackTrace(System.out);
synchronized (failures) {
failures.add(t);
}
}
void joinThreads() {
for (int i = 0; i < NUM_THREADS; i++)
try {
threads[i].join();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}
private class AddDirectoriesThreads {
Directory addDir;
final static int NUM_THREADS = 5;
final static int NUM_INIT_DOCS = 100;
int numDirs;
final Thread[] threads = new Thread[NUM_THREADS];
IndexWriter mainWriter;
final List failures = new ArrayList();
IndexReader[] readers;
boolean didClose = false;
AtomicInteger count = new AtomicInteger(0);
AtomicInteger numAddIndexesNoOptimize = new AtomicInteger(0);
public AddDirectoriesThreads(int numDirs, IndexWriter mainWriter) throws Throwable {
this.numDirs = numDirs;
this.mainWriter = mainWriter;
addDir = new MockRAMDirectory();
IndexWriter writer = new IndexWriter(addDir, new WhitespaceAnalyzer(),
IndexWriter.MaxFieldLength.LIMITED);
writer.setMaxBufferedDocs(2);
for (int i = 0; i < NUM_INIT_DOCS; i++) {
Document doc = createDocument(i, "addindex", 4);
writer.addDocument(doc);
}
writer.close();
readers = new IndexReader[numDirs];
for (int i = 0; i < numDirs; i++)
readers[i] = IndexReader.open(addDir);
}
void joinThreads() {
for (int i = 0; i < NUM_THREADS; i++)
try {
threads[i].join();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
void close(boolean doWait) throws Throwable {
didClose = true;
mainWriter.close(doWait);
}
void closeDir() throws Throwable {
for (int i = 0; i < numDirs; i++)
readers[i].close();
addDir.close();
}
void handle(Throwable t) {
t.printStackTrace(System.out);
synchronized (failures) {
failures.add(t);
}
}
void launchThreads(final int numIter) {
for (int i = 0; i < NUM_THREADS; i++) {
threads[i] = new Thread() {
public void run() {
try {
final Directory[] dirs = new Directory[numDirs];
for (int k = 0; k < numDirs; k++)
dirs[k] = new MockRAMDirectory(addDir);
//int j = 0;
//while (true) {
// System.out.println(Thread.currentThread().getName() + ": iter
// j=" + j);
for (int x=0; x < numIter; x++) {
// only do addIndexesNoOptimize
doBody(x, dirs);
}
//if (numIter > 0 && j == numIter)
// break;
//doBody(j++, dirs);
//doBody(5, dirs);
//}
} catch (Throwable t) {
handle(t);
}
}
};
}
for (int i = 0; i < NUM_THREADS; i++)
threads[i].start();
}
void doBody(int j, Directory[] dirs) throws Throwable {
switch (j % 4) {
case 0:
mainWriter.addIndexes(dirs);
break;
case 1:
mainWriter.addIndexesNoOptimize(dirs);
numAddIndexesNoOptimize.incrementAndGet();
break;
case 2:
mainWriter.addIndexes(readers);
break;
case 3:
mainWriter.commit();
}
count.addAndGet(dirs.length*NUM_INIT_DOCS);
}
}
public void testIndexWriterReopenSegmentOptimize() throws Exception {
doTestIndexWriterReopenSegment(true);
}
public void testIndexWriterReopenSegment() throws Exception {
doTestIndexWriterReopenSegment(false);
}
/**
* Tests creating a segment, then check to insure the segment can be seen via
* IW.getReader
*/
public void doTestIndexWriterReopenSegment(boolean optimize) throws Exception {
Directory dir1 = new MockRAMDirectory();
IndexWriter writer = new IndexWriter(dir1, new WhitespaceAnalyzer(),
IndexWriter.MaxFieldLength.LIMITED);
writer.setInfoStream(infoStream);
DirectoryIndexReader r1 = (DirectoryIndexReader) writer.getReader();
assertEquals(0, r1.maxDoc());
createIndexNoClose(false, "index1", writer);
writer.flush(!optimize, true, true);
DirectoryIndexReader iwr1 = (DirectoryIndexReader) writer.getReader();
assertEquals(100, iwr1.maxDoc());
DirectoryIndexReader r2 = (DirectoryIndexReader) writer.getReader();
assertEquals(r2.maxDoc(), 100);
// add 100 documents
for (int x = 10000; x < 10000 + 100; x++) {
Document d = createDocument(x, "index1", 5);
writer.addDocument(d);
}
writer.flush(false, true, true);
// verify the reader was reopened internally
IndexReader iwr2 = writer.getReader();
assertTrue(iwr2 != r1);
assertEquals(200, iwr2.maxDoc());
// should have flushed out a segment
IndexReader r3 = writer.getReader();
assertTrue(r2 != r3);
assertEquals(200, r3.maxDoc());
// dec ref the readers rather than close them because
// closing flushes changes to the writer
r1.close();
iwr1.close();
r2.close();
r3.close();
iwr2.close();
writer.close();
// test whether the changes made it to the directory
writer = new IndexWriter(dir1, new WhitespaceAnalyzer(),
IndexWriter.MaxFieldLength.LIMITED);
IndexReader w2r1 = writer.getReader();
// insure the deletes were actually flushed to the directory
assertEquals(200, w2r1.maxDoc());
w2r1.close();
writer.close();
dir1.close();
}
public static Document createDocument(int n, String indexName, int numFields) {
StringBuffer sb = new StringBuffer();
Document doc = new Document();
doc.add(new Field("id", Integer.toString(n), Store.YES, Index.NOT_ANALYZED, TermVector.WITH_POSITIONS_OFFSETS));
doc.add(new Field("indexname", indexName, Store.YES, Index.NOT_ANALYZED, TermVector.WITH_POSITIONS_OFFSETS));
sb.append("a");
sb.append(n);
doc.add(new Field("field1", sb.toString(), Store.YES, Index.ANALYZED, TermVector.WITH_POSITIONS_OFFSETS));
sb.append(" b");
sb.append(n);
for (int i = 1; i < numFields; i++) {
doc.add(new Field("field" + (i + 1), sb.toString(), Store.YES,
Index.ANALYZED, TermVector.WITH_POSITIONS_OFFSETS));
}
return doc;
}
/**
* Delete a document by term and return the doc id
*
* @return
*
* public static int deleteDocument(Term term, IndexWriter writer) throws
* IOException { IndexReader reader = writer.getReader(); TermDocs td =
* reader.termDocs(term); int doc = -1; //if (td.next()) { // doc = td.doc();
* //} //writer.deleteDocuments(term); td.close(); return doc; }
*/
public static void createIndex(Directory dir1, String indexName,
boolean multiSegment) throws IOException {
IndexWriter w = new IndexWriter(dir1, new WhitespaceAnalyzer(),
IndexWriter.MaxFieldLength.LIMITED);
w.setMergePolicy(new LogDocMergePolicy());
for (int i = 0; i < 100; i++) {
w.addDocument(createDocument(i, indexName, 4));
if (multiSegment && (i % 10) == 0) {
}
}
if (!multiSegment) {
w.optimize();
}
w.close();
}
public static void createIndexNoClose(boolean multiSegment, String indexName,
IndexWriter w) throws IOException {
w.setMergePolicy(new LogDocMergePolicy());
for (int i = 0; i < 100; i++) {
w.addDocument(createDocument(i, indexName, 4));
}
if (!multiSegment) {
w.optimize();
}
}
private static class MyWarmer extends IndexWriter.IndexReaderWarmer {
int warmCount;
public void warm(IndexReader reader) throws IOException {
warmCount++;
}
}
public void testMergeWarmer() throws Exception {
Directory dir1 = new MockRAMDirectory();
IndexWriter writer = new IndexWriter(dir1, new WhitespaceAnalyzer(),
IndexWriter.MaxFieldLength.LIMITED);
writer.setInfoStream(infoStream);
// create the index
createIndexNoClose(false, "test", writer);
// get a reader to put writer into near real-time mode
IndexReader r1 = writer.getReader();
// Enroll warmer
MyWarmer warmer = new MyWarmer();
writer.setMergedSegmentWarmer(warmer);
writer.setMergeFactor(2);
writer.setMaxBufferedDocs(2);
for (int i = 0; i < 10; i++) {
writer.addDocument(createDocument(i, "test", 4));
}
((ConcurrentMergeScheduler) writer.getMergeScheduler()).sync();
assertTrue(warmer.warmCount > 0);
final int count = warmer.warmCount;
writer.addDocument(createDocument(17, "test", 4));
writer.optimize();
assertTrue(warmer.warmCount > count);
writer.close();
r1.close();
dir1.close();
}
public void testAfterCommit() throws Exception {
Directory dir1 = new MockRAMDirectory();
IndexWriter writer = new IndexWriter(dir1, new WhitespaceAnalyzer(),
IndexWriter.MaxFieldLength.LIMITED);
writer.setInfoStream(infoStream);
// create the index
createIndexNoClose(false, "test", writer);
// get a reader to put writer into near real-time mode
IndexReader r1 = writer.getReader();
_TestUtil.checkIndex(dir1);
writer.commit();
_TestUtil.checkIndex(dir1);
assertEquals(100, r1.numDocs());
for (int i = 0; i < 10; i++) {
writer.addDocument(createDocument(i, "test", 4));
}
((ConcurrentMergeScheduler) writer.getMergeScheduler()).sync();
IndexReader r2 = r1.reopen();
if (r2 != r1) {
r1.close();
r1 = r2;
}
assertEquals(110, r1.numDocs());
writer.close();
r1.close();
dir1.close();
}
// Make sure reader remains usable even if IndexWriter closes
public void testAfterClose() throws Exception {
Directory dir1 = new MockRAMDirectory();
IndexWriter writer = new IndexWriter(dir1, new WhitespaceAnalyzer(),
IndexWriter.MaxFieldLength.LIMITED);
writer.setInfoStream(infoStream);
// create the index
createIndexNoClose(false, "test", writer);
IndexReader r = writer.getReader();
writer.close();
_TestUtil.checkIndex(dir1);
// reader should remain usable even after IndexWriter is closed:
assertEquals(100, r.numDocs());
Query q = new TermQuery(new Term("indexname", "test"));
assertEquals(100, new IndexSearcher(r).search(q, 10).totalHits);
try {
r.reopen();
fail("failed to hit AlreadyClosedException");
} catch (AlreadyClosedException ace) {
// expected
}
r.close();
dir1.close();
}
// Stress test reopen during addIndexes
public void testDuringAddIndexes() throws Exception {
Directory dir1 = new MockRAMDirectory();
final IndexWriter writer = new IndexWriter(dir1, new WhitespaceAnalyzer(),
IndexWriter.MaxFieldLength.LIMITED);
writer.setInfoStream(infoStream);
writer.setMergeFactor(2);
// create the index
createIndexNoClose(false, "test", writer);
writer.commit();
final Directory[] dirs = new Directory[10];
for (int i=0;i<10;i++) {
dirs[i] = new MockRAMDirectory(dir1);
}
IndexReader r = writer.getReader();
final int NUM_THREAD = 5;
final float SECONDS = 3;
final long endTime = (long) (System.currentTimeMillis() + 1000.*SECONDS);
final List excs = Collections.synchronizedList(new ArrayList());
final Thread[] threads = new Thread[NUM_THREAD];
for(int i=0;i<NUM_THREAD;i++) {
threads[i] = new Thread() {
public void run() {
while(System.currentTimeMillis() < endTime) {
try {
writer.addIndexesNoOptimize(dirs);
} catch (Throwable t) {
excs.add(t);
throw new RuntimeException(t);
}
}
}
};
threads[i].setDaemon(true);
threads[i].start();
}
int lastCount = 0;
while(System.currentTimeMillis() < endTime) {
IndexReader r2 = r.reopen();
if (r2 != r) {
r.close();
r = r2;
}
Query q = new TermQuery(new Term("indexname", "test"));
final int count = new IndexSearcher(r).search(q, 10).totalHits;
assertTrue(count >= lastCount);
lastCount = count;
}
for(int i=0;i<NUM_THREAD;i++) {
threads[i].join();
}
assertEquals(0, excs.size());
writer.close();
_TestUtil.checkIndex(dir1);
r.close();
dir1.close();
}
// Stress test reopen during add/delete
public void testDuringAddDelete() throws Exception {
Directory dir1 = new MockRAMDirectory();
final IndexWriter writer = new IndexWriter(dir1, new WhitespaceAnalyzer(),
IndexWriter.MaxFieldLength.LIMITED);
writer.setInfoStream(infoStream);
writer.setMergeFactor(2);
// create the index
createIndexNoClose(false, "test", writer);
writer.commit();
IndexReader r = writer.getReader();
final int NUM_THREAD = 5;
final float SECONDS = 3;
final long endTime = (long) (System.currentTimeMillis() + 1000.*SECONDS);
final List excs = Collections.synchronizedList(new ArrayList());
final Thread[] threads = new Thread[NUM_THREAD];
for(int i=0;i<NUM_THREAD;i++) {
threads[i] = new Thread() {
public void run() {
int count = 0;
final Random r = new Random();
while(System.currentTimeMillis() < endTime) {
try {
for(int i=0;i<10;i++) {
writer.addDocument(createDocument(10*count+i, "test", 4));
}
count++;
final int limit = count*10;
for(int i=0;i<5;i++) {
int x = r.nextInt(limit);
writer.deleteDocuments(new Term("field3", "b"+x));
}
} catch (Throwable t) {
excs.add(t);
throw new RuntimeException(t);
}
}
}
};
threads[i].setDaemon(true);
threads[i].start();
}
int sum = 0;
while(System.currentTimeMillis() < endTime) {
IndexReader r2 = r.reopen();
if (r2 != r) {
r.close();
r = r2;
}
Query q = new TermQuery(new Term("indexname", "test"));
sum += new IndexSearcher(r).search(q, 10).totalHits;
}
for(int i=0;i<NUM_THREAD;i++) {
threads[i].join();
}
assertTrue(sum > 0);
assertEquals(0, excs.size());
writer.close();
_TestUtil.checkIndex(dir1);
r.close();
dir1.close();
}
}

View File

@ -52,6 +52,20 @@ public class TestStressIndexing2 extends LuceneTestCase {
} }
} }
public void testRandomIWReader() throws Throwable {
r = newRandom();
Directory dir = new MockRAMDirectory();
// TODO: verify equals using IW.getReader
DocsAndWriter dw = indexRandomIWReader(10, 100, 100, dir);
IndexReader r = dw.writer.getReader();
dw.writer.commit();
verifyEquals(r, dir, "id");
r.close();
dw.writer.close();
dir.close();
}
public void testRandom() throws Throwable { public void testRandom() throws Throwable {
r = newRandom(); r = newRandom();
Directory dir1 = new MockRAMDirectory(); Directory dir1 = new MockRAMDirectory();
@ -102,9 +116,13 @@ public class TestStressIndexing2 extends LuceneTestCase {
// indexing threads to test that IndexWriter does correctly synchronize // indexing threads to test that IndexWriter does correctly synchronize
// everything. // everything.
public Map indexRandom(int nThreads, int iterations, int range, Directory dir) throws IOException, InterruptedException { public static class DocsAndWriter {
Map docs;
IndexWriter writer;
}
public DocsAndWriter indexRandomIWReader(int nThreads, int iterations, int range, Directory dir) throws IOException, InterruptedException {
Map docs = new HashMap(); Map docs = new HashMap();
for(int iter=0;iter<3;iter++) {
IndexWriter w = new MockIndexWriter(dir, autoCommit, new WhitespaceAnalyzer(), true); IndexWriter w = new MockIndexWriter(dir, autoCommit, new WhitespaceAnalyzer(), true);
w.setUseCompoundFile(false); w.setUseCompoundFile(false);
@ -137,6 +155,51 @@ public class TestStressIndexing2 extends LuceneTestCase {
threads[i].join(); threads[i].join();
} }
// w.optimize();
//w.close();
for (int i=0; i<threads.length; i++) {
IndexingThread th = threads[i];
synchronized(th) {
docs.putAll(th.docs);
}
}
_TestUtil.checkIndex(dir);
DocsAndWriter dw = new DocsAndWriter();
dw.docs = docs;
dw.writer = w;
return dw;
}
public Map indexRandom(int nThreads, int iterations, int range, Directory dir) throws IOException, InterruptedException {
Map docs = new HashMap();
for(int iter=0;iter<3;iter++) {
IndexWriter w = new MockIndexWriter(dir, autoCommit, new WhitespaceAnalyzer(), true);
w.setUseCompoundFile(false);
// force many merges
w.setMergeFactor(mergeFactor);
w.setRAMBufferSizeMB(.1);
w.setMaxBufferedDocs(maxBufferedDocs);
threads = new IndexingThread[nThreads];
for (int i=0; i<threads.length; i++) {
IndexingThread th = new IndexingThread();
th.w = w;
th.base = 1000000*i;
th.range = range;
th.iterations = iterations;
threads[i] = th;
}
for (int i=0; i<threads.length; i++) {
threads[i].start();
}
for (int i=0; i<threads.length; i++) {
threads[i].join();
}
// w.optimize(); // w.optimize();
w.close(); w.close();
@ -178,6 +241,12 @@ public class TestStressIndexing2 extends LuceneTestCase {
w.close(); w.close();
} }
public static void verifyEquals(IndexReader r1, Directory dir2, String idField) throws Throwable {
IndexReader r2 = IndexReader.open(dir2);
verifyEquals(r1, r2, idField);
r2.close();
}
public static void verifyEquals(Directory dir1, Directory dir2, String idField) throws Throwable { public static void verifyEquals(Directory dir1, Directory dir2, String idField) throws Throwable {
IndexReader r1 = IndexReader.open(dir1); IndexReader r1 = IndexReader.open(dir1);
IndexReader r2 = IndexReader.open(dir2); IndexReader r2 = IndexReader.open(dir2);
@ -222,7 +291,14 @@ public class TestStressIndexing2 extends LuceneTestCase {
r2r1[id2] = id1; r2r1[id2] = id1;
// verify stored fields are equivalent // verify stored fields are equivalent
try {
verifyEquals(r1.document(id1), r2.document(id2)); verifyEquals(r1.document(id1), r2.document(id2));
} catch (Throwable t) {
System.out.println("FAILED id=" + term + " id1=" + id1 + " id2=" + id2 + " term="+ term);
System.out.println(" d1=" + r1.document(id1));
System.out.println(" d2=" + r2.document(id2));
throw t;
}
try { try {
// verify term vectors are equivalent // verify term vectors are equivalent

View File

@ -234,7 +234,7 @@ public class MockRAMDirectory extends RAMDirectory {
} }
} }
return new MockRAMOutputStream(this, file); return new MockRAMOutputStream(this, file, name);
} }
public IndexInput openInput(String name) throws IOException { public IndexInput openInput(String name) throws IOException {

View File

@ -29,13 +29,15 @@ import java.io.IOException;
public class MockRAMOutputStream extends RAMOutputStream { public class MockRAMOutputStream extends RAMOutputStream {
private MockRAMDirectory dir; private MockRAMDirectory dir;
private boolean first=true; private boolean first=true;
private final String name;
byte[] singleByte = new byte[1]; byte[] singleByte = new byte[1];
/** Construct an empty output buffer. */ /** Construct an empty output buffer. */
public MockRAMOutputStream(MockRAMDirectory dir, RAMFile f) { public MockRAMOutputStream(MockRAMDirectory dir, RAMFile f, String name) {
super(f); super(f);
this.dir = dir; this.dir = dir;
this.name = name;
} }
public void close() throws IOException { public void close() throws IOException {
@ -66,7 +68,7 @@ public class MockRAMOutputStream extends RAMOutputStream {
// If MockRAMDir crashed since we were opened, then // If MockRAMDir crashed since we were opened, then
// don't write anything: // don't write anything:
if (dir.crashed) if (dir.crashed)
throw new IOException("MockRAMDirectory was crashed"); throw new IOException("MockRAMDirectory was crashed; cannot write to " + name);
// Enforce disk full: // Enforce disk full:
if (dir.maxSize != 0 && freeSpace <= len) { if (dir.maxSize != 0 && freeSpace <= len) {
@ -84,7 +86,7 @@ public class MockRAMOutputStream extends RAMOutputStream {
if (realUsage > dir.maxUsedSize) { if (realUsage > dir.maxUsedSize) {
dir.maxUsedSize = realUsage; dir.maxUsedSize = realUsage;
} }
throw new IOException("fake disk full at " + dir.getRecomputedActualSizeInBytes() + " bytes"); throw new IOException("fake disk full at " + dir.getRecomputedActualSizeInBytes() + " bytes when writing " + name);
} else { } else {
super.writeBytes(b, offset, len); super.writeBytes(b, offset, len);
} }