LUCENE-1194: add IndexWriter.deleteDocuments(Query)

git-svn-id: https://svn.apache.org/repos/asf/lucene/java/trunk@634219 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael McCandless 2008-03-06 11:02:04 +00:00
parent 33bee3c60c
commit 5b113c8af6
12 changed files with 905 additions and 376 deletions

View File

@ -105,6 +105,11 @@ New features
close/re-open of IndexWriter while still protecting an open
snapshot (Tim Brennan via Mike McCandless)
9. LUCENE-1194: Added IndexWriter.deleteDocuments(Query) to delete
documents matching the specified query. Also added static unlock
and isLocked methods (deprecating the ones in IndexReader). (Mike
McCandless)
Optimizations
1. LUCENE-705: When building a compound file, use

View File

@ -23,6 +23,10 @@ import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Fieldable;
import org.apache.lucene.search.Similarity;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.IndexInput;
@ -33,9 +37,11 @@ import java.io.IOException;
import java.io.PrintStream;
import java.io.Reader;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.HashMap;
import java.util.ArrayList;
import java.util.Map.Entry;
import java.text.NumberFormat;
import java.util.Collections;
@ -154,14 +160,109 @@ final class DocumentsWriter {
private PrintStream infoStream;
// This Hashmap buffers delete terms in ram before they
// are applied. The key is delete term; the value is
// number of buffered documents the term applies to.
private HashMap bufferedDeleteTerms = new HashMap();
private int numBufferedDeleteTerms = 0;
// Holds buffered deletes, by docID, term or query. We
// hold two instances of this class: one for the deletes
// prior to the last flush, the other for deletes after
// the last flush. This is so if we need to abort
// (discard all buffered docs) we can also discard the
// buffered deletes yet keep the deletes done during
// previously flushed segments.
private static class BufferedDeletes {
int numTerms;
HashMap terms = new HashMap();
HashMap queries = new HashMap();
List docIDs = new ArrayList();
// Currently used only for deleting a doc on hitting an non-aborting exception
private List bufferedDeleteDocIDs = new ArrayList();
private void update(BufferedDeletes in) {
numTerms += in.numTerms;
terms.putAll(in.terms);
queries.putAll(in.queries);
docIDs.addAll(in.docIDs);
in.terms.clear();
in.numTerms = 0;
in.queries.clear();
in.docIDs.clear();
}
void clear() {
terms.clear();
queries.clear();
docIDs.clear();
numTerms = 0;
}
boolean any() {
return terms.size() > 0 || docIDs.size() > 0 || queries.size() > 0;
}
// Remaps all buffered deletes based on a completed
// merge
synchronized void remap(MergeDocIDRemapper mapper,
SegmentInfos infos,
int[][] docMaps,
int[] delCounts,
MergePolicy.OneMerge merge,
int mergeDocCount) {
final HashMap newDeleteTerms;
// Remap delete-by-term
if (terms.size() > 0) {
newDeleteTerms = new HashMap();
Iterator iter = terms.entrySet().iterator();
while(iter.hasNext()) {
Entry entry = (Entry) iter.next();
Num num = (Num) entry.getValue();
newDeleteTerms.put(entry.getKey(),
new Num(mapper.remap(num.getNum())));
}
} else
newDeleteTerms = null;
// Remap delete-by-docID
final List newDeleteDocIDs;
if (docIDs.size() > 0) {
newDeleteDocIDs = new ArrayList(docIDs.size());
Iterator iter = docIDs.iterator();
while(iter.hasNext()) {
Integer num = (Integer) iter.next();
newDeleteDocIDs.add(new Integer(mapper.remap(num.intValue())));
}
} else
newDeleteDocIDs = null;
// Remap delete-by-query
final HashMap newDeleteQueries;
if (queries.size() > 0) {
newDeleteQueries = new HashMap(queries.size());
Iterator iter = queries.entrySet().iterator();
while(iter.hasNext()) {
Entry entry = (Entry) iter.next();
Integer num = (Integer) entry.getValue();
newDeleteQueries.put(entry.getKey(),
new Integer(mapper.remap(num.intValue())));
}
} else
newDeleteQueries = null;
if (newDeleteTerms != null)
terms = newDeleteTerms;
if (newDeleteDocIDs != null)
docIDs = newDeleteDocIDs;
if (newDeleteQueries != null)
queries = newDeleteQueries;
}
}
// Deletes done after the last flush; these are discarded
// on abort
private BufferedDeletes deletesInRAM = new BufferedDeletes();
// Deletes done before the last flush; these are still
// kept on abort
private BufferedDeletes deletesFlushed = new BufferedDeletes();
// The max number of delete terms that can be buffered before
// they must be flushed to disk.
@ -175,20 +276,29 @@ final class DocumentsWriter {
// non-zero we will flush by RAM usage instead.
private int maxBufferedDocs = IndexWriter.DEFAULT_MAX_BUFFERED_DOCS;
private int flushedDocCount; // How many docs already flushed to index
synchronized void updateFlushedDocCount(int n) {
flushedDocCount += n;
}
synchronized int getFlushedDocCount() {
return flushedDocCount;
}
synchronized void setFlushedDocCount(int n) {
flushedDocCount = n;
}
private boolean closed;
// Coarse estimates used to measure RAM usage of buffered deletes
private static int OBJECT_HEADER_BYTES = 8;
private static int OBJECT_POINTER_BYTES = 4; // TODO: should be 8 on 64-bit platform
private static int BYTES_PER_CHAR = 2;
private static int BYTES_PER_INT = 4;
private BufferedNorms[] norms = new BufferedNorms[0]; // Holds norms until we flush
DocumentsWriter(Directory directory, IndexWriter writer) throws IOException {
this.directory = directory;
this.writer = writer;
flushedDocCount = writer.docCount();
postingsFreeList = new Posting[0];
}
@ -357,9 +467,7 @@ final class DocumentsWriter {
try {
bufferedDeleteTerms.clear();
bufferedDeleteDocIDs.clear();
numBufferedDeleteTerms = 0;
deletesInRAM.clear();
try {
abortedFiles = files();
@ -547,6 +655,8 @@ final class DocumentsWriter {
newFiles.addAll(writeSegment());
flushedDocCount += docCount;
success = true;
} finally {
@ -2111,11 +2221,6 @@ final class DocumentsWriter {
resetPostingsData();
nextDocID = 0;
nextWriteDocID = 0;
numDocsInRAM = 0;
files = null;
// Maybe downsize postingsFreeList array
if (postingsFreeList.length > 1.5*postingsFreeCount) {
int newSize = postingsFreeList.length;
@ -2130,6 +2235,10 @@ final class DocumentsWriter {
return flushedFiles;
}
synchronized void pushDeletes() {
deletesFlushed.update(deletesInRAM);
}
/** Returns the name of the file with this extension, on
* the current segment we are working on. */
private String segmentFileName(String extension) {
@ -2428,15 +2537,7 @@ final class DocumentsWriter {
// Next, wait until my thread state is idle (in case
// it's shared with other threads) and for threads to
// not be paused nor a flush pending:
while(!closed && (!state.isIdle || pauseThreads != 0 || flushPending || abortCount > 0))
try {
wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (closed)
throw new AlreadyClosedException("this IndexWriter is closed");
waitReady(state);
if (segment == null)
segment = writer.newSegmentName();
@ -2529,55 +2630,72 @@ final class DocumentsWriter {
return state.doFlushAfter || timeToFlushDeletes();
}
// for testing
synchronized int getNumBufferedDeleteTerms() {
return numBufferedDeleteTerms;
return deletesInRAM.numTerms;
}
// for testing
synchronized HashMap getBufferedDeleteTerms() {
return bufferedDeleteTerms;
return deletesInRAM.terms;
}
synchronized List getBufferedDeleteDocIDs() {
return bufferedDeleteDocIDs;
/** Called whenever a merge has completed and the merged segments had deletions */
synchronized void remapDeletes(SegmentInfos infos, int[][] docMaps, int[] delCounts, MergePolicy.OneMerge merge, int mergeDocCount) {
if (docMaps == null)
// The merged segments had no deletes so docIDs did not change and we have nothing to do
return;
MergeDocIDRemapper mapper = new MergeDocIDRemapper(infos, docMaps, delCounts, merge, mergeDocCount);
deletesInRAM.remap(mapper, infos, docMaps, delCounts, merge, mergeDocCount);
deletesFlushed.remap(mapper, infos, docMaps, delCounts, merge, mergeDocCount);
flushedDocCount -= mapper.docShift;
}
// Reset buffered deletes.
synchronized void clearBufferedDeletes() throws IOException {
bufferedDeleteTerms.clear();
bufferedDeleteDocIDs.clear();
numBufferedDeleteTerms = 0;
if (numBytesUsed > 0)
resetPostingsData();
}
synchronized boolean bufferDeleteTerms(Term[] terms) throws IOException {
while(pauseThreads != 0 || flushPending)
synchronized private void waitReady(ThreadState state) {
while(!closed && ((state != null && !state.isIdle) || pauseThreads != 0 || flushPending || abortCount > 0))
try {
wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (closed)
throw new AlreadyClosedException("this IndexWriter is closed");
}
synchronized boolean bufferDeleteTerms(Term[] terms) throws IOException {
waitReady(null);
for (int i = 0; i < terms.length; i++)
addDeleteTerm(terms[i], numDocsInRAM);
return timeToFlushDeletes();
}
synchronized boolean bufferDeleteTerm(Term term) throws IOException {
while(pauseThreads != 0 || flushPending)
try {
wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
waitReady(null);
addDeleteTerm(term, numDocsInRAM);
return timeToFlushDeletes();
}
synchronized boolean bufferDeleteQueries(Query[] queries) throws IOException {
waitReady(null);
for (int i = 0; i < queries.length; i++)
addDeleteQuery(queries[i], numDocsInRAM);
return timeToFlushDeletes();
}
synchronized boolean bufferDeleteQuery(Query query) throws IOException {
waitReady(null);
addDeleteQuery(query, numDocsInRAM);
return timeToFlushDeletes();
}
synchronized boolean deletesFull() {
return maxBufferedDeleteTerms != IndexWriter.DISABLE_AUTO_FLUSH
&& ((deletesInRAM.numTerms + deletesInRAM.queries.size() + deletesInRAM.docIDs.size()) >= maxBufferedDeleteTerms);
}
synchronized private boolean timeToFlushDeletes() {
return (bufferIsFull
|| (maxBufferedDeleteTerms != IndexWriter.DISABLE_AUTO_FLUSH
&& numBufferedDeleteTerms >= maxBufferedDeleteTerms))
&& setFlushPending();
return (bufferIsFull || deletesFull()) && setFlushPending();
}
void setMaxBufferedDeleteTerms(int maxBufferedDeleteTerms) {
@ -2589,7 +2707,108 @@ final class DocumentsWriter {
}
synchronized boolean hasDeletes() {
return bufferedDeleteTerms.size() > 0 || bufferedDeleteDocIDs.size() > 0;
return deletesFlushed.any();
}
synchronized boolean applyDeletes(SegmentInfos infos) throws IOException {
if (!hasDeletes())
return false;
if (infoStream != null)
infoStream.println("apply " + deletesFlushed.numTerms + " buffered deleted terms and " +
deletesFlushed.docIDs.size() + " deleted docIDs and " +
deletesFlushed.queries.size() + " deleted queries on " +
+ infos.size() + " segments.");
final int infosEnd = infos.size();
int docStart = 0;
boolean any = false;
for (int i = 0; i < infosEnd; i++) {
IndexReader reader = SegmentReader.get(infos.info(i), false);
boolean success = false;
try {
any |= applyDeletes(reader, docStart);
docStart += reader.maxDoc();
success = true;
} finally {
if (reader != null) {
try {
if (success)
reader.doCommit();
} finally {
reader.doClose();
}
}
}
}
deletesFlushed.clear();
return any;
}
// Apply buffered delete terms, queries and docIDs to the
// provided reader
private final synchronized boolean applyDeletes(IndexReader reader, int docIDStart)
throws CorruptIndexException, IOException {
final int docEnd = docIDStart + reader.maxDoc();
boolean any = false;
// Delete by term
Iterator iter = deletesFlushed.terms.entrySet().iterator();
while (iter.hasNext()) {
Entry entry = (Entry) iter.next();
Term term = (Term) entry.getKey();
TermDocs docs = reader.termDocs(term);
if (docs != null) {
int limit = ((DocumentsWriter.Num) entry.getValue()).getNum();
try {
while (docs.next()) {
int docID = docs.doc();
if (docIDStart+docID >= limit)
break;
reader.deleteDocument(docID);
any = true;
}
} finally {
docs.close();
}
}
}
// Delete by docID
iter = deletesFlushed.docIDs.iterator();
while(iter.hasNext()) {
int docID = ((Integer) iter.next()).intValue();
if (docID >= docIDStart && docID < docEnd) {
reader.deleteDocument(docID-docIDStart);
any = true;
}
}
// Delete by query
IndexSearcher searcher = new IndexSearcher(reader);
iter = deletesFlushed.queries.entrySet().iterator();
while(iter.hasNext()) {
Entry entry = (Entry) iter.next();
Query query = (Query) entry.getKey();
int limit = ((Integer) entry.getValue()).intValue();
Weight weight = query.weight(searcher);
Scorer scorer = weight.scorer(reader);
while(scorer.next()) {
final int docID = scorer.doc();
if (docIDStart + docID >= limit)
break;
reader.deleteDocument(docID);
any = true;
}
}
searcher.close();
return any;
}
// Number of documents a delete term applies to.
@ -2621,27 +2840,23 @@ final class DocumentsWriter {
// delete term will be applied to those documents as well
// as the disk segments.
synchronized private void addDeleteTerm(Term term, int docCount) {
Num num = (Num) bufferedDeleteTerms.get(term);
if (num == null) {
bufferedDeleteTerms.put(term, new Num(docCount));
// This is coarse approximation of actual bytes used:
numBytesUsed += (term.field().length() + term.text().length()) * BYTES_PER_CHAR
+ 4 + 5 * OBJECT_HEADER_BYTES + 5 * OBJECT_POINTER_BYTES;
if (ramBufferSize != IndexWriter.DISABLE_AUTO_FLUSH
&& numBytesUsed > ramBufferSize) {
bufferIsFull = true;
}
} else {
num.setNum(docCount);
}
numBufferedDeleteTerms++;
Num num = (Num) deletesInRAM.terms.get(term);
final int docIDUpto = flushedDocCount + docCount;
if (num == null)
deletesInRAM.terms.put(term, new Num(docIDUpto));
else
num.setNum(docIDUpto);
deletesInRAM.numTerms++;
}
// Buffer a specific docID for deletion. Currently only
// used when we hit a exception when adding a document
synchronized private void addDeleteDocID(int docId) {
bufferedDeleteDocIDs.add(new Integer(docId));
numBytesUsed += OBJECT_HEADER_BYTES + BYTES_PER_INT + OBJECT_POINTER_BYTES;
synchronized private void addDeleteDocID(int docID) {
deletesInRAM.docIDs.add(new Integer(flushedDocCount+docID));
}
synchronized private void addDeleteQuery(Query query, int docID) {
deletesInRAM.queries.put(query, new Integer(flushedDocCount + docID));
}
/** Does the synchronized work to finish/flush the
@ -3132,6 +3347,7 @@ final class DocumentsWriter {
postingsAllocCount++;
}
}
assert numBytesUsed <= numBytesAlloc;
}
synchronized void recyclePostings(Posting[] postings, int numPostings) {
@ -3164,6 +3380,7 @@ final class DocumentsWriter {
b = (byte[]) freeByteBlocks.remove(size-1);
if (trackAllocations)
numBytesUsed += BYTE_BLOCK_SIZE;
assert numBytesUsed <= numBytesAlloc;
return b;
}
@ -3194,6 +3411,7 @@ final class DocumentsWriter {
} else
c = (char[]) freeCharBlocks.remove(size-1);
numBytesUsed += CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;
assert numBytesUsed <= numBytesAlloc;
return c;
}

View File

@ -852,6 +852,7 @@ public abstract class IndexReader {
* currently locked.
* @param directory the directory to check for a lock
* @throws IOException if there is a low-level IO error
* @deprecated Please use {@link IndexWriter#isLocked(Directory)} instead
*/
public static boolean isLocked(Directory directory) throws IOException {
return
@ -863,6 +864,7 @@ public abstract class IndexReader {
* currently locked.
* @param directory the directory to check for a lock
* @throws IOException if there is a low-level IO error
* @deprecated Please use {@link IndexWriter#isLocked(String)} instead
*/
public static boolean isLocked(String directory) throws IOException {
Directory dir = FSDirectory.getDirectory(directory);
@ -877,6 +879,7 @@ public abstract class IndexReader {
* Caution: this should only be used by failure recovery code,
* when it is known that no other process nor thread is in fact
* currently accessing this index.
* @deprecated Please use {@link IndexWriter#unlock(Directory)} instead
*/
public static void unlock(Directory directory) throws IOException {
directory.makeLock(IndexWriter.WRITE_LOCK_NAME).release();

View File

@ -20,6 +20,7 @@ package org.apache.lucene.index;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.search.Similarity;
import org.apache.lucene.search.Query;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.Lock;
@ -39,7 +40,6 @@ import java.util.Set;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Iterator;
import java.util.Map.Entry;
/**
An <code>IndexWriter</code> creates and maintains an index.
@ -60,7 +60,9 @@ import java.util.Map.Entry;
<p>In either case, documents are added with <a
href="#addDocument(org.apache.lucene.document.Document)"><b>addDocument</b></a>
and removed with <a
href="#deleteDocuments(org.apache.lucene.index.Term)"><b>deleteDocuments</b></a>.
href="#deleteDocuments(org.apache.lucene.index.Term)"><b>deleteDocuments</b></a>
or <a
href="#deleteDocuments(org.apache.lucene.search.Query)"><b>deleteDocuments</b></a>.
A document can be updated with <a href="#updateDocument(org.apache.lucene.index.Term, org.apache.lucene.document.Document)"><b>updateDocument</b></a>
(which just deletes and then adds the entire document).
When finished adding, deleting and updating documents, <a href="#close()"><b>close</b></a> should be called.</p>
@ -75,9 +77,10 @@ import java.util.Map.Entry;
#setRAMBufferSizeMB}) or the number of added documents.
The default is to flush when RAM usage hits 16 MB. For
best indexing speed you should flush by RAM usage with a
large RAM buffer. You can also force a flush by calling
{@link #flush}. When a flush occurs, both pending deletes
and added documents are flushed to the index. A flush may
large RAM buffer. Note that flushing just moves the
internal buffered state in IndexWriter into the index, but
these changes are not visible to IndexReader until either
{@link #commit} or {@link #close} is called. A flush may
also trigger one or more segment merges which by default
run with a background thread so as not to block the
addDocument calls (see <a href="#mergePolicy">below</a>
@ -296,17 +299,18 @@ public class IndexWriter {
private Similarity similarity = Similarity.getDefault(); // how to normalize
private volatile boolean commitPending; // true if segmentInfos has changes not yet committed
private volatile long changeCount; // increments every a change is completed
private long lastCommitChangeCount; // last changeCount that was committed
private SegmentInfos rollbackSegmentInfos; // segmentInfos we will fallback to if the commit fails
private HashMap rollbackSegments;
private SegmentInfos localRollbackSegmentInfos; // segmentInfos we will fallback to if the commit fails
private boolean localAutoCommit; // saved autoCommit during local transaction
private int localFlushedDocCount; // saved docWriter.getFlushedDocCount during local transaction
private boolean autoCommit = true; // false if we should commit only on close
private SegmentInfos segmentInfos = new SegmentInfos(); // the segments
private int syncCount;
private int syncCountSaved = -1;
private DocumentsWriter docWriter;
private IndexFileDeleter deleter;
@ -334,11 +338,9 @@ public class IndexWriter {
private boolean stopMerges;
private int flushCount;
private int flushDeletesCount;
private double maxSyncPauseSeconds = DEFAULT_MAX_SYNC_PAUSE_SECONDS;
// Last (right most) SegmentInfo created by a merge
private SegmentInfo lastMergeInfo;
/**
* Used internally to throw an {@link
* AlreadyClosedException} if this IndexWriter has been
@ -1085,10 +1087,10 @@ public class IndexWriter {
if (create) {
// Clear the write lock in case it's leftover:
directory.clearLock(IndexWriter.WRITE_LOCK_NAME);
directory.clearLock(WRITE_LOCK_NAME);
}
Lock writeLock = directory.makeLock(IndexWriter.WRITE_LOCK_NAME);
Lock writeLock = directory.makeLock(WRITE_LOCK_NAME);
if (!writeLock.obtain(writeLockTimeout)) // obtain write lock
throw new LockObtainFailedException("Index locked for write: " + writeLock);
this.writeLock = writeLock; // save it
@ -1653,7 +1655,7 @@ public class IndexWriter {
// Only allow a new merge to be triggered if we are
// going to wait for merges:
flush(waitForMerges, true);
flush(waitForMerges, true, true);
mergePolicy.close();
@ -1662,9 +1664,9 @@ public class IndexWriter {
mergeScheduler.close();
if (infoStream != null)
message("now call final sync()");
message("now call final commit()");
sync(true, 0);
commit(true, 0);
if (infoStream != null)
message("at close: " + segString());
@ -1790,7 +1792,11 @@ public class IndexWriter {
/** Returns the number of documents currently in this index. */
public synchronized int docCount() {
ensureOpen();
int count = docWriter.getNumDocsInRAM();
int count;
if (docWriter != null)
count = docWriter.getNumDocsInRAM();
else
count = 0;
for (int i = 0; i < segmentInfos.size(); i++) {
SegmentInfo si = segmentInfos.info(i);
count += si.docCount;
@ -1798,6 +1804,16 @@ public class IndexWriter {
return count;
}
public synchronized boolean hasDeletions() throws IOException {
ensureOpen();
if (docWriter.hasDeletes())
return true;
for (int i = 0; i < segmentInfos.size(); i++)
if (segmentInfos.info(i).hasDeletions())
return true;
return false;
}
/**
* The maximum number of terms that will be indexed for a single field in a
* document. This limits the amount of memory required for indexing, so that
@ -1893,7 +1909,7 @@ public class IndexWriter {
}
}
if (doFlush)
flush(true, false);
flush(true, false, false);
} catch (OutOfMemoryError oom) {
hitOOM = true;
throw oom;
@ -1911,7 +1927,7 @@ public class IndexWriter {
try {
boolean doFlush = docWriter.bufferDeleteTerm(term);
if (doFlush)
flush(true, false);
flush(true, false, false);
} catch (OutOfMemoryError oom) {
hitOOM = true;
throw oom;
@ -1931,13 +1947,41 @@ public class IndexWriter {
try {
boolean doFlush = docWriter.bufferDeleteTerms(terms);
if (doFlush)
flush(true, false);
flush(true, false, false);
} catch (OutOfMemoryError oom) {
hitOOM = true;
throw oom;
}
}
/**
* Deletes the document(s) matching the provided query.
* @param query the query to identify the documents to be deleted
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
*/
public void deleteDocuments(Query query) throws CorruptIndexException, IOException {
ensureOpen();
boolean doFlush = docWriter.bufferDeleteQuery(query);
if (doFlush)
flush(true, false, false);
}
/**
* Deletes the document(s) matching any of the provided queries.
* All deletes are flushed at the same time.
* @param queries array of queries to identify the documents
* to be deleted
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
*/
public void deleteDocuments(Query[] queries) throws CorruptIndexException, IOException {
ensureOpen();
boolean doFlush = docWriter.bufferDeleteQueries(queries);
if (doFlush)
flush(true, false, false);
}
/**
* Updates a document by first deleting the document(s)
* containing <code>term</code> and then adding the new
@ -1993,7 +2037,7 @@ public class IndexWriter {
}
}
if (doFlush)
flush(true, false);
flush(true, false, false);
} catch (OutOfMemoryError oom) {
hitOOM = true;
throw oom;
@ -2024,16 +2068,21 @@ public class IndexWriter {
return flushCount;
}
// for test purpose
final synchronized int getFlushDeletesCount() {
return flushDeletesCount;
}
final String newSegmentName() {
// Cannot synchronize on IndexWriter because that causes
// deadlock
synchronized(segmentInfos) {
// Important to set commitPending so that the
// Important to increment changeCount so that the
// segmentInfos is written on close. Otherwise we
// could close, re-open and re-return the same segment
// name that was previously returned which can cause
// problems at least with ConcurrentMergeScheduler.
commitPending = true;
changeCount++;
return "_" + Integer.toString(segmentInfos.counter++, Character.MAX_RADIX);
}
}
@ -2158,7 +2207,7 @@ public class IndexWriter {
if (infoStream != null)
message("optimize: index now " + segString());
flush(true, false);
flush(true, false, true);
synchronized(this) {
resetMergeExceptions();
@ -2408,13 +2457,14 @@ public class IndexWriter {
localRollbackSegmentInfos = (SegmentInfos) segmentInfos.clone();
localAutoCommit = autoCommit;
localFlushedDocCount = docWriter.getFlushedDocCount();
if (localAutoCommit) {
if (infoStream != null)
message("flush at startTransaction");
flush(true, false);
flush(true, false, false);
// Turn off auto-commit during our local transaction:
autoCommit = false;
@ -2435,6 +2485,7 @@ public class IndexWriter {
// First restore autoCommit in case we hit an exception below:
autoCommit = localAutoCommit;
docWriter.setFlushedDocCount(localFlushedDocCount);
// Keep the same segmentInfos instance but replace all
// of its SegmentInfo instances. This is so the next
@ -2454,7 +2505,6 @@ public class IndexWriter {
deleter.refresh();
finishMerges(false);
lastMergeInfo = null;
stopMerges = false;
}
@ -2477,7 +2527,7 @@ public class IndexWriter {
if (autoCommit) {
boolean success = false;
try {
sync(true, 0);
commit(true, 0);
success = true;
} finally {
if (!success) {
@ -2524,9 +2574,9 @@ public class IndexWriter {
finishMerges(false);
// Must pre-close these two, in case they set
// commitPending=true, so that we can then set it to
// false before calling closeInternal
// Must pre-close these two, in case they increment
// changeCount so that we can then set it to false
// before calling closeInternal
mergePolicy.close();
mergeScheduler.close();
@ -2547,7 +2597,7 @@ public class IndexWriter {
deleter.refresh();
}
commitPending = false;
lastCommitChangeCount = changeCount;
closeInternal(false);
} else
waitForClose();
@ -2614,7 +2664,7 @@ public class IndexWriter {
* index directory.
*/
private synchronized void checkpoint() throws IOException {
commitPending = true;
changeCount++;
deleter.checkpoint(segmentInfos, false);
}
@ -2677,21 +2727,27 @@ public class IndexWriter {
try {
if (infoStream != null)
message("flush at addIndexes");
flush(true, false);
flush(true, false, true);
boolean success = false;
startTransaction();
try {
int docCount = 0;
for (int i = 0; i < dirs.length; i++) {
SegmentInfos sis = new SegmentInfos(); // read infos from dir
sis.read(dirs[i]);
for (int j = 0; j < sis.size(); j++) {
segmentInfos.addElement(sis.info(j)); // add each info
final SegmentInfo info = sis.info(j);
docCount += info.docCount;
segmentInfos.addElement(info); // add each info
}
}
// Notify DocumentsWriter that the flushed count just increased
docWriter.updateFlushedDocCount(docCount);
optimize();
success = true;
@ -2745,7 +2801,7 @@ public class IndexWriter {
try {
if (infoStream != null)
message("flush at addIndexesNoOptimize");
flush(true, false);
flush(true, false, true);
boolean success = false;
@ -2753,6 +2809,7 @@ public class IndexWriter {
try {
int docCount = 0;
for (int i = 0; i < dirs.length; i++) {
if (directory == dirs[i]) {
// cannot add this index: segments may be deleted in merge before added
@ -2763,10 +2820,14 @@ public class IndexWriter {
sis.read(dirs[i]);
for (int j = 0; j < sis.size(); j++) {
SegmentInfo info = sis.info(j);
docCount += info.docCount;
segmentInfos.addElement(info); // add each info
}
}
// Notify DocumentsWriter that the flushed count just increased
docWriter.updateFlushedDocCount(docCount);
maybeMerge();
// If after merging there remain segments in the index
@ -2869,6 +2930,9 @@ public class IndexWriter {
-1, null, false);
segmentInfos.addElement(info);
// Notify DocumentsWriter that the flushed count just increased
docWriter.updateFlushedDocCount(docCount);
success = true;
} finally {
@ -2931,7 +2995,7 @@ public class IndexWriter {
* @deprecated please call {@link #commit}) instead
*/
public final void flush() throws CorruptIndexException, IOException {
flush(true, false);
flush(true, false, true);
}
/**
@ -2960,8 +3024,8 @@ public class IndexWriter {
}
private final void commit(boolean triggerMerges) throws CorruptIndexException, IOException {
flush(triggerMerges, true);
sync(true, 0);
flush(triggerMerges, true, true);
commit(true, 0);
}
/**
@ -2971,23 +3035,35 @@ public class IndexWriter {
* deletes or docs were flushed) if necessary
* @param flushDocStores if false we are allowed to keep
* doc stores open to share with the next segment
* @param flushDeletes whether pending deletes should also
* be flushed
*/
protected final void flush(boolean triggerMerge, boolean flushDocStores) throws CorruptIndexException, IOException {
protected final void flush(boolean triggerMerge, boolean flushDocStores, boolean flushDeletes) throws CorruptIndexException, IOException {
ensureOpen();
if (doFlush(flushDocStores) && triggerMerge)
if (doFlush(flushDocStores, flushDeletes) && triggerMerge)
maybeMerge();
}
// TODO: this method should not have to be entirely
// synchronized, ie, merges should be allowed to commit
// even while a flush is happening
private synchronized final boolean doFlush(boolean flushDocStores) throws CorruptIndexException, IOException {
private synchronized final boolean doFlush(boolean flushDocStores, boolean flushDeletes) throws CorruptIndexException, IOException {
// Make sure no threads are actively adding a document
assert testPoint("startDoFlush");
flushCount++;
flushDeletes |= docWriter.deletesFull();
// When autoCommit=true we must always flush deletes
// when flushing a segment; otherwise deletes may become
// visible before their corresponding added document
// from an updateDocument call
if (autoCommit)
flushDeletes = true;
// Returns true if docWriter is currently aborting, in
// which case we skip flushing this segment
if (docWriter.pauseAllThreads()) {
@ -3011,15 +3087,6 @@ public class IndexWriter {
if (docStoreSegment == null)
flushDocStores = false;
// Always flush deletes if there are any delete terms.
// TODO: when autoCommit=false we don't have to flush
// deletes with every flushed segment; we can save
// CPU/IO by buffering longer & flushing deletes only
// when they are full or writer is being closed. We
// have to fix the "applyDeletesSelectively" logic to
// apply to more than just the last flushed segment
boolean flushDeletes = docWriter.hasDeletes();
int docStoreOffset = docWriter.getDocStoreOffset();
// docStoreOffset should only be non-zero when
@ -3095,54 +3162,17 @@ public class IndexWriter {
docStoreIsCompoundFile);
}
if (flushDeletes) {
try {
SegmentInfos rollback = (SegmentInfos) segmentInfos.clone();
boolean success = false;
try {
// we should be able to change this so we can
// buffer deletes longer and then flush them to
// multiple flushed segments only when a commit()
// finally happens
applyDeletes(newSegment);
success = true;
} finally {
if (!success) {
if (infoStream != null)
message("hit exception flushing deletes");
// Carefully remove any partially written .del
// files
final int size = rollback.size();
for(int i=0;i<size;i++) {
final String newDelFileName = segmentInfos.info(i).getDelFileName();
final String delFileName = rollback.info(i).getDelFileName();
if (newDelFileName != null && !newDelFileName.equals(delFileName))
deleter.deleteFile(newDelFileName);
}
// Remove just flushed segment
deleter.refresh(segment);
// Fully replace the segmentInfos since flushed
// deletes could have changed any of the
// SegmentInfo instances:
segmentInfos.clear();
segmentInfos.addAll(rollback);
}
}
} finally {
// Regardless of success of failure in flushing
// deletes, we must clear them from our buffer:
docWriter.clearBufferedDeletes();
}
}
docWriter.pushDeletes();
if (flushDocs)
segmentInfos.addElement(newSegment);
if (flushDocs || flushDeletes)
if (flushDeletes) {
flushDeletesCount++;
applyDeletes();
}
if (flushDocs)
checkpoint();
doAfterFlush();
@ -3165,7 +3195,7 @@ public class IndexWriter {
checkpoint();
}
return flushDocs || flushDeletes;
return flushDocs;
} catch (OutOfMemoryError oom) {
hitOOM = true;
@ -3226,11 +3256,14 @@ public class IndexWriter {
* delete generation for merge.info). If no deletes were
* flushed, no new deletes file is saved. */
synchronized private void commitMergedDeletes(MergePolicy.OneMerge merge) throws IOException {
assert testPoint("startCommitMergeDeletes");
final SegmentInfos sourceSegmentsClone = merge.segmentsClone;
final SegmentInfos sourceSegments = merge.segments;
if (infoStream != null)
message("commitMerge " + merge.segString(directory));
message("commitMergeDeletes " + merge.segString(directory));
// Carefully merge deletes that occurred after we
// started merging:
@ -3301,12 +3334,15 @@ public class IndexWriter {
if (deletes != null) {
merge.info.advanceDelGen();
message("commit merge deletes to " + merge.info.getDelFileName());
deletes.write(directory, merge.info.getDelFileName());
}
}
/* FIXME if we want to support non-contiguous segment merges */
synchronized private boolean commitMerge(MergePolicy.OneMerge merge) throws IOException {
synchronized private boolean commitMerge(MergePolicy.OneMerge merge, SegmentMerger merger, int mergedDocCount) throws IOException {
assert testPoint("startCommitMerge");
if (hitOOM)
return false;
@ -3334,6 +3370,8 @@ public class IndexWriter {
commitMergedDeletes(merge);
docWriter.remapDeletes(segmentInfos, merger.getDocMaps(), merger.getDelCounts(), merge, mergedDocCount);
// Simple optimization: if the doc store we are using
// has been closed and is in now compound format (but
// wasn't when we started), then we will switch to the
@ -3355,11 +3393,6 @@ public class IndexWriter {
segmentInfos.subList(start, start + merge.segments.size()).clear();
segmentInfos.add(start, merge.info);
if (lastMergeInfo == null || segmentInfos.indexOf(lastMergeInfo) < start)
lastMergeInfo = merge.info;
if (merge.optimize)
segmentsToOptimize.add(merge.info);
// Must checkpoint before decrefing so any newly
// referenced files in the new merge.info are incref'd
@ -3368,6 +3401,8 @@ public class IndexWriter {
decrefMergeSegments(merge);
if (merge.optimize)
segmentsToOptimize.add(merge.info);
return true;
}
@ -3497,6 +3532,8 @@ public class IndexWriter {
* the synchronized lock on IndexWriter instance. */
final synchronized void mergeInit(MergePolicy.OneMerge merge) throws IOException {
assert testPoint("startMergeInit");
assert merge.registerDone;
assert !merge.optimize || merge.maxNumSegmentsOptimize > 0;
@ -3507,11 +3544,15 @@ public class IndexWriter {
if (merge.isAborted())
return;
boolean changed = applyDeletes();
// If autoCommit == true then all deletes should have
// been flushed when we flushed the last segment
assert !changed || !autoCommit;
final SegmentInfos sourceSegments = merge.segments;
final int end = sourceSegments.size();
ensureContiguousMerge(merge);
// Check whether this merge will allow us to skip
// merging the doc stores (stored field & vectors).
// This is a very substantial optimization (saves tons
@ -3598,7 +3639,7 @@ public class IndexWriter {
// make compound file out of them...
if (infoStream != null)
message("flush at merge");
flush(false, true);
flush(false, true, false);
}
// We must take a full copy at this point so that we can
@ -3746,7 +3787,7 @@ public class IndexWriter {
}
}
if (!commitMerge(merge))
if (!commitMerge(merge, merger, mergedDocCount))
// commitMerge will return false if this merge was aborted
return 0;
@ -3759,7 +3800,7 @@ public class IndexWriter {
synchronized(this) {
size = merge.info.sizeInBytes();
}
sync(false, size);
commit(false, size);
}
success = false;
@ -3812,7 +3853,7 @@ public class IndexWriter {
synchronized(this) {
size = merge.info.sizeInBytes();
}
sync(false, size);
commit(false, size);
}
return mergedDocCount;
@ -3823,63 +3864,41 @@ public class IndexWriter {
mergeExceptions.add(merge);
}
// Called during flush to apply any buffered deletes. If
// flushedNewSegment is true then a new segment was just
// created and flushed from the ram segments, so we will
// selectively apply the deletes to that new segment.
private final void applyDeletes(SegmentInfo newSegment) throws CorruptIndexException, IOException {
final HashMap bufferedDeleteTerms = docWriter.getBufferedDeleteTerms();
final List bufferedDeleteDocIDs = docWriter.getBufferedDeleteDocIDs();
// Apply buffered deletes to all segments.
private final synchronized boolean applyDeletes() throws CorruptIndexException, IOException {
assert testPoint("startApplyDeletes");
SegmentInfos rollback = (SegmentInfos) segmentInfos.clone();
boolean success = false;
boolean changed;
try {
changed = docWriter.applyDeletes(segmentInfos);
success = true;
} finally {
if (!success) {
if (infoStream != null)
message("flush " + docWriter.getNumBufferedDeleteTerms() + " buffered deleted terms and " +
bufferedDeleteDocIDs.size() + " deleted docIDs on "
+ segmentInfos.size() + " segments.");
message("hit exception flushing deletes");
if (newSegment != null) {
IndexReader reader = null;
try {
// Open readers w/o opening the stored fields /
// vectors because these files may still be held
// open for writing by docWriter
reader = SegmentReader.get(newSegment, false);
// Carefully remove any partially written .del
// files
final int size = rollback.size();
for(int i=0;i<size;i++) {
final String newDelFileName = segmentInfos.info(i).getDelFileName();
final String delFileName = rollback.info(i).getDelFileName();
if (newDelFileName != null && !newDelFileName.equals(delFileName))
deleter.deleteFile(newDelFileName);
}
// Apply delete terms to the segment just flushed from ram
// apply appropriately so that a delete term is only applied to
// the documents buffered before it, not those buffered after it.
applyDeletesSelectively(bufferedDeleteTerms, bufferedDeleteDocIDs, reader);
} finally {
if (reader != null) {
try {
reader.doCommit();
} finally {
reader.doClose();
}
}
// Fully replace the segmentInfos since flushed
// deletes could have changed any of the
// SegmentInfo instances:
segmentInfos.clear();
segmentInfos.addAll(rollback);
}
}
final int infosEnd = segmentInfos.size();
for (int i = 0; i < infosEnd; i++) {
IndexReader reader = null;
try {
reader = SegmentReader.get(segmentInfos.info(i), false);
// Apply delete terms to disk segments
// except the one just flushed from ram.
applyDeletes(bufferedDeleteTerms, reader);
} finally {
if (reader != null) {
try {
reader.doCommit();
} finally {
reader.doClose();
}
}
}
}
if (changed)
checkpoint();
return changed;
}
// For test purposes.
@ -3892,65 +3911,24 @@ public class IndexWriter {
return docWriter.getNumBufferedDeleteTerms();
}
// Apply buffered delete terms to the segment just flushed from ram
// apply appropriately so that a delete term is only applied to
// the documents buffered before it, not those buffered after it.
private final void applyDeletesSelectively(HashMap deleteTerms, List deleteIds,
IndexReader reader)
throws CorruptIndexException, IOException {
Iterator iter = deleteTerms.entrySet().iterator();
while (iter.hasNext()) {
Entry entry = (Entry) iter.next();
Term term = (Term) entry.getKey();
TermDocs docs = reader.termDocs(term);
if (docs != null) {
int num = ((DocumentsWriter.Num) entry.getValue()).getNum();
try {
while (docs.next()) {
int doc = docs.doc();
if (doc >= num) {
break;
}
reader.deleteDocument(doc);
}
} finally {
docs.close();
}
}
}
if (deleteIds.size() > 0) {
iter = deleteIds.iterator();
while(iter.hasNext())
reader.deleteDocument(((Integer) iter.next()).intValue());
}
}
// Apply buffered delete terms to this reader.
private final void applyDeletes(HashMap deleteTerms, IndexReader reader)
throws CorruptIndexException, IOException {
Iterator iter = deleteTerms.entrySet().iterator();
while (iter.hasNext()) {
Entry entry = (Entry) iter.next();
reader.deleteDocuments((Term) entry.getKey());
}
}
// utility routines for tests
SegmentInfo newestSegment() {
return segmentInfos.info(segmentInfos.size()-1);
}
public synchronized String segString() {
return segString(segmentInfos);
}
private synchronized String segString(SegmentInfos infos) {
StringBuffer buffer = new StringBuffer();
for(int i = 0; i < segmentInfos.size(); i++) {
final int count = infos.size();
for(int i = 0; i < count; i++) {
if (i > 0) {
buffer.append(' ');
}
buffer.append(segmentInfos.info(i).segString(directory));
buffer.append(infos.info(i).segString(directory));
}
return buffer.toString();
}
@ -4042,68 +4020,49 @@ public class IndexWriter {
* sync each file, if it wasn't already. If that
* succeeds, then we write a new segments_N file & sync
* that. */
private void sync(boolean includeFlushes, long sizeInBytes) throws IOException {
private void commit(boolean skipWait, long sizeInBytes) throws IOException {
assert testPoint("startCommit");
if (hitOOM)
return;
try {
message("start sync() includeFlushes=" + includeFlushes);
if (infoStream != null)
message("start commit() skipWait=" + skipWait + " sizeInBytes=" + sizeInBytes);
if (!includeFlushes)
if (!skipWait)
syncPause(sizeInBytes);
SegmentInfos toSync = null;
final long myChangeCount;
synchronized(this) {
assert lastCommitChangeCount <= changeCount;
if (changeCount == lastCommitChangeCount) {
if (infoStream != null)
message(" skip commit(): no changes pending");
return;
}
// First, we clone & incref the segmentInfos we intend
// to sync, then, without locking, we sync() each file
// referenced by toSync, in the background. Multiple
// threads can be doing this at once, if say a large
// merge and a small merge finish at the same time:
SegmentInfos toSync = null;
final int mySyncCount;
synchronized(this) {
if (!commitPending) {
message(" skip sync(): no commit pending");
return;
}
// Create the segmentInfos we want to sync, by copying
// the current one and possibly removing flushed
// segments:
toSync = (SegmentInfos) segmentInfos.clone();
final int numSegmentsToSync = toSync.size();
boolean newCommitPending = false;
if (!includeFlushes) {
// Do not sync flushes:
assert lastMergeInfo != null;
assert toSync.contains(lastMergeInfo);
int downTo = numSegmentsToSync-1;
while(!toSync.info(downTo).equals(lastMergeInfo)) {
message(" skip segment " + toSync.info(downTo).name);
toSync.remove(downTo);
downTo--;
newCommitPending = true;
}
} else if (numSegmentsToSync > 0)
// Force all subsequent syncs to include up through
// the final info in the current segments. This
// ensure that a call to commit() will force another
// sync (due to merge finishing) to sync all flushed
// segments as well:
lastMergeInfo = toSync.info(numSegmentsToSync-1);
mySyncCount = syncCount++;
deleter.incRef(toSync, false);
commitPending = newCommitPending;
myChangeCount = changeCount;
}
boolean success0 = false;
if (infoStream != null)
message("commit index=" + segString(toSync));
assert testPoint("midCommit");
try {
@ -4143,12 +4102,14 @@ public class IndexWriter {
break;
}
assert testPoint("midCommit2");
synchronized(this) {
// If someone saved a newer version of segments file
// since I first started syncing my version, I can
// safely skip saving myself since I've been
// superseded:
if (mySyncCount > syncCountSaved) {
if (myChangeCount > lastCommitChangeCount) {
if (segmentInfos.getGeneration() > toSync.getGeneration())
toSync.updateGeneration(segmentInfos);
@ -4161,14 +4122,13 @@ public class IndexWriter {
// Have our master segmentInfos record the
// generations we just sync'd
segmentInfos.updateGeneration(toSync);
if (!success) {
commitPending = true;
if (!success)
message("hit exception committing segments file");
}
}
message("commit complete");
syncCountSaved = mySyncCount;
lastCommitChangeCount = myChangeCount;
deleter.checkpoint(toSync, true);
setRollbackSegmentInfos();
@ -4178,19 +4138,54 @@ public class IndexWriter {
message("done all syncs");
success0 = true;
assert testPoint("midCommitSuccess");
} finally {
synchronized(this) {
deleter.decRef(toSync);
if (!success0)
commitPending = true;
}
}
} catch (OutOfMemoryError oom) {
hitOOM = true;
throw oom;
}
assert testPoint("finishCommit");
}
/**
* Returns <code>true</code> iff the index in the named directory is
* currently locked.
* @param directory the directory to check for a lock
* @throws IOException if there is a low-level IO error
*/
public static boolean isLocked(Directory directory) throws IOException {
return directory.makeLock(WRITE_LOCK_NAME).isLocked();
}
/**
* Returns <code>true</code> iff the index in the named directory is
* currently locked.
* @param directory the directory to check for a lock
* @throws IOException if there is a low-level IO error
*/
public static boolean isLocked(String directory) throws IOException {
Directory dir = FSDirectory.getDirectory(directory);
try {
return isLocked(dir);
} finally {
dir.close();
}
}
/**
* Forcibly unlocks the index in the named directory.
* <P>
* Caution: this should only be used by failure recovery code,
* when it is known that no other process nor thread is in fact
* currently accessing this index.
*/
public static void unlock(Directory directory) throws IOException {
directory.makeLock(IndexWriter.WRITE_LOCK_NAME).release();
}
/**
@ -4245,7 +4240,17 @@ public class IndexWriter {
}
// Used only by assert for testing. Current points:
// "DocumentsWriter.ThreadState.init start"
// startDoFlush
// startCommitMerge
// startCommit
// midCommit
// midCommit2
// midCommitSuccess
// finishCommit
// startCommitMergeDeletes
// startMergeInit
// startApplyDeletes
// DocumentsWriter.ThreadState.init start
boolean testPoint(String name) {
return true;
}

View File

@ -0,0 +1,110 @@
package org.apache.lucene.index;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/** Remaps docIDs after a merge has completed, where the
* merged segments had at least one deletion. This is used
* to renumber the buffered deletes in IndexWriter when a
* merge of segments with deletions commits. */
final class MergeDocIDRemapper {
int[] starts; // used for binary search of mapped docID
int[] newStarts; // starts, minus the deletes
int[][] docMaps; // maps docIDs in the merged set
int minDocID; // minimum docID that needs renumbering
int maxDocID; // 1+ the max docID that needs renumbering
int docShift; // total # deleted docs that were compacted by this merge
public MergeDocIDRemapper(SegmentInfos infos, int[][] docMaps, int[] delCounts, MergePolicy.OneMerge merge, int mergedDocCount) {
this.docMaps = docMaps;
SegmentInfo firstSegment = merge.segments.info(0);
int i = 0;
while(true) {
SegmentInfo info = infos.info(i);
if (info.equals(firstSegment))
break;
minDocID += info.docCount;
i++;
}
int numDocs = 0;
for(int j=0;j<docMaps.length;i++,j++) {
numDocs += infos.info(i).docCount;
assert infos.info(i).equals(merge.segments.info(j));
}
maxDocID = minDocID + numDocs;
starts = new int[docMaps.length];
newStarts = new int[docMaps.length];
starts[0] = minDocID;
newStarts[0] = minDocID;
for(i=1;i<docMaps.length;i++) {
final int lastDocCount = merge.segments.info(i-1).docCount;
starts[i] = starts[i-1] + lastDocCount;
newStarts[i] = newStarts[i-1] + lastDocCount - delCounts[i-1];
}
docShift = numDocs - mergedDocCount;
// There are rare cases when docShift is 0. It happens
// if you try to delete a docID that's out of bounds,
// because the SegmentReader still allocates deletedDocs
// and pretends it has deletions ... so we can't make
// this assert here
// assert docShift > 0;
// Make sure it all adds up:
assert docShift == maxDocID - (newStarts[docMaps.length-1] + merge.segments.info(docMaps.length-1).docCount - delCounts[docMaps.length-1]);
}
public int remap(int oldDocID) {
if (oldDocID < minDocID)
// Unaffected by merge
return oldDocID;
else if (oldDocID >= maxDocID)
// This doc was "after" the merge, so simple shift
return oldDocID - docShift;
else {
// Binary search to locate this document & find its new docID
int lo = 0; // search starts array
int hi = docMaps.length - 1; // for first element less
while (hi >= lo) {
int mid = (lo + hi) >> 1;
int midValue = starts[mid];
if (oldDocID < midValue)
hi = mid - 1;
else if (oldDocID > midValue)
lo = mid + 1;
else { // found a match
while (mid+1 < docMaps.length && starts[mid+1] == midValue) {
mid++; // scan to last match
}
if (docMaps[mid] != null)
return newStarts[mid] + docMaps[mid][oldDocID-starts[mid]];
else
return newStarts[mid] + oldDocID-starts[mid];
}
}
if (docMaps[hi] != null)
return newStarts[hi] + docMaps[hi][oldDocID-starts[hi]];
else
return newStarts[hi] + oldDocID-starts[hi];
}
}
}

View File

@ -436,10 +436,21 @@ final class SegmentMerger {
private final void mergeTermInfos() throws CorruptIndexException, IOException {
int base = 0;
for (int i = 0; i < readers.size(); i++) {
final int readerCount = readers.size();
for (int i = 0; i < readerCount; i++) {
IndexReader reader = (IndexReader) readers.elementAt(i);
TermEnum termEnum = reader.terms();
SegmentMergeInfo smi = new SegmentMergeInfo(base, termEnum, reader);
int[] docMap = smi.getDocMap();
if (docMap != null) {
if (docMaps == null) {
docMaps = new int[readerCount][];
delCounts = new int[readerCount];
}
docMaps[i] = docMap;
delCounts[i] = smi.reader.maxDoc() - smi.reader.numDocs();
}
base += reader.numDocs();
if (smi.next())
queue.put(smi); // initialize queue
@ -504,7 +515,15 @@ final class SegmentMerger {
return df;
}
private byte[] payloadBuffer = null;
private byte[] payloadBuffer;
private int[][] docMaps;
int[][] getDocMaps() {
return docMaps;
}
private int[] delCounts;
int[] getDelCounts() {
return delCounts;
}
/** Process postings from multiple segments all positioned on the
* same term. Writes out merged entries into freqOutput and

View File

@ -91,6 +91,7 @@ class TermVectorsReader implements Cloneable {
if (-1 == docStoreOffset) {
this.docStoreOffset = 0;
this.size = numTotalDocs;
assert size == 0 || numTotalDocs == size;
} else {
this.docStoreOffset = docStoreOffset;
this.size = size;
@ -176,6 +177,7 @@ class TermVectorsReader implements Cloneable {
} else {
tvdPosition = tvd.length();
tvfPosition = tvf.length();
assert count == numDocs-1;
}
tvdLengths[count] = (int) (tvdPosition-lastTvdPosition);
tvfLengths[count] = (int) (tvfPosition-lastTvfPosition);

View File

@ -24,11 +24,11 @@ import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.analysis.WhitespaceAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.search.PhraseQuery;
public class TestAddIndexesNoOptimize extends LuceneTestCase {
public void testSimpleCase() throws IOException {
// main directory
@ -122,6 +122,118 @@ public class TestAddIndexesNoOptimize extends LuceneTestCase {
verifyTermDocs(dir, new Term("content", "bbb"), 51);
}
public void testWithPendingDeletes() throws IOException {
// main directory
Directory dir = new RAMDirectory();
// auxiliary directory
Directory aux = new RAMDirectory();
setUpDirs(dir, aux);
IndexWriter writer = newWriter(dir, false);
writer.addIndexesNoOptimize(new Directory[] {aux});
// Adds 10 docs, then replaces them with another 10
// docs, so 10 pending deletes:
for (int i = 0; i < 20; i++) {
Document doc = new Document();
doc.add(new Field("id", "" + (i % 10), Field.Store.NO, Field.Index.UN_TOKENIZED));
doc.add(new Field("content", "bbb " + i, Field.Store.NO,
Field.Index.TOKENIZED));
writer.updateDocument(new Term("id", "" + (i%10)), doc);
}
// Deletes one of the 10 added docs, leaving 9:
PhraseQuery q = new PhraseQuery();
q.add(new Term("content", "bbb"));
q.add(new Term("content", "14"));
writer.deleteDocuments(q);
writer.optimize();
verifyNumDocs(dir, 1039);
verifyTermDocs(dir, new Term("content", "aaa"), 1030);
verifyTermDocs(dir, new Term("content", "bbb"), 9);
writer.close();
dir.close();
aux.close();
}
public void testWithPendingDeletes2() throws IOException {
// main directory
Directory dir = new RAMDirectory();
// auxiliary directory
Directory aux = new RAMDirectory();
setUpDirs(dir, aux);
IndexWriter writer = newWriter(dir, false);
// Adds 10 docs, then replaces them with another 10
// docs, so 10 pending deletes:
for (int i = 0; i < 20; i++) {
Document doc = new Document();
doc.add(new Field("id", "" + (i % 10), Field.Store.NO, Field.Index.UN_TOKENIZED));
doc.add(new Field("content", "bbb " + i, Field.Store.NO,
Field.Index.TOKENIZED));
writer.updateDocument(new Term("id", "" + (i%10)), doc);
}
writer.addIndexesNoOptimize(new Directory[] {aux});
// Deletes one of the 10 added docs, leaving 9:
PhraseQuery q = new PhraseQuery();
q.add(new Term("content", "bbb"));
q.add(new Term("content", "14"));
writer.deleteDocuments(q);
writer.optimize();
verifyNumDocs(dir, 1039);
verifyTermDocs(dir, new Term("content", "aaa"), 1030);
verifyTermDocs(dir, new Term("content", "bbb"), 9);
writer.close();
dir.close();
aux.close();
}
public void testWithPendingDeletes3() throws IOException {
// main directory
Directory dir = new RAMDirectory();
// auxiliary directory
Directory aux = new RAMDirectory();
setUpDirs(dir, aux);
IndexWriter writer = newWriter(dir, false);
// Adds 10 docs, then replaces them with another 10
// docs, so 10 pending deletes:
for (int i = 0; i < 20; i++) {
Document doc = new Document();
doc.add(new Field("id", "" + (i % 10), Field.Store.NO, Field.Index.UN_TOKENIZED));
doc.add(new Field("content", "bbb " + i, Field.Store.NO,
Field.Index.TOKENIZED));
writer.updateDocument(new Term("id", "" + (i%10)), doc);
}
// Deletes one of the 10 added docs, leaving 9:
PhraseQuery q = new PhraseQuery();
q.add(new Term("content", "bbb"));
q.add(new Term("content", "14"));
writer.deleteDocuments(q);
writer.addIndexesNoOptimize(new Directory[] {aux});
writer.optimize();
verifyNumDocs(dir, 1039);
verifyTermDocs(dir, new Term("content", "aaa"), 1030);
verifyTermDocs(dir, new Term("content", "bbb"), 9);
writer.close();
dir.close();
aux.close();
}
// case 0: add self or exceed maxMergeDocs, expect exception
public void testAddSelf() throws IOException {
// main directory

View File

@ -25,11 +25,26 @@ import org.apache.lucene.queryParser.*;
import java.util.Random;
import java.io.File;
import java.io.IOException;
public class TestAtomicUpdate extends LuceneTestCase {
private static final Analyzer ANALYZER = new SimpleAnalyzer();
private static final Random RANDOM = new Random();
public class MockIndexWriter extends IndexWriter {
public MockIndexWriter(Directory dir, boolean autoCommit, Analyzer a, boolean create, MaxFieldLength mfl) throws IOException {
super(dir, autoCommit, a, create, mfl);
}
boolean testPoint(String name) {
// if (name.equals("startCommit")) {
if (RANDOM.nextInt(4) == 2)
Thread.yield();
return true;
}
}
private static abstract class TimedThread extends Thread {
boolean failed;
int count;
@ -113,7 +128,9 @@ public class TestAtomicUpdate extends LuceneTestCase {
TimedThread[] threads = new TimedThread[4];
IndexWriter writer = new IndexWriter(directory, ANALYZER, true, IndexWriter.MaxFieldLength.LIMITED);
IndexWriter writer = new MockIndexWriter(directory, true, ANALYZER, true, IndexWriter.MaxFieldLength.LIMITED);
writer.setMaxBufferedDocs(7);
writer.setMergeFactor(3);
// Establish a base index of 100 docs:
for(int i=0;i<100;i++) {

View File

@ -1353,7 +1353,7 @@ public class TestIndexWriter extends LuceneTestCase
assertTrue(flushCount > lastFlushCount);
lastFlushCount = flushCount;
writer.setRAMBufferSizeMB(0.000001);
writer.setMaxBufferedDeleteTerms(IndexWriter.DISABLE_AUTO_FLUSH);
writer.setMaxBufferedDeleteTerms(1);
} else if (j < 20) {
assertTrue(flushCount > lastFlushCount);
lastFlushCount = flushCount;
@ -1366,6 +1366,7 @@ public class TestIndexWriter extends LuceneTestCase
} else if (30 == j) {
writer.setRAMBufferSizeMB(0.000001);
writer.setMaxBufferedDeleteTerms(IndexWriter.DISABLE_AUTO_FLUSH);
writer.setMaxBufferedDeleteTerms(1);
} else if (j < 40) {
assertTrue(flushCount> lastFlushCount);
lastFlushCount = flushCount;
@ -1554,7 +1555,7 @@ public class TestIndexWriter extends LuceneTestCase
doc.add(new Field("field", "aaa", Field.Store.YES, Field.Index.TOKENIZED, Field.TermVector.WITH_POSITIONS_OFFSETS));
for(int i=0;i<19;i++)
writer.addDocument(doc);
writer.flush(false, true);
writer.flush(false, true, true);
writer.close();
SegmentInfos sis = new SegmentInfos();
sis.read(dir);

View File

@ -107,7 +107,6 @@ public class TestIndexWriterDelete extends LuceneTestCase {
assertEquals(7, reader.numDocs());
reader.close();
modifier.deleteDocuments(new Term("value", String.valueOf(value)));
modifier.deleteDocuments(new Term("value", String.valueOf(value)));
modifier.commit();
@ -120,9 +119,26 @@ public class TestIndexWriterDelete extends LuceneTestCase {
}
}
public void testMaxBufferedDeletes() throws IOException {
for(int pass=0;pass<2;pass++) {
boolean autoCommit = (0==pass);
Directory dir = new MockRAMDirectory();
IndexWriter writer = new IndexWriter(dir, autoCommit,
new WhitespaceAnalyzer(), true, IndexWriter.MaxFieldLength.LIMITED);
writer.setMaxBufferedDeleteTerms(1);
writer.deleteDocuments(new Term("foobar", "1"));
writer.deleteDocuments(new Term("foobar", "1"));
writer.deleteDocuments(new Term("foobar", "1"));
assertEquals(3, writer.getFlushDeletesCount());
writer.close();
dir.close();
}
}
// test when delete terms only apply to ram segments
public void testRAMDeletes() throws IOException {
for(int pass=0;pass<2;pass++) {
for(int t=0;t<2;t++) {
boolean autoCommit = (0==pass);
Directory dir = new MockRAMDirectory();
IndexWriter modifier = new IndexWriter(dir, autoCommit,
@ -134,12 +150,18 @@ public class TestIndexWriterDelete extends LuceneTestCase {
int value = 100;
addDoc(modifier, ++id, value);
if (0 == t)
modifier.deleteDocuments(new Term("value", String.valueOf(value)));
else
modifier.deleteDocuments(new TermQuery(new Term("value", String.valueOf(value))));
addDoc(modifier, ++id, value);
if (0 == t) {
modifier.deleteDocuments(new Term("value", String.valueOf(value)));
assertEquals(2, modifier.getNumBufferedDeleteTerms());
assertEquals(1, modifier.getBufferedDeleteTermsSize());
}
else
modifier.deleteDocuments(new TermQuery(new Term("value", String.valueOf(value))));
addDoc(modifier, ++id, value);
assertEquals(0, modifier.getSegmentCount());
@ -157,6 +179,7 @@ public class TestIndexWriterDelete extends LuceneTestCase {
dir.close();
}
}
}
// test when delete terms apply to both disk and ram segments
public void testBothDeletes() throws IOException {
@ -306,6 +329,7 @@ public class TestIndexWriterDelete extends LuceneTestCase {
// Iterate w/ ever increasing free disk space:
while (!done) {
MockRAMDirectory dir = new MockRAMDirectory(startDir);
dir.setPreventDoubleWrite(false);
IndexWriter modifier = new IndexWriter(dir, autoCommit,
new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED);

View File

@ -38,6 +38,19 @@ public class TestStressIndexing2 extends LuceneTestCase {
static Random r = new Random(0);
public class MockIndexWriter extends IndexWriter {
public MockIndexWriter(Directory dir, boolean autoCommit, Analyzer a, boolean create, MaxFieldLength mfl) throws IOException {
super(dir, autoCommit, a, create, mfl);
}
boolean testPoint(String name) {
// if (name.equals("startCommit")) {
if (r.nextInt(4) == 2)
Thread.yield();
return true;
}
}
public void testRandom() throws Exception {
Directory dir1 = new MockRAMDirectory();
@ -99,7 +112,7 @@ public class TestStressIndexing2 extends LuceneTestCase {
// everything.
public Map indexRandom(int nThreads, int iterations, int range, Directory dir) throws IOException, InterruptedException {
IndexWriter w = new IndexWriter(dir, autoCommit, new WhitespaceAnalyzer(), true, IndexWriter.MaxFieldLength.UNLIMITED);
IndexWriter w = new MockIndexWriter(dir, autoCommit, new WhitespaceAnalyzer(), true, IndexWriter.MaxFieldLength.UNLIMITED);
w.setUseCompoundFile(false);
/***
w.setMaxMergeDocs(Integer.MAX_VALUE);