From 5b113c8af63c26efd16bf2710fa5196244b515bc Mon Sep 17 00:00:00 2001 From: Michael McCandless Date: Thu, 6 Mar 2008 11:02:04 +0000 Subject: [PATCH] LUCENE-1194: add IndexWriter.deleteDocuments(Query) git-svn-id: https://svn.apache.org/repos/asf/lucene/java/trunk@634219 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 5 + .../apache/lucene/index/DocumentsWriter.java | 366 +++++++++--- .../org/apache/lucene/index/IndexReader.java | 3 + .../org/apache/lucene/index/IndexWriter.java | 529 +++++++++--------- .../lucene/index/MergeDocIDRemapper.java | 110 ++++ .../apache/lucene/index/SegmentMerger.java | 23 +- .../lucene/index/TermVectorsReader.java | 2 + .../index/TestAddIndexesNoOptimize.java | 116 +++- .../apache/lucene/index/TestAtomicUpdate.java | 19 +- .../apache/lucene/index/TestIndexWriter.java | 5 +- .../lucene/index/TestIndexWriterDelete.java | 88 +-- .../lucene/index/TestStressIndexing2.java | 15 +- 12 files changed, 905 insertions(+), 376 deletions(-) create mode 100644 src/java/org/apache/lucene/index/MergeDocIDRemapper.java diff --git a/CHANGES.txt b/CHANGES.txt index bff812e363b..4bb2bf997ec 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -105,6 +105,11 @@ New features close/re-open of IndexWriter while still protecting an open snapshot (Tim Brennan via Mike McCandless) + 9. LUCENE-1194: Added IndexWriter.deleteDocuments(Query) to delete + documents matching the specified query. Also added static unlock + and isLocked methods (deprecating the ones in IndexReader). (Mike + McCandless) + Optimizations 1. LUCENE-705: When building a compound file, use diff --git a/src/java/org/apache/lucene/index/DocumentsWriter.java b/src/java/org/apache/lucene/index/DocumentsWriter.java index b0782e78a0b..f403846c247 100644 --- a/src/java/org/apache/lucene/index/DocumentsWriter.java +++ b/src/java/org/apache/lucene/index/DocumentsWriter.java @@ -23,6 +23,10 @@ import org.apache.lucene.analysis.TokenStream; import org.apache.lucene.document.Document; import org.apache.lucene.document.Fieldable; import org.apache.lucene.search.Similarity; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Scorer; +import org.apache.lucene.search.Weight; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.IndexInput; @@ -33,9 +37,11 @@ import java.io.IOException; import java.io.PrintStream; import java.io.Reader; import java.util.Arrays; +import java.util.Iterator; import java.util.List; import java.util.HashMap; import java.util.ArrayList; +import java.util.Map.Entry; import java.text.NumberFormat; import java.util.Collections; @@ -154,14 +160,109 @@ final class DocumentsWriter { private PrintStream infoStream; - // This Hashmap buffers delete terms in ram before they - // are applied. The key is delete term; the value is - // number of buffered documents the term applies to. - private HashMap bufferedDeleteTerms = new HashMap(); - private int numBufferedDeleteTerms = 0; + // Holds buffered deletes, by docID, term or query. We + // hold two instances of this class: one for the deletes + // prior to the last flush, the other for deletes after + // the last flush. This is so if we need to abort + // (discard all buffered docs) we can also discard the + // buffered deletes yet keep the deletes done during + // previously flushed segments. 
+ private static class BufferedDeletes { + int numTerms; + HashMap terms = new HashMap(); + HashMap queries = new HashMap(); + List docIDs = new ArrayList(); - // Currently used only for deleting a doc on hitting an non-aborting exception - private List bufferedDeleteDocIDs = new ArrayList(); + private void update(BufferedDeletes in) { + numTerms += in.numTerms; + terms.putAll(in.terms); + queries.putAll(in.queries); + docIDs.addAll(in.docIDs); + in.terms.clear(); + in.numTerms = 0; + in.queries.clear(); + in.docIDs.clear(); + } + + void clear() { + terms.clear(); + queries.clear(); + docIDs.clear(); + numTerms = 0; + } + + boolean any() { + return terms.size() > 0 || docIDs.size() > 0 || queries.size() > 0; + } + + // Remaps all buffered deletes based on a completed + // merge + synchronized void remap(MergeDocIDRemapper mapper, + SegmentInfos infos, + int[][] docMaps, + int[] delCounts, + MergePolicy.OneMerge merge, + int mergeDocCount) { + + final HashMap newDeleteTerms; + + // Remap delete-by-term + if (terms.size() > 0) { + newDeleteTerms = new HashMap(); + Iterator iter = terms.entrySet().iterator(); + while(iter.hasNext()) { + Entry entry = (Entry) iter.next(); + Num num = (Num) entry.getValue(); + newDeleteTerms.put(entry.getKey(), + new Num(mapper.remap(num.getNum()))); + } + } else + newDeleteTerms = null; + + // Remap delete-by-docID + final List newDeleteDocIDs; + + if (docIDs.size() > 0) { + newDeleteDocIDs = new ArrayList(docIDs.size()); + Iterator iter = docIDs.iterator(); + while(iter.hasNext()) { + Integer num = (Integer) iter.next(); + newDeleteDocIDs.add(new Integer(mapper.remap(num.intValue()))); + } + } else + newDeleteDocIDs = null; + + // Remap delete-by-query + final HashMap newDeleteQueries; + + if (queries.size() > 0) { + newDeleteQueries = new HashMap(queries.size()); + Iterator iter = queries.entrySet().iterator(); + while(iter.hasNext()) { + Entry entry = (Entry) iter.next(); + Integer num = (Integer) entry.getValue(); + newDeleteQueries.put(entry.getKey(), + new Integer(mapper.remap(num.intValue()))); + } + } else + newDeleteQueries = null; + + if (newDeleteTerms != null) + terms = newDeleteTerms; + if (newDeleteDocIDs != null) + docIDs = newDeleteDocIDs; + if (newDeleteQueries != null) + queries = newDeleteQueries; + } + } + + // Deletes done after the last flush; these are discarded + // on abort + private BufferedDeletes deletesInRAM = new BufferedDeletes(); + + // Deletes done before the last flush; these are still + // kept on abort + private BufferedDeletes deletesFlushed = new BufferedDeletes(); // The max number of delete terms that can be buffered before // they must be flushed to disk. @@ -175,20 +276,29 @@ final class DocumentsWriter { // non-zero we will flush by RAM usage instead. 
private int maxBufferedDocs = IndexWriter.DEFAULT_MAX_BUFFERED_DOCS; + private int flushedDocCount; // How many docs already flushed to index + + synchronized void updateFlushedDocCount(int n) { + flushedDocCount += n; + } + synchronized int getFlushedDocCount() { + return flushedDocCount; + } + synchronized void setFlushedDocCount(int n) { + flushedDocCount = n; + } + private boolean closed; // Coarse estimates used to measure RAM usage of buffered deletes private static int OBJECT_HEADER_BYTES = 8; - private static int OBJECT_POINTER_BYTES = 4; // TODO: should be 8 on 64-bit platform - private static int BYTES_PER_CHAR = 2; - private static int BYTES_PER_INT = 4; private BufferedNorms[] norms = new BufferedNorms[0]; // Holds norms until we flush DocumentsWriter(Directory directory, IndexWriter writer) throws IOException { this.directory = directory; this.writer = writer; - + flushedDocCount = writer.docCount(); postingsFreeList = new Posting[0]; } @@ -357,9 +467,7 @@ final class DocumentsWriter { try { - bufferedDeleteTerms.clear(); - bufferedDeleteDocIDs.clear(); - numBufferedDeleteTerms = 0; + deletesInRAM.clear(); try { abortedFiles = files(); @@ -547,6 +655,8 @@ final class DocumentsWriter { newFiles.addAll(writeSegment()); + flushedDocCount += docCount; + success = true; } finally { @@ -2110,12 +2220,7 @@ final class DocumentsWriter { } resetPostingsData(); - - nextDocID = 0; - nextWriteDocID = 0; - numDocsInRAM = 0; - files = null; - + // Maybe downsize postingsFreeList array if (postingsFreeList.length > 1.5*postingsFreeCount) { int newSize = postingsFreeList.length; @@ -2130,6 +2235,10 @@ final class DocumentsWriter { return flushedFiles; } + synchronized void pushDeletes() { + deletesFlushed.update(deletesInRAM); + } + /** Returns the name of the file with this extension, on * the current segment we are working on. 
*/ private String segmentFileName(String extension) { @@ -2428,15 +2537,7 @@ final class DocumentsWriter { // Next, wait until my thread state is idle (in case // it's shared with other threads) and for threads to // not be paused nor a flush pending: - while(!closed && (!state.isIdle || pauseThreads != 0 || flushPending || abortCount > 0)) - try { - wait(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - - if (closed) - throw new AlreadyClosedException("this IndexWriter is closed"); + waitReady(state); if (segment == null) segment = writer.newSegmentName(); @@ -2529,55 +2630,72 @@ final class DocumentsWriter { return state.doFlushAfter || timeToFlushDeletes(); } + // for testing synchronized int getNumBufferedDeleteTerms() { - return numBufferedDeleteTerms; + return deletesInRAM.numTerms; } + // for testing synchronized HashMap getBufferedDeleteTerms() { - return bufferedDeleteTerms; + return deletesInRAM.terms; } - synchronized List getBufferedDeleteDocIDs() { - return bufferedDeleteDocIDs; + /** Called whenever a merge has completed and the merged segments had deletions */ + synchronized void remapDeletes(SegmentInfos infos, int[][] docMaps, int[] delCounts, MergePolicy.OneMerge merge, int mergeDocCount) { + if (docMaps == null) + // The merged segments had no deletes so docIDs did not change and we have nothing to do + return; + MergeDocIDRemapper mapper = new MergeDocIDRemapper(infos, docMaps, delCounts, merge, mergeDocCount); + deletesInRAM.remap(mapper, infos, docMaps, delCounts, merge, mergeDocCount); + deletesFlushed.remap(mapper, infos, docMaps, delCounts, merge, mergeDocCount); + flushedDocCount -= mapper.docShift; } - // Reset buffered deletes. - synchronized void clearBufferedDeletes() throws IOException { - bufferedDeleteTerms.clear(); - bufferedDeleteDocIDs.clear(); - numBufferedDeleteTerms = 0; - if (numBytesUsed > 0) - resetPostingsData(); - } - - synchronized boolean bufferDeleteTerms(Term[] terms) throws IOException { - while(pauseThreads != 0 || flushPending) + synchronized private void waitReady(ThreadState state) { + while(!closed && ((state != null && !state.isIdle) || pauseThreads != 0 || flushPending || abortCount > 0)) try { wait(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } - for (int i = 0; i < terms.length; i++) - addDeleteTerm(terms[i], numDocsInRAM); + + if (closed) + throw new AlreadyClosedException("this IndexWriter is closed"); + } + + synchronized boolean bufferDeleteTerms(Term[] terms) throws IOException { + waitReady(null); + for (int i = 0; i < terms.length; i++) + addDeleteTerm(terms[i], numDocsInRAM); return timeToFlushDeletes(); } synchronized boolean bufferDeleteTerm(Term term) throws IOException { - while(pauseThreads != 0 || flushPending) - try { - wait(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + waitReady(null); addDeleteTerm(term, numDocsInRAM); return timeToFlushDeletes(); } + synchronized boolean bufferDeleteQueries(Query[] queries) throws IOException { + waitReady(null); + for (int i = 0; i < queries.length; i++) + addDeleteQuery(queries[i], numDocsInRAM); + return timeToFlushDeletes(); + } + + synchronized boolean bufferDeleteQuery(Query query) throws IOException { + waitReady(null); + addDeleteQuery(query, numDocsInRAM); + return timeToFlushDeletes(); + } + + synchronized boolean deletesFull() { + return maxBufferedDeleteTerms != IndexWriter.DISABLE_AUTO_FLUSH + && ((deletesInRAM.numTerms + deletesInRAM.queries.size() + 
deletesInRAM.docIDs.size()) >= maxBufferedDeleteTerms); + } + synchronized private boolean timeToFlushDeletes() { - return (bufferIsFull - || (maxBufferedDeleteTerms != IndexWriter.DISABLE_AUTO_FLUSH - && numBufferedDeleteTerms >= maxBufferedDeleteTerms)) - && setFlushPending(); + return (bufferIsFull || deletesFull()) && setFlushPending(); } void setMaxBufferedDeleteTerms(int maxBufferedDeleteTerms) { @@ -2589,7 +2707,108 @@ final class DocumentsWriter { } synchronized boolean hasDeletes() { - return bufferedDeleteTerms.size() > 0 || bufferedDeleteDocIDs.size() > 0; + return deletesFlushed.any(); + } + + synchronized boolean applyDeletes(SegmentInfos infos) throws IOException { + + if (!hasDeletes()) + return false; + + if (infoStream != null) + infoStream.println("apply " + deletesFlushed.numTerms + " buffered deleted terms and " + + deletesFlushed.docIDs.size() + " deleted docIDs and " + + deletesFlushed.queries.size() + " deleted queries on " + + + infos.size() + " segments."); + + final int infosEnd = infos.size(); + + int docStart = 0; + boolean any = false; + for (int i = 0; i < infosEnd; i++) { + IndexReader reader = SegmentReader.get(infos.info(i), false); + boolean success = false; + try { + any |= applyDeletes(reader, docStart); + docStart += reader.maxDoc(); + success = true; + } finally { + if (reader != null) { + try { + if (success) + reader.doCommit(); + } finally { + reader.doClose(); + } + } + } + } + + deletesFlushed.clear(); + + return any; + } + + // Apply buffered delete terms, queries and docIDs to the + // provided reader + private final synchronized boolean applyDeletes(IndexReader reader, int docIDStart) + throws CorruptIndexException, IOException { + + final int docEnd = docIDStart + reader.maxDoc(); + boolean any = false; + + // Delete by term + Iterator iter = deletesFlushed.terms.entrySet().iterator(); + while (iter.hasNext()) { + Entry entry = (Entry) iter.next(); + Term term = (Term) entry.getKey(); + + TermDocs docs = reader.termDocs(term); + if (docs != null) { + int limit = ((DocumentsWriter.Num) entry.getValue()).getNum(); + try { + while (docs.next()) { + int docID = docs.doc(); + if (docIDStart+docID >= limit) + break; + reader.deleteDocument(docID); + any = true; + } + } finally { + docs.close(); + } + } + } + + // Delete by docID + iter = deletesFlushed.docIDs.iterator(); + while(iter.hasNext()) { + int docID = ((Integer) iter.next()).intValue(); + if (docID >= docIDStart && docID < docEnd) { + reader.deleteDocument(docID-docIDStart); + any = true; + } + } + + // Delete by query + IndexSearcher searcher = new IndexSearcher(reader); + iter = deletesFlushed.queries.entrySet().iterator(); + while(iter.hasNext()) { + Entry entry = (Entry) iter.next(); + Query query = (Query) entry.getKey(); + int limit = ((Integer) entry.getValue()).intValue(); + Weight weight = query.weight(searcher); + Scorer scorer = weight.scorer(reader); + while(scorer.next()) { + final int docID = scorer.doc(); + if (docIDStart + docID >= limit) + break; + reader.deleteDocument(docID); + any = true; + } + } + searcher.close(); + return any; } // Number of documents a delete term applies to. @@ -2621,27 +2840,23 @@ final class DocumentsWriter { // delete term will be applied to those documents as well // as the disk segments. 
synchronized private void addDeleteTerm(Term term, int docCount) { - Num num = (Num) bufferedDeleteTerms.get(term); - if (num == null) { - bufferedDeleteTerms.put(term, new Num(docCount)); - // This is coarse approximation of actual bytes used: - numBytesUsed += (term.field().length() + term.text().length()) * BYTES_PER_CHAR - + 4 + 5 * OBJECT_HEADER_BYTES + 5 * OBJECT_POINTER_BYTES; - if (ramBufferSize != IndexWriter.DISABLE_AUTO_FLUSH - && numBytesUsed > ramBufferSize) { - bufferIsFull = true; - } - } else { - num.setNum(docCount); - } - numBufferedDeleteTerms++; + Num num = (Num) deletesInRAM.terms.get(term); + final int docIDUpto = flushedDocCount + docCount; + if (num == null) + deletesInRAM.terms.put(term, new Num(docIDUpto)); + else + num.setNum(docIDUpto); + deletesInRAM.numTerms++; } // Buffer a specific docID for deletion. Currently only // used when we hit a exception when adding a document - synchronized private void addDeleteDocID(int docId) { - bufferedDeleteDocIDs.add(new Integer(docId)); - numBytesUsed += OBJECT_HEADER_BYTES + BYTES_PER_INT + OBJECT_POINTER_BYTES; + synchronized private void addDeleteDocID(int docID) { + deletesInRAM.docIDs.add(new Integer(flushedDocCount+docID)); + } + + synchronized private void addDeleteQuery(Query query, int docID) { + deletesInRAM.queries.put(query, new Integer(flushedDocCount + docID)); } /** Does the synchronized work to finish/flush the @@ -3132,6 +3347,7 @@ final class DocumentsWriter { postingsAllocCount++; } } + assert numBytesUsed <= numBytesAlloc; } synchronized void recyclePostings(Posting[] postings, int numPostings) { @@ -3164,6 +3380,7 @@ final class DocumentsWriter { b = (byte[]) freeByteBlocks.remove(size-1); if (trackAllocations) numBytesUsed += BYTE_BLOCK_SIZE; + assert numBytesUsed <= numBytesAlloc; return b; } @@ -3194,6 +3411,7 @@ final class DocumentsWriter { } else c = (char[]) freeCharBlocks.remove(size-1); numBytesUsed += CHAR_BLOCK_SIZE * CHAR_NUM_BYTE; + assert numBytesUsed <= numBytesAlloc; return c; } diff --git a/src/java/org/apache/lucene/index/IndexReader.java b/src/java/org/apache/lucene/index/IndexReader.java index fd50bb7208f..4ba084ea8e4 100644 --- a/src/java/org/apache/lucene/index/IndexReader.java +++ b/src/java/org/apache/lucene/index/IndexReader.java @@ -852,6 +852,7 @@ public abstract class IndexReader { * currently locked. * @param directory the directory to check for a lock * @throws IOException if there is a low-level IO error + * @deprecated Please use {@link IndexWriter#isLocked(Directory)} instead */ public static boolean isLocked(Directory directory) throws IOException { return @@ -863,6 +864,7 @@ public abstract class IndexReader { * currently locked. * @param directory the directory to check for a lock * @throws IOException if there is a low-level IO error + * @deprecated Please use {@link IndexWriter#isLocked(String)} instead */ public static boolean isLocked(String directory) throws IOException { Directory dir = FSDirectory.getDirectory(directory); @@ -877,6 +879,7 @@ public abstract class IndexReader { * Caution: this should only be used by failure recovery code, * when it is known that no other process nor thread is in fact * currently accessing this index. 
+ * @deprecated Please use {@link IndexWriter#unlock(Directory)} instead */ public static void unlock(Directory directory) throws IOException { directory.makeLock(IndexWriter.WRITE_LOCK_NAME).release(); diff --git a/src/java/org/apache/lucene/index/IndexWriter.java b/src/java/org/apache/lucene/index/IndexWriter.java index 01d688dc587..288f1ce7d86 100644 --- a/src/java/org/apache/lucene/index/IndexWriter.java +++ b/src/java/org/apache/lucene/index/IndexWriter.java @@ -20,6 +20,7 @@ package org.apache.lucene.index; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.document.Document; import org.apache.lucene.search.Similarity; +import org.apache.lucene.search.Query; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; import org.apache.lucene.store.Lock; @@ -39,7 +40,6 @@ import java.util.Set; import java.util.HashSet; import java.util.LinkedList; import java.util.Iterator; -import java.util.Map.Entry; /** An IndexWriter creates and maintains an index. @@ -60,7 +60,9 @@ import java.util.Map.Entry;

In either case, documents are added with addDocument and removed with deleteDocuments(Term) or deleteDocuments(Query). A document can be updated with updateDocument (which just deletes and then adds the entire document). When finished adding, deleting and updating documents, close should be called.

@@ -75,9 +77,10 @@ import java.util.Map.Entry; #setRAMBufferSizeMB}) or the number of added documents. The default is to flush when RAM usage hits 16 MB. For best indexing speed you should flush by RAM usage with a - large RAM buffer. You can also force a flush by calling - {@link #flush}. When a flush occurs, both pending deletes - and added documents are flushed to the index. A flush may + large RAM buffer. Note that flushing just moves the + internal buffered state in IndexWriter into the index, but + these changes are not visible to IndexReader until either + {@link #commit} or {@link #close} is called. A flush may also trigger one or more segment merges which by default run with a background thread so as not to block the addDocument calls (see below @@ -296,17 +299,18 @@ public class IndexWriter { private Similarity similarity = Similarity.getDefault(); // how to normalize - private volatile boolean commitPending; // true if segmentInfos has changes not yet committed + private volatile long changeCount; // increments every a change is completed + private long lastCommitChangeCount; // last changeCount that was committed + private SegmentInfos rollbackSegmentInfos; // segmentInfos we will fallback to if the commit fails private HashMap rollbackSegments; private SegmentInfos localRollbackSegmentInfos; // segmentInfos we will fallback to if the commit fails private boolean localAutoCommit; // saved autoCommit during local transaction + private int localFlushedDocCount; // saved docWriter.getFlushedDocCount during local transaction private boolean autoCommit = true; // false if we should commit only on close private SegmentInfos segmentInfos = new SegmentInfos(); // the segments - private int syncCount; - private int syncCountSaved = -1; private DocumentsWriter docWriter; private IndexFileDeleter deleter; @@ -334,11 +338,9 @@ public class IndexWriter { private boolean stopMerges; private int flushCount; + private int flushDeletesCount; private double maxSyncPauseSeconds = DEFAULT_MAX_SYNC_PAUSE_SECONDS; - // Last (right most) SegmentInfo created by a merge - private SegmentInfo lastMergeInfo; - /** * Used internally to throw an {@link * AlreadyClosedException} if this IndexWriter has been @@ -1085,10 +1087,10 @@ public class IndexWriter { if (create) { // Clear the write lock in case it's leftover: - directory.clearLock(IndexWriter.WRITE_LOCK_NAME); + directory.clearLock(WRITE_LOCK_NAME); } - Lock writeLock = directory.makeLock(IndexWriter.WRITE_LOCK_NAME); + Lock writeLock = directory.makeLock(WRITE_LOCK_NAME); if (!writeLock.obtain(writeLockTimeout)) // obtain write lock throw new LockObtainFailedException("Index locked for write: " + writeLock); this.writeLock = writeLock; // save it @@ -1653,7 +1655,7 @@ public class IndexWriter { // Only allow a new merge to be triggered if we are // going to wait for merges: - flush(waitForMerges, true); + flush(waitForMerges, true, true); mergePolicy.close(); @@ -1662,9 +1664,9 @@ public class IndexWriter { mergeScheduler.close(); if (infoStream != null) - message("now call final sync()"); + message("now call final commit()"); - sync(true, 0); + commit(true, 0); if (infoStream != null) message("at close: " + segString()); @@ -1790,7 +1792,11 @@ public class IndexWriter { /** Returns the number of documents currently in this index. 
*/ public synchronized int docCount() { ensureOpen(); - int count = docWriter.getNumDocsInRAM(); + int count; + if (docWriter != null) + count = docWriter.getNumDocsInRAM(); + else + count = 0; for (int i = 0; i < segmentInfos.size(); i++) { SegmentInfo si = segmentInfos.info(i); count += si.docCount; @@ -1798,6 +1804,16 @@ public class IndexWriter { return count; } + public synchronized boolean hasDeletions() throws IOException { + ensureOpen(); + if (docWriter.hasDeletes()) + return true; + for (int i = 0; i < segmentInfos.size(); i++) + if (segmentInfos.info(i).hasDeletions()) + return true; + return false; + } + /** * The maximum number of terms that will be indexed for a single field in a * document. This limits the amount of memory required for indexing, so that @@ -1893,7 +1909,7 @@ public class IndexWriter { } } if (doFlush) - flush(true, false); + flush(true, false, false); } catch (OutOfMemoryError oom) { hitOOM = true; throw oom; @@ -1911,7 +1927,7 @@ public class IndexWriter { try { boolean doFlush = docWriter.bufferDeleteTerm(term); if (doFlush) - flush(true, false); + flush(true, false, false); } catch (OutOfMemoryError oom) { hitOOM = true; throw oom; @@ -1931,13 +1947,41 @@ public class IndexWriter { try { boolean doFlush = docWriter.bufferDeleteTerms(terms); if (doFlush) - flush(true, false); + flush(true, false, false); } catch (OutOfMemoryError oom) { hitOOM = true; throw oom; } } + /** + * Deletes the document(s) matching the provided query. + * @param query the query to identify the documents to be deleted + * @throws CorruptIndexException if the index is corrupt + * @throws IOException if there is a low-level IO error + */ + public void deleteDocuments(Query query) throws CorruptIndexException, IOException { + ensureOpen(); + boolean doFlush = docWriter.bufferDeleteQuery(query); + if (doFlush) + flush(true, false, false); + } + + /** + * Deletes the document(s) matching any of the provided queries. + * All deletes are flushed at the same time. + * @param queries array of queries to identify the documents + * to be deleted + * @throws CorruptIndexException if the index is corrupt + * @throws IOException if there is a low-level IO error + */ + public void deleteDocuments(Query[] queries) throws CorruptIndexException, IOException { + ensureOpen(); + boolean doFlush = docWriter.bufferDeleteQueries(queries); + if (doFlush) + flush(true, false, false); + } + /** * Updates a document by first deleting the document(s) * containing term and then adding the new @@ -1993,7 +2037,7 @@ public class IndexWriter { } } if (doFlush) - flush(true, false); + flush(true, false, false); } catch (OutOfMemoryError oom) { hitOOM = true; throw oom; @@ -2024,16 +2068,21 @@ public class IndexWriter { return flushCount; } + // for test purpose + final synchronized int getFlushDeletesCount() { + return flushDeletesCount; + } + final String newSegmentName() { // Cannot synchronize on IndexWriter because that causes // deadlock synchronized(segmentInfos) { - // Important to set commitPending so that the + // Important to increment changeCount so that the // segmentInfos is written on close. Otherwise we // could close, re-open and re-return the same segment // name that was previously returned which can cause // problems at least with ConcurrentMergeScheduler. 
- commitPending = true; + changeCount++; return "_" + Integer.toString(segmentInfos.counter++, Character.MAX_RADIX); } } @@ -2158,7 +2207,7 @@ public class IndexWriter { if (infoStream != null) message("optimize: index now " + segString()); - flush(true, false); + flush(true, false, true); synchronized(this) { resetMergeExceptions(); @@ -2408,13 +2457,14 @@ public class IndexWriter { localRollbackSegmentInfos = (SegmentInfos) segmentInfos.clone(); localAutoCommit = autoCommit; + localFlushedDocCount = docWriter.getFlushedDocCount(); if (localAutoCommit) { if (infoStream != null) message("flush at startTransaction"); - flush(true, false); + flush(true, false, false); // Turn off auto-commit during our local transaction: autoCommit = false; @@ -2435,6 +2485,7 @@ public class IndexWriter { // First restore autoCommit in case we hit an exception below: autoCommit = localAutoCommit; + docWriter.setFlushedDocCount(localFlushedDocCount); // Keep the same segmentInfos instance but replace all // of its SegmentInfo instances. This is so the next @@ -2454,7 +2505,6 @@ public class IndexWriter { deleter.refresh(); finishMerges(false); - lastMergeInfo = null; stopMerges = false; } @@ -2477,7 +2527,7 @@ public class IndexWriter { if (autoCommit) { boolean success = false; try { - sync(true, 0); + commit(true, 0); success = true; } finally { if (!success) { @@ -2524,9 +2574,9 @@ public class IndexWriter { finishMerges(false); - // Must pre-close these two, in case they set - // commitPending=true, so that we can then set it to - // false before calling closeInternal + // Must pre-close these two, in case they increment + // changeCount so that we can then set it to false + // before calling closeInternal mergePolicy.close(); mergeScheduler.close(); @@ -2547,7 +2597,7 @@ public class IndexWriter { deleter.refresh(); } - commitPending = false; + lastCommitChangeCount = changeCount; closeInternal(false); } else waitForClose(); @@ -2614,7 +2664,7 @@ public class IndexWriter { * index directory. 
*/ private synchronized void checkpoint() throws IOException { - commitPending = true; + changeCount++; deleter.checkpoint(segmentInfos, false); } @@ -2677,21 +2727,27 @@ public class IndexWriter { try { if (infoStream != null) message("flush at addIndexes"); - flush(true, false); + flush(true, false, true); boolean success = false; startTransaction(); try { + int docCount = 0; for (int i = 0; i < dirs.length; i++) { SegmentInfos sis = new SegmentInfos(); // read infos from dir sis.read(dirs[i]); for (int j = 0; j < sis.size(); j++) { - segmentInfos.addElement(sis.info(j)); // add each info + final SegmentInfo info = sis.info(j); + docCount += info.docCount; + segmentInfos.addElement(info); // add each info } } + // Notify DocumentsWriter that the flushed count just increased + docWriter.updateFlushedDocCount(docCount); + optimize(); success = true; @@ -2745,7 +2801,7 @@ public class IndexWriter { try { if (infoStream != null) message("flush at addIndexesNoOptimize"); - flush(true, false); + flush(true, false, true); boolean success = false; @@ -2753,6 +2809,7 @@ public class IndexWriter { try { + int docCount = 0; for (int i = 0; i < dirs.length; i++) { if (directory == dirs[i]) { // cannot add this index: segments may be deleted in merge before added @@ -2763,10 +2820,14 @@ public class IndexWriter { sis.read(dirs[i]); for (int j = 0; j < sis.size(); j++) { SegmentInfo info = sis.info(j); + docCount += info.docCount; segmentInfos.addElement(info); // add each info } } + // Notify DocumentsWriter that the flushed count just increased + docWriter.updateFlushedDocCount(docCount); + maybeMerge(); // If after merging there remain segments in the index @@ -2869,6 +2930,9 @@ public class IndexWriter { -1, null, false); segmentInfos.addElement(info); + // Notify DocumentsWriter that the flushed count just increased + docWriter.updateFlushedDocCount(docCount); + success = true; } finally { @@ -2931,7 +2995,7 @@ public class IndexWriter { * @deprecated please call {@link #commit}) instead */ public final void flush() throws CorruptIndexException, IOException { - flush(true, false); + flush(true, false, true); } /** @@ -2960,8 +3024,8 @@ public class IndexWriter { } private final void commit(boolean triggerMerges) throws CorruptIndexException, IOException { - flush(triggerMerges, true); - sync(true, 0); + flush(triggerMerges, true, true); + commit(true, 0); } /** @@ -2971,23 +3035,35 @@ public class IndexWriter { * deletes or docs were flushed) if necessary * @param flushDocStores if false we are allowed to keep * doc stores open to share with the next segment + * @param flushDeletes whether pending deletes should also + * be flushed */ - protected final void flush(boolean triggerMerge, boolean flushDocStores) throws CorruptIndexException, IOException { + protected final void flush(boolean triggerMerge, boolean flushDocStores, boolean flushDeletes) throws CorruptIndexException, IOException { ensureOpen(); - - if (doFlush(flushDocStores) && triggerMerge) + if (doFlush(flushDocStores, flushDeletes) && triggerMerge) maybeMerge(); } // TODO: this method should not have to be entirely // synchronized, ie, merges should be allowed to commit // even while a flush is happening - private synchronized final boolean doFlush(boolean flushDocStores) throws CorruptIndexException, IOException { + private synchronized final boolean doFlush(boolean flushDocStores, boolean flushDeletes) throws CorruptIndexException, IOException { // Make sure no threads are actively adding a document + assert 
testPoint("startDoFlush"); + flushCount++; + flushDeletes |= docWriter.deletesFull(); + + // When autoCommit=true we must always flush deletes + // when flushing a segment; otherwise deletes may become + // visible before their corresponding added document + // from an updateDocument call + if (autoCommit) + flushDeletes = true; + // Returns true if docWriter is currently aborting, in // which case we skip flushing this segment if (docWriter.pauseAllThreads()) { @@ -3011,15 +3087,6 @@ public class IndexWriter { if (docStoreSegment == null) flushDocStores = false; - // Always flush deletes if there are any delete terms. - // TODO: when autoCommit=false we don't have to flush - // deletes with every flushed segment; we can save - // CPU/IO by buffering longer & flushing deletes only - // when they are full or writer is being closed. We - // have to fix the "applyDeletesSelectively" logic to - // apply to more than just the last flushed segment - boolean flushDeletes = docWriter.hasDeletes(); - int docStoreOffset = docWriter.getDocStoreOffset(); // docStoreOffset should only be non-zero when @@ -3095,54 +3162,17 @@ public class IndexWriter { docStoreIsCompoundFile); } - if (flushDeletes) { - try { - SegmentInfos rollback = (SegmentInfos) segmentInfos.clone(); - - boolean success = false; - try { - // we should be able to change this so we can - // buffer deletes longer and then flush them to - // multiple flushed segments only when a commit() - // finally happens - applyDeletes(newSegment); - success = true; - } finally { - if (!success) { - if (infoStream != null) - message("hit exception flushing deletes"); - - // Carefully remove any partially written .del - // files - final int size = rollback.size(); - for(int i=0;i 0; @@ -3507,11 +3544,15 @@ public class IndexWriter { if (merge.isAborted()) return; + boolean changed = applyDeletes(); + + // If autoCommit == true then all deletes should have + // been flushed when we flushed the last segment + assert !changed || !autoCommit; + final SegmentInfos sourceSegments = merge.segments; final int end = sourceSegments.size(); - ensureContiguousMerge(merge); - // Check whether this merge will allow us to skip // merging the doc stores (stored field & vectors). // This is a very substantial optimization (saves tons @@ -3598,7 +3639,7 @@ public class IndexWriter { // make compound file out of them... if (infoStream != null) message("flush at merge"); - flush(false, true); + flush(false, true, false); } // We must take a full copy at this point so that we can @@ -3746,7 +3787,7 @@ public class IndexWriter { } } - if (!commitMerge(merge)) + if (!commitMerge(merge, merger, mergedDocCount)) // commitMerge will return false if this merge was aborted return 0; @@ -3759,7 +3800,7 @@ public class IndexWriter { synchronized(this) { size = merge.info.sizeInBytes(); } - sync(false, size); + commit(false, size); } success = false; @@ -3812,7 +3853,7 @@ public class IndexWriter { synchronized(this) { size = merge.info.sizeInBytes(); } - sync(false, size); + commit(false, size); } return mergedDocCount; @@ -3823,63 +3864,41 @@ public class IndexWriter { mergeExceptions.add(merge); } - // Called during flush to apply any buffered deletes. If - // flushedNewSegment is true then a new segment was just - // created and flushed from the ram segments, so we will - // selectively apply the deletes to that new segment. - private final void applyDeletes(SegmentInfo newSegment) throws CorruptIndexException, IOException { + // Apply buffered deletes to all segments. 
+ private final synchronized boolean applyDeletes() throws CorruptIndexException, IOException { + assert testPoint("startApplyDeletes"); + SegmentInfos rollback = (SegmentInfos) segmentInfos.clone(); + boolean success = false; + boolean changed; + try { + changed = docWriter.applyDeletes(segmentInfos); + success = true; + } finally { + if (!success) { + if (infoStream != null) + message("hit exception flushing deletes"); - final HashMap bufferedDeleteTerms = docWriter.getBufferedDeleteTerms(); - final List bufferedDeleteDocIDs = docWriter.getBufferedDeleteDocIDs(); - - if (infoStream != null) - message("flush " + docWriter.getNumBufferedDeleteTerms() + " buffered deleted terms and " + - bufferedDeleteDocIDs.size() + " deleted docIDs on " - + segmentInfos.size() + " segments."); - - if (newSegment != null) { - IndexReader reader = null; - try { - // Open readers w/o opening the stored fields / - // vectors because these files may still be held - // open for writing by docWriter - reader = SegmentReader.get(newSegment, false); - - // Apply delete terms to the segment just flushed from ram - // apply appropriately so that a delete term is only applied to - // the documents buffered before it, not those buffered after it. - applyDeletesSelectively(bufferedDeleteTerms, bufferedDeleteDocIDs, reader); - } finally { - if (reader != null) { - try { - reader.doCommit(); - } finally { - reader.doClose(); - } + // Carefully remove any partially written .del + // files + final int size = rollback.size(); + for(int i=0;i= num) { - break; - } - reader.deleteDocument(doc); - } - } finally { - docs.close(); - } - } - } - - if (deleteIds.size() > 0) { - iter = deleteIds.iterator(); - while(iter.hasNext()) - reader.deleteDocument(((Integer) iter.next()).intValue()); - } - } - - // Apply buffered delete terms to this reader. - private final void applyDeletes(HashMap deleteTerms, IndexReader reader) - throws CorruptIndexException, IOException { - Iterator iter = deleteTerms.entrySet().iterator(); - while (iter.hasNext()) { - Entry entry = (Entry) iter.next(); - reader.deleteDocuments((Term) entry.getKey()); - } - } - // utility routines for tests SegmentInfo newestSegment() { return segmentInfos.info(segmentInfos.size()-1); } public synchronized String segString() { + return segString(segmentInfos); + } + + private synchronized String segString(SegmentInfos infos) { StringBuffer buffer = new StringBuffer(); - for(int i = 0; i < segmentInfos.size(); i++) { + final int count = infos.size(); + for(int i = 0; i < count; i++) { if (i > 0) { buffer.append(' '); } - buffer.append(segmentInfos.info(i).segString(directory)); + buffer.append(infos.info(i).segString(directory)); } - return buffer.toString(); } @@ -4042,68 +4020,49 @@ public class IndexWriter { * sync each file, if it wasn't already. If that * succeeds, then we write a new segments_N file & sync * that. */ - private void sync(boolean includeFlushes, long sizeInBytes) throws IOException { + private void commit(boolean skipWait, long sizeInBytes) throws IOException { + + assert testPoint("startCommit"); if (hitOOM) return; try { - message("start sync() includeFlushes=" + includeFlushes); + if (infoStream != null) + message("start commit() skipWait=" + skipWait + " sizeInBytes=" + sizeInBytes); - if (!includeFlushes) + if (!skipWait) syncPause(sizeInBytes); - // First, we clone & incref the segmentInfos we intend - // to sync, then, without locking, we sync() each file - // referenced by toSync, in the background. 
Multiple - // threads can be doing this at once, if say a large - // merge and a small merge finish at the same time: - SegmentInfos toSync = null; - final int mySyncCount; + final long myChangeCount; + synchronized(this) { - if (!commitPending) { - message(" skip sync(): no commit pending"); + assert lastCommitChangeCount <= changeCount; + + if (changeCount == lastCommitChangeCount) { + if (infoStream != null) + message(" skip commit(): no changes pending"); return; } - // Create the segmentInfos we want to sync, by copying - // the current one and possibly removing flushed - // segments: + // First, we clone & incref the segmentInfos we intend + // to sync, then, without locking, we sync() each file + // referenced by toSync, in the background. Multiple + // threads can be doing this at once, if say a large + // merge and a small merge finish at the same time: + toSync = (SegmentInfos) segmentInfos.clone(); - final int numSegmentsToSync = toSync.size(); - - boolean newCommitPending = false; - - if (!includeFlushes) { - // Do not sync flushes: - assert lastMergeInfo != null; - assert toSync.contains(lastMergeInfo); - int downTo = numSegmentsToSync-1; - while(!toSync.info(downTo).equals(lastMergeInfo)) { - message(" skip segment " + toSync.info(downTo).name); - toSync.remove(downTo); - downTo--; - newCommitPending = true; - } - - } else if (numSegmentsToSync > 0) - // Force all subsequent syncs to include up through - // the final info in the current segments. This - // ensure that a call to commit() will force another - // sync (due to merge finishing) to sync all flushed - // segments as well: - lastMergeInfo = toSync.info(numSegmentsToSync-1); - - mySyncCount = syncCount++; deleter.incRef(toSync, false); - - commitPending = newCommitPending; + myChangeCount = changeCount; } - boolean success0 = false; + if (infoStream != null) + message("commit index=" + segString(toSync)); + + assert testPoint("midCommit"); try { @@ -4143,12 +4102,14 @@ public class IndexWriter { break; } + assert testPoint("midCommit2"); + synchronized(this) { // If someone saved a newer version of segments file // since I first started syncing my version, I can // safely skip saving myself since I've been // superseded: - if (mySyncCount > syncCountSaved) { + if (myChangeCount > lastCommitChangeCount) { if (segmentInfos.getGeneration() > toSync.getGeneration()) toSync.updateGeneration(segmentInfos); @@ -4161,14 +4122,13 @@ public class IndexWriter { // Have our master segmentInfos record the // generations we just sync'd segmentInfos.updateGeneration(toSync); - if (!success) { - commitPending = true; + if (!success) message("hit exception committing segments file"); - } } + message("commit complete"); - syncCountSaved = mySyncCount; + lastCommitChangeCount = myChangeCount; deleter.checkpoint(toSync, true); setRollbackSegmentInfos(); @@ -4178,19 +4138,54 @@ public class IndexWriter { message("done all syncs"); - success0 = true; + assert testPoint("midCommitSuccess"); } finally { synchronized(this) { deleter.decRef(toSync); - if (!success0) - commitPending = true; } } } catch (OutOfMemoryError oom) { hitOOM = true; throw oom; } + assert testPoint("finishCommit"); + } + + /** + * Returns true iff the index in the named directory is + * currently locked. 
+ * @param directory the directory to check for a lock + * @throws IOException if there is a low-level IO error + */ + public static boolean isLocked(Directory directory) throws IOException { + return directory.makeLock(WRITE_LOCK_NAME).isLocked(); + } + + /** + * Returns true iff the index in the named directory is + * currently locked. + * @param directory the directory to check for a lock + * @throws IOException if there is a low-level IO error + */ + public static boolean isLocked(String directory) throws IOException { + Directory dir = FSDirectory.getDirectory(directory); + try { + return isLocked(dir); + } finally { + dir.close(); + } + } + + /** + * Forcibly unlocks the index in the named directory. + *

+ * Caution: this should only be used by failure recovery code, + * when it is known that no other process nor thread is in fact + * currently accessing this index. + */ + public static void unlock(Directory directory) throws IOException { + directory.makeLock(IndexWriter.WRITE_LOCK_NAME).release(); } /** @@ -4245,7 +4240,17 @@ public class IndexWriter { } // Used only by assert for testing. Current points: - // "DocumentsWriter.ThreadState.init start" + // startDoFlush + // startCommitMerge + // startCommit + // midCommit + // midCommit2 + // midCommitSuccess + // finishCommit + // startCommitMergeDeletes + // startMergeInit + // startApplyDeletes + // DocumentsWriter.ThreadState.init start boolean testPoint(String name) { return true; } diff --git a/src/java/org/apache/lucene/index/MergeDocIDRemapper.java b/src/java/org/apache/lucene/index/MergeDocIDRemapper.java new file mode 100644 index 00000000000..cbbde20f7f0 --- /dev/null +++ b/src/java/org/apache/lucene/index/MergeDocIDRemapper.java @@ -0,0 +1,110 @@ +package org.apache.lucene.index; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Remaps docIDs after a merge has completed, where the + * merged segments had at least one deletion. This is used + * to renumber the buffered deletes in IndexWriter when a + * merge of segments with deletions commits. 
*/ + +final class MergeDocIDRemapper { + int[] starts; // used for binary search of mapped docID + int[] newStarts; // starts, minus the deletes + int[][] docMaps; // maps docIDs in the merged set + int minDocID; // minimum docID that needs renumbering + int maxDocID; // 1+ the max docID that needs renumbering + int docShift; // total # deleted docs that were compacted by this merge + + public MergeDocIDRemapper(SegmentInfos infos, int[][] docMaps, int[] delCounts, MergePolicy.OneMerge merge, int mergedDocCount) { + this.docMaps = docMaps; + SegmentInfo firstSegment = merge.segments.info(0); + int i = 0; + while(true) { + SegmentInfo info = infos.info(i); + if (info.equals(firstSegment)) + break; + minDocID += info.docCount; + i++; + } + + int numDocs = 0; + for(int j=0;j 0; + + // Make sure it all adds up: + assert docShift == maxDocID - (newStarts[docMaps.length-1] + merge.segments.info(docMaps.length-1).docCount - delCounts[docMaps.length-1]); + } + + public int remap(int oldDocID) { + if (oldDocID < minDocID) + // Unaffected by merge + return oldDocID; + else if (oldDocID >= maxDocID) + // This doc was "after" the merge, so simple shift + return oldDocID - docShift; + else { + // Binary search to locate this document & find its new docID + int lo = 0; // search starts array + int hi = docMaps.length - 1; // for first element less + + while (hi >= lo) { + int mid = (lo + hi) >> 1; + int midValue = starts[mid]; + if (oldDocID < midValue) + hi = mid - 1; + else if (oldDocID > midValue) + lo = mid + 1; + else { // found a match + while (mid+1 < docMaps.length && starts[mid+1] == midValue) { + mid++; // scan to last match + } + if (docMaps[mid] != null) + return newStarts[mid] + docMaps[mid][oldDocID-starts[mid]]; + else + return newStarts[mid] + oldDocID-starts[mid]; + } + } + if (docMaps[hi] != null) + return newStarts[hi] + docMaps[hi][oldDocID-starts[hi]]; + else + return newStarts[hi] + oldDocID-starts[hi]; + } + } +} diff --git a/src/java/org/apache/lucene/index/SegmentMerger.java b/src/java/org/apache/lucene/index/SegmentMerger.java index edeeaff7505..014cd74a099 100644 --- a/src/java/org/apache/lucene/index/SegmentMerger.java +++ b/src/java/org/apache/lucene/index/SegmentMerger.java @@ -436,10 +436,21 @@ final class SegmentMerger { private final void mergeTermInfos() throws CorruptIndexException, IOException { int base = 0; - for (int i = 0; i < readers.size(); i++) { + final int readerCount = readers.size(); + for (int i = 0; i < readerCount; i++) { IndexReader reader = (IndexReader) readers.elementAt(i); TermEnum termEnum = reader.terms(); SegmentMergeInfo smi = new SegmentMergeInfo(base, termEnum, reader); + int[] docMap = smi.getDocMap(); + if (docMap != null) { + if (docMaps == null) { + docMaps = new int[readerCount][]; + delCounts = new int[readerCount]; + } + docMaps[i] = docMap; + delCounts[i] = smi.reader.maxDoc() - smi.reader.numDocs(); + } + base += reader.numDocs(); if (smi.next()) queue.put(smi); // initialize queue @@ -504,7 +515,15 @@ final class SegmentMerger { return df; } - private byte[] payloadBuffer = null; + private byte[] payloadBuffer; + private int[][] docMaps; + int[][] getDocMaps() { + return docMaps; + } + private int[] delCounts; + int[] getDelCounts() { + return delCounts; + } /** Process postings from multiple segments all positioned on the * same term. 
Writes out merged entries into freqOutput and diff --git a/src/java/org/apache/lucene/index/TermVectorsReader.java b/src/java/org/apache/lucene/index/TermVectorsReader.java index 54944158a76..533a716661c 100644 --- a/src/java/org/apache/lucene/index/TermVectorsReader.java +++ b/src/java/org/apache/lucene/index/TermVectorsReader.java @@ -91,6 +91,7 @@ class TermVectorsReader implements Cloneable { if (-1 == docStoreOffset) { this.docStoreOffset = 0; this.size = numTotalDocs; + assert size == 0 || numTotalDocs == size; } else { this.docStoreOffset = docStoreOffset; this.size = size; @@ -176,6 +177,7 @@ class TermVectorsReader implements Cloneable { } else { tvdPosition = tvd.length(); tvfPosition = tvf.length(); + assert count == numDocs-1; } tvdLengths[count] = (int) (tvdPosition-lastTvdPosition); tvfLengths[count] = (int) (tvfPosition-lastTvfPosition); diff --git a/src/test/org/apache/lucene/index/TestAddIndexesNoOptimize.java b/src/test/org/apache/lucene/index/TestAddIndexesNoOptimize.java index 781579f826f..0c758104494 100755 --- a/src/test/org/apache/lucene/index/TestAddIndexesNoOptimize.java +++ b/src/test/org/apache/lucene/index/TestAddIndexesNoOptimize.java @@ -24,11 +24,11 @@ import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.analysis.WhitespaceAnalyzer; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; -import org.apache.lucene.index.IndexReader; -import org.apache.lucene.index.IndexWriter; import org.apache.lucene.store.Directory; import org.apache.lucene.store.RAMDirectory; +import org.apache.lucene.search.PhraseQuery; + public class TestAddIndexesNoOptimize extends LuceneTestCase { public void testSimpleCase() throws IOException { // main directory @@ -122,6 +122,118 @@ public class TestAddIndexesNoOptimize extends LuceneTestCase { verifyTermDocs(dir, new Term("content", "bbb"), 51); } + public void testWithPendingDeletes() throws IOException { + // main directory + Directory dir = new RAMDirectory(); + // auxiliary directory + Directory aux = new RAMDirectory(); + + setUpDirs(dir, aux); + IndexWriter writer = newWriter(dir, false); + writer.addIndexesNoOptimize(new Directory[] {aux}); + + // Adds 10 docs, then replaces them with another 10 + // docs, so 10 pending deletes: + for (int i = 0; i < 20; i++) { + Document doc = new Document(); + doc.add(new Field("id", "" + (i % 10), Field.Store.NO, Field.Index.UN_TOKENIZED)); + doc.add(new Field("content", "bbb " + i, Field.Store.NO, + Field.Index.TOKENIZED)); + writer.updateDocument(new Term("id", "" + (i%10)), doc); + } + // Deletes one of the 10 added docs, leaving 9: + PhraseQuery q = new PhraseQuery(); + q.add(new Term("content", "bbb")); + q.add(new Term("content", "14")); + writer.deleteDocuments(q); + + writer.optimize(); + + verifyNumDocs(dir, 1039); + verifyTermDocs(dir, new Term("content", "aaa"), 1030); + verifyTermDocs(dir, new Term("content", "bbb"), 9); + + writer.close(); + dir.close(); + aux.close(); + } + + public void testWithPendingDeletes2() throws IOException { + // main directory + Directory dir = new RAMDirectory(); + // auxiliary directory + Directory aux = new RAMDirectory(); + + setUpDirs(dir, aux); + IndexWriter writer = newWriter(dir, false); + + // Adds 10 docs, then replaces them with another 10 + // docs, so 10 pending deletes: + for (int i = 0; i < 20; i++) { + Document doc = new Document(); + doc.add(new Field("id", "" + (i % 10), Field.Store.NO, Field.Index.UN_TOKENIZED)); + doc.add(new Field("content", "bbb " + i, Field.Store.NO, + 
Field.Index.TOKENIZED)); + writer.updateDocument(new Term("id", "" + (i%10)), doc); + } + + writer.addIndexesNoOptimize(new Directory[] {aux}); + + // Deletes one of the 10 added docs, leaving 9: + PhraseQuery q = new PhraseQuery(); + q.add(new Term("content", "bbb")); + q.add(new Term("content", "14")); + writer.deleteDocuments(q); + + writer.optimize(); + + verifyNumDocs(dir, 1039); + verifyTermDocs(dir, new Term("content", "aaa"), 1030); + verifyTermDocs(dir, new Term("content", "bbb"), 9); + + writer.close(); + dir.close(); + aux.close(); + } + + public void testWithPendingDeletes3() throws IOException { + // main directory + Directory dir = new RAMDirectory(); + // auxiliary directory + Directory aux = new RAMDirectory(); + + setUpDirs(dir, aux); + IndexWriter writer = newWriter(dir, false); + + // Adds 10 docs, then replaces them with another 10 + // docs, so 10 pending deletes: + for (int i = 0; i < 20; i++) { + Document doc = new Document(); + doc.add(new Field("id", "" + (i % 10), Field.Store.NO, Field.Index.UN_TOKENIZED)); + doc.add(new Field("content", "bbb " + i, Field.Store.NO, + Field.Index.TOKENIZED)); + writer.updateDocument(new Term("id", "" + (i%10)), doc); + } + + // Deletes one of the 10 added docs, leaving 9: + PhraseQuery q = new PhraseQuery(); + q.add(new Term("content", "bbb")); + q.add(new Term("content", "14")); + writer.deleteDocuments(q); + + writer.addIndexesNoOptimize(new Directory[] {aux}); + + writer.optimize(); + + verifyNumDocs(dir, 1039); + verifyTermDocs(dir, new Term("content", "aaa"), 1030); + verifyTermDocs(dir, new Term("content", "bbb"), 9); + + writer.close(); + dir.close(); + aux.close(); + } + // case 0: add self or exceed maxMergeDocs, expect exception public void testAddSelf() throws IOException { // main directory diff --git a/src/test/org/apache/lucene/index/TestAtomicUpdate.java b/src/test/org/apache/lucene/index/TestAtomicUpdate.java index 1693eb62af2..329759fffff 100644 --- a/src/test/org/apache/lucene/index/TestAtomicUpdate.java +++ b/src/test/org/apache/lucene/index/TestAtomicUpdate.java @@ -25,11 +25,26 @@ import org.apache.lucene.queryParser.*; import java.util.Random; import java.io.File; +import java.io.IOException; public class TestAtomicUpdate extends LuceneTestCase { private static final Analyzer ANALYZER = new SimpleAnalyzer(); private static final Random RANDOM = new Random(); + public class MockIndexWriter extends IndexWriter { + + public MockIndexWriter(Directory dir, boolean autoCommit, Analyzer a, boolean create, MaxFieldLength mfl) throws IOException { + super(dir, autoCommit, a, create, mfl); + } + + boolean testPoint(String name) { + // if (name.equals("startCommit")) { + if (RANDOM.nextInt(4) == 2) + Thread.yield(); + return true; + } + } + private static abstract class TimedThread extends Thread { boolean failed; int count; @@ -113,7 +128,9 @@ public class TestAtomicUpdate extends LuceneTestCase { TimedThread[] threads = new TimedThread[4]; - IndexWriter writer = new IndexWriter(directory, ANALYZER, true, IndexWriter.MaxFieldLength.LIMITED); + IndexWriter writer = new MockIndexWriter(directory, true, ANALYZER, true, IndexWriter.MaxFieldLength.LIMITED); + writer.setMaxBufferedDocs(7); + writer.setMergeFactor(3); // Establish a base index of 100 docs: for(int i=0;i<100;i++) { diff --git a/src/test/org/apache/lucene/index/TestIndexWriter.java b/src/test/org/apache/lucene/index/TestIndexWriter.java index 2fa5d7b2c69..f610d05fc01 100644 --- a/src/test/org/apache/lucene/index/TestIndexWriter.java +++ 
b/src/test/org/apache/lucene/index/TestIndexWriter.java @@ -1353,7 +1353,7 @@ public class TestIndexWriter extends LuceneTestCase assertTrue(flushCount > lastFlushCount); lastFlushCount = flushCount; writer.setRAMBufferSizeMB(0.000001); - writer.setMaxBufferedDeleteTerms(IndexWriter.DISABLE_AUTO_FLUSH); + writer.setMaxBufferedDeleteTerms(1); } else if (j < 20) { assertTrue(flushCount > lastFlushCount); lastFlushCount = flushCount; @@ -1366,6 +1366,7 @@ public class TestIndexWriter extends LuceneTestCase } else if (30 == j) { writer.setRAMBufferSizeMB(0.000001); writer.setMaxBufferedDeleteTerms(IndexWriter.DISABLE_AUTO_FLUSH); + writer.setMaxBufferedDeleteTerms(1); } else if (j < 40) { assertTrue(flushCount> lastFlushCount); lastFlushCount = flushCount; @@ -1554,7 +1555,7 @@ public class TestIndexWriter extends LuceneTestCase doc.add(new Field("field", "aaa", Field.Store.YES, Field.Index.TOKENIZED, Field.TermVector.WITH_POSITIONS_OFFSETS)); for(int i=0;i<19;i++) writer.addDocument(doc); - writer.flush(false, true); + writer.flush(false, true, true); writer.close(); SegmentInfos sis = new SegmentInfos(); sis.read(dir); diff --git a/src/test/org/apache/lucene/index/TestIndexWriterDelete.java b/src/test/org/apache/lucene/index/TestIndexWriterDelete.java index 7366ce703af..85b7dd23d09 100644 --- a/src/test/org/apache/lucene/index/TestIndexWriterDelete.java +++ b/src/test/org/apache/lucene/index/TestIndexWriterDelete.java @@ -107,7 +107,6 @@ public class TestIndexWriterDelete extends LuceneTestCase { assertEquals(7, reader.numDocs()); reader.close(); - modifier.deleteDocuments(new Term("value", String.valueOf(value))); modifier.deleteDocuments(new Term("value", String.valueOf(value))); modifier.commit(); @@ -120,44 +119,68 @@ public class TestIndexWriterDelete extends LuceneTestCase { } } - // test when delete terms only apply to ram segments - public void testRAMDeletes() throws IOException { + public void testMaxBufferedDeletes() throws IOException { for(int pass=0;pass<2;pass++) { boolean autoCommit = (0==pass); Directory dir = new MockRAMDirectory(); - IndexWriter modifier = new IndexWriter(dir, autoCommit, - new WhitespaceAnalyzer(), true, IndexWriter.MaxFieldLength.LIMITED); - modifier.setMaxBufferedDocs(4); - modifier.setMaxBufferedDeleteTerms(4); - - int id = 0; - int value = 100; - - addDoc(modifier, ++id, value); - modifier.deleteDocuments(new Term("value", String.valueOf(value))); - addDoc(modifier, ++id, value); - modifier.deleteDocuments(new Term("value", String.valueOf(value))); - - assertEquals(2, modifier.getNumBufferedDeleteTerms()); - assertEquals(1, modifier.getBufferedDeleteTermsSize()); - - addDoc(modifier, ++id, value); - assertEquals(0, modifier.getSegmentCount()); - modifier.flush(); - - modifier.commit(); - - IndexReader reader = IndexReader.open(dir); - assertEquals(1, reader.numDocs()); - - int hitCount = getHitCount(dir, new Term("id", String.valueOf(id))); - assertEquals(1, hitCount); - reader.close(); - modifier.close(); + IndexWriter writer = new IndexWriter(dir, autoCommit, + new WhitespaceAnalyzer(), true, IndexWriter.MaxFieldLength.LIMITED); + writer.setMaxBufferedDeleteTerms(1); + writer.deleteDocuments(new Term("foobar", "1")); + writer.deleteDocuments(new Term("foobar", "1")); + writer.deleteDocuments(new Term("foobar", "1")); + assertEquals(3, writer.getFlushDeletesCount()); + writer.close(); dir.close(); } } + // test when delete terms only apply to ram segments + public void testRAMDeletes() throws IOException { + for(int pass=0;pass<2;pass++) { + for(int 
t=0;t<2;t++) { + boolean autoCommit = (0==pass); + Directory dir = new MockRAMDirectory(); + IndexWriter modifier = new IndexWriter(dir, autoCommit, + new WhitespaceAnalyzer(), true, IndexWriter.MaxFieldLength.LIMITED); + modifier.setMaxBufferedDocs(4); + modifier.setMaxBufferedDeleteTerms(4); + + int id = 0; + int value = 100; + + addDoc(modifier, ++id, value); + if (0 == t) + modifier.deleteDocuments(new Term("value", String.valueOf(value))); + else + modifier.deleteDocuments(new TermQuery(new Term("value", String.valueOf(value)))); + addDoc(modifier, ++id, value); + if (0 == t) { + modifier.deleteDocuments(new Term("value", String.valueOf(value))); + assertEquals(2, modifier.getNumBufferedDeleteTerms()); + assertEquals(1, modifier.getBufferedDeleteTermsSize()); + } + else + modifier.deleteDocuments(new TermQuery(new Term("value", String.valueOf(value)))); + + addDoc(modifier, ++id, value); + assertEquals(0, modifier.getSegmentCount()); + modifier.flush(); + + modifier.commit(); + + IndexReader reader = IndexReader.open(dir); + assertEquals(1, reader.numDocs()); + + int hitCount = getHitCount(dir, new Term("id", String.valueOf(id))); + assertEquals(1, hitCount); + reader.close(); + modifier.close(); + dir.close(); + } + } + } + // test when delete terms apply to both disk and ram segments public void testBothDeletes() throws IOException { for(int pass=0;pass<2;pass++) { @@ -306,6 +329,7 @@ public class TestIndexWriterDelete extends LuceneTestCase { // Iterate w/ ever increasing free disk space: while (!done) { MockRAMDirectory dir = new MockRAMDirectory(startDir); + dir.setPreventDoubleWrite(false); IndexWriter modifier = new IndexWriter(dir, autoCommit, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED); diff --git a/src/test/org/apache/lucene/index/TestStressIndexing2.java b/src/test/org/apache/lucene/index/TestStressIndexing2.java index 32a595592d7..4d3b27b895b 100644 --- a/src/test/org/apache/lucene/index/TestStressIndexing2.java +++ b/src/test/org/apache/lucene/index/TestStressIndexing2.java @@ -38,6 +38,19 @@ public class TestStressIndexing2 extends LuceneTestCase { static Random r = new Random(0); + public class MockIndexWriter extends IndexWriter { + + public MockIndexWriter(Directory dir, boolean autoCommit, Analyzer a, boolean create, MaxFieldLength mfl) throws IOException { + super(dir, autoCommit, a, create, mfl); + } + + boolean testPoint(String name) { + // if (name.equals("startCommit")) { + if (r.nextInt(4) == 2) + Thread.yield(); + return true; + } + } public void testRandom() throws Exception { Directory dir1 = new MockRAMDirectory(); @@ -99,7 +112,7 @@ public class TestStressIndexing2 extends LuceneTestCase { // everything. public Map indexRandom(int nThreads, int iterations, int range, Directory dir) throws IOException, InterruptedException { - IndexWriter w = new IndexWriter(dir, autoCommit, new WhitespaceAnalyzer(), true, IndexWriter.MaxFieldLength.UNLIMITED); + IndexWriter w = new MockIndexWriter(dir, autoCommit, new WhitespaceAnalyzer(), true, IndexWriter.MaxFieldLength.UNLIMITED); w.setUseCompoundFile(false); /*** w.setMaxMergeDocs(Integer.MAX_VALUE);
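For reference, a minimal sketch (not part of the patch) of how the new IndexWriter.deleteDocuments(Query) added above might be used. It follows the PhraseQuery delete pattern from TestAddIndexesNoOptimize and the writer constructor used in the patched tests; the RAMDirectory/WhitespaceAnalyzer setup and the class name DeleteByQueryExample are illustrative assumptions, not part of this change.

import org.apache.lucene.analysis.WhitespaceAnalyzer;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.PhraseQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;

public class DeleteByQueryExample {
  public static void main(String[] args) throws Exception {
    Directory dir = new RAMDirectory();
    // autoCommit=true, create=true, same constructor form as in the patched tests
    IndexWriter writer = new IndexWriter(dir, true, new WhitespaceAnalyzer(),
                                         true, IndexWriter.MaxFieldLength.LIMITED);

    // ... add documents with a "content" field here ...

    // Buffer a delete of every document matching the phrase "bbb 14".
    // The delete is buffered inside IndexWriter and applied when the
    // buffered deletes are flushed, per the flush/commit semantics
    // described in the patched IndexWriter javadoc above.
    PhraseQuery q = new PhraseQuery();
    q.add(new Term("content", "bbb"));
    q.add(new Term("content", "14"));
    writer.deleteDocuments(q);

    writer.close();
    dir.close();
  }
}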