From e7179ea8666139ffa4fb6c52779ffa3e0d1d4c8b Mon Sep 17 00:00:00 2001
From: Michael McCandless
Date: Mon, 10 Sep 2007 14:33:51 +0000
Subject: [PATCH] LUCENE-992: move buffered deletes into DocumentsWriter so
 IndexWriter.updateDocument is atomic

git-svn-id: https://svn.apache.org/repos/asf/lucene/java/trunk@574260 13f79535-47bb-0310-9956-ffa450edef68
---
 CHANGES.txt                                        |   3 +
 .../apache/lucene/index/DocumentsWriter.java       | 129 +++++++++++-
 .../org/apache/lucene/index/IndexWriter.java       | 161 +++++----------
 .../apache/lucene/index/TestAtomicUpdate.java      | 184 ++++++++++++++++++
 .../lucene/index/TestIndexWriterDelete.java        |  29 ++-
 5 files changed, 381 insertions(+), 125 deletions(-)
 create mode 100644 src/test/org/apache/lucene/index/TestAtomicUpdate.java

diff --git a/CHANGES.txt b/CHANGES.txt
index b5319a09575..4c7fd439102 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -83,6 +83,9 @@ Bug fixes
 13. LUCENE-991: The explain() method of BoostingTermQuery had errors when no
     payloads were present on a document. (Peter Keegan via Grant Ingersoll)
 
+14. LUCENE-992: Fixed IndexWriter.updateDocument to be atomic again
+    (this was broken by LUCENE-843). (Ning Li via Mike McCandless)
+
 New features
 
  1. LUCENE-906: Elision filter for French.

diff --git a/src/java/org/apache/lucene/index/DocumentsWriter.java b/src/java/org/apache/lucene/index/DocumentsWriter.java
index acf8a8a21c6..26b50ec4d11 100644
--- a/src/java/org/apache/lucene/index/DocumentsWriter.java
+++ b/src/java/org/apache/lucene/index/DocumentsWriter.java
@@ -129,6 +129,16 @@ final class DocumentsWriter {
 
   private PrintStream infoStream;
 
+  // This Hashmap buffers delete terms in ram before they
+  // are applied. The key is delete term; the value is
+  // number of buffered documents the term applies to.
+  private HashMap bufferedDeleteTerms = new HashMap();
+  private int numBufferedDeleteTerms = 0;
+
+  // The max number of delete terms that can be buffered before
+  // they must be flushed to disk.
+  private int maxBufferedDeleteTerms = IndexWriter.DEFAULT_MAX_BUFFERED_DELETE_TERMS;
+
   // How much RAM we can use before flushing. This is 0 if
   // we are flushing by doc count instead.
   private long ramBufferSize = (long) (IndexWriter.DEFAULT_RAM_BUFFER_SIZE_MB*1024*1024);
@@ -265,8 +275,8 @@ final class DocumentsWriter {
 
   /** Called if we hit an exception when adding docs,
    * flushing, etc. This resets our state, discarding any
-   * * docs added since last flush. */
-  void abort() throws IOException {
+   * docs added since last flush. */
+  synchronized void abort() throws IOException {
     // Forcefully remove waiting ThreadStates from line
     for(int i=0;i= maxBufferedDeleteTerms && setFlushPending();
+  }
+
+  void setMaxBufferedDeleteTerms(int maxBufferedDeleteTerms) {
+    if (maxBufferedDeleteTerms < 1)
+      throw new IllegalArgumentException("maxBufferedDeleteTerms must at least be 1");
+    this.maxBufferedDeleteTerms = maxBufferedDeleteTerms;
+  }
+
+  int getMaxBufferedDeleteTerms() {
+    return maxBufferedDeleteTerms;
+  }
+
+  synchronized boolean hasDeletes() {
+    return bufferedDeleteTerms.size() > 0;
+  }
+
+  // Number of documents a delete term applies to.
+  static class Num {
+    private int num;
+
+    Num(int num) {
+      this.num = num;
+    }
+
+    int getNum() {
+      return num;
+    }
+
+    void setNum(int num) {
+      // Only record the new number if it's greater than the
+      // current one. This is important because if multiple
+      // threads are replacing the same doc at nearly the
+      // same time, it's possible that one thread that got a
+      // higher docID is scheduled before the other
+      // threads.
+      if (num > this.num)
+        this.num = num;
+    }
+  }
+
+  // Buffer a term in bufferedDeleteTerms, which records the
+  // current number of documents buffered in ram so that the
+  // delete term will be applied to those documents as well
+  // as the disk segments.
+  synchronized private void addDeleteTerm(Term term, int docCount) {
+    Num num = (Num) bufferedDeleteTerms.get(term);
+    if (num == null) {
+      bufferedDeleteTerms.put(term, new Num(docCount));
+    } else {
+      num.setNum(docCount);
+    }
+    numBufferedDeleteTerms++;
   }
 
   /** Does the synchronized work to finish/flush the
diff --git a/src/java/org/apache/lucene/index/IndexWriter.java b/src/java/org/apache/lucene/index/IndexWriter.java
index 60244bb78df..52f4fdf2475 100644
--- a/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/src/java/org/apache/lucene/index/IndexWriter.java
@@ -247,16 +247,6 @@ public class IndexWriter {
 
   private int termIndexInterval = DEFAULT_TERM_INDEX_INTERVAL;
 
-  // The max number of delete terms that can be buffered before
-  // they must be flushed to disk.
-  private int maxBufferedDeleteTerms = DEFAULT_MAX_BUFFERED_DELETE_TERMS;
-
-  // This Hashmap buffers delete terms in ram before they are applied.
-  // The key is delete term; the value is number of ram
-  // segments the term applies to.
-  private HashMap bufferedDeleteTerms = new HashMap();
-  private int numBufferedDeleteTerms = 0;
-
   /** Use compound file setting. Defaults to true, minimizing the number of
    * files used. Setting this to false may improve indexing performance, but
    * may also cause file handle problems.
@@ -773,9 +763,7 @@ public class IndexWriter {
    */
   public void setMaxBufferedDeleteTerms(int maxBufferedDeleteTerms) {
     ensureOpen();
-    if (maxBufferedDeleteTerms < 1)
-      throw new IllegalArgumentException("maxBufferedDeleteTerms must at least be 1");
-    this.maxBufferedDeleteTerms = maxBufferedDeleteTerms;
+    docWriter.setMaxBufferedDeleteTerms(maxBufferedDeleteTerms);
   }
 
   /**
@@ -785,7 +773,7 @@
    */
   public int getMaxBufferedDeleteTerms() {
     ensureOpen();
-    return maxBufferedDeleteTerms;
+    return docWriter.getMaxBufferedDeleteTerms();
   }
 
   /** Determines how often segment indices are merged by addDocument(). With
@@ -1134,10 +1122,11 @@ public class IndexWriter {
    * @throws CorruptIndexException if the index is corrupt
    * @throws IOException if there is a low-level IO error
    */
-  public synchronized void deleteDocuments(Term term) throws CorruptIndexException, IOException {
+  public void deleteDocuments(Term term) throws CorruptIndexException, IOException {
     ensureOpen();
-    bufferDeleteTerm(term);
-    maybeFlush();
+    boolean doFlush = docWriter.bufferDeleteTerm(term);
+    if (doFlush)
+      flush(true, false);
   }
 
   /**
@@ -1148,12 +1137,11 @@
    * @throws CorruptIndexException if the index is corrupt
    * @throws IOException if there is a low-level IO error
    */
-  public synchronized void deleteDocuments(Term[] terms) throws CorruptIndexException, IOException {
+  public void deleteDocuments(Term[] terms) throws CorruptIndexException, IOException {
     ensureOpen();
-    for (int i = 0; i < terms.length; i++) {
-      bufferDeleteTerm(terms[i]);
-    }
-    maybeFlush();
+    boolean doFlush = docWriter.bufferDeleteTerms(terms);
+    if (doFlush)
+      flush(true, false);
   }
 
   /**
@@ -1189,20 +1177,15 @@
   public void updateDocument(Term term, Document doc, Analyzer analyzer)
       throws CorruptIndexException, IOException {
     ensureOpen();
-    synchronized (this) {
-      bufferDeleteTerm(term);
-    }
-    boolean success = false;
+    boolean doFlush = false;
     try {
-      success = docWriter.addDocument(doc, analyzer);
+      doFlush = docWriter.updateDocument(term, doc, analyzer);
     } catch (IOException ioe) {
       deleter.refresh();
       throw ioe;
     }
-    if (success)
+    if (doFlush)
       flush(true, false);
-    else
-      maybeFlush();
   }
 
   // for test purpose
@@ -1357,7 +1340,7 @@
    */
   private void startTransaction() throws IOException {
-    assert numBufferedDeleteTerms == 0 :
+    assert docWriter.getNumBufferedDeleteTerms() == 0 :
       "calling startTransaction with buffered delete terms not supported";
     assert docWriter.getNumDocsInRAM() == 0 :
       "calling startTransaction with buffered documents not supported";
@@ -1462,9 +1445,6 @@
       deleter.checkpoint(segmentInfos, false);
       deleter.refresh();
 
-      bufferedDeleteTerms.clear();
-      numBufferedDeleteTerms = 0;
-
       commitPending = false;
       docWriter.abort();
       close();
@@ -1845,20 +1825,6 @@
     throws IOException {
   }
 
-  /**
-   * Used internally to trigger a flush if the number of
-   * buffered added documents or buffered deleted terms are
-   * large enough.
-   */
-  protected final synchronized void maybeFlush() throws CorruptIndexException, IOException {
-    // We only check for flush due to number of buffered
-    // delete terms, because triggering of a flush due to
-    // too many added documents is handled by
-    // DocumentsWriter
-    if (numBufferedDeleteTerms >= maxBufferedDeleteTerms && docWriter.setFlushPending())
-      flush(true, false);
-  }
-
   /**
    * Flush all in-memory buffered updates (adds and deletes)
    * to the Directory.
@@ -1908,7 +1874,7 @@
     // when they are full or writer is being closed. We
     // have to fix the "applyDeletesSelectively" logic to
     // apply to more than just the last flushed segment
-    boolean flushDeletes = bufferedDeleteTerms.size() > 0;
+    boolean flushDeletes = docWriter.hasDeletes();
 
     if (infoStream != null)
       infoStream.println(" flush: flushDocs=" + flushDocs +
@@ -1938,9 +1904,6 @@
     SegmentInfos rollback = null;
 
-    HashMap saveBufferedDeleteTerms = null;
-    int saveNumBufferedDeleteTerms = 0;
-
     if (flushDeletes)
       rollback = (SegmentInfos) segmentInfos.clone();
 
@@ -1975,9 +1938,9 @@
           // buffer deletes longer and then flush them to
           // multiple flushed segments, when
           // autoCommit=false
-          saveBufferedDeleteTerms = bufferedDeleteTerms;
-          saveNumBufferedDeleteTerms = numBufferedDeleteTerms;
-          applyDeletes(flushDocs);
+          int delCount = applyDeletes(flushDocs);
+          if (infoStream != null)
+            infoStream.println("flushed " + delCount + " deleted documents");
           doAfterFlush();
         }
 
@@ -1991,11 +1954,6 @@
           // SegmentInfo instances:
           segmentInfos.clear();
           segmentInfos.addAll(rollback);
-
-          if (saveBufferedDeleteTerms != null) {
-            numBufferedDeleteTerms = saveNumBufferedDeleteTerms;
-            bufferedDeleteTerms = saveBufferedDeleteTerms;
-          }
         } else {
           // Remove segment we added, if any:
@@ -2319,11 +2277,14 @@
   // flushedNewSegment is true then a new segment was just
   // created and flushed from the ram segments, so we will
   // selectively apply the deletes to that new segment.
-  private final void applyDeletes(boolean flushedNewSegment) throws CorruptIndexException, IOException {
+  private final int applyDeletes(boolean flushedNewSegment) throws CorruptIndexException, IOException {
+    final HashMap bufferedDeleteTerms = docWriter.getBufferedDeleteTerms();
+
+    int delCount = 0;
 
     if (bufferedDeleteTerms.size() > 0) {
       if (infoStream != null)
-        infoStream.println("flush " + numBufferedDeleteTerms + " buffered deleted terms on "
+        infoStream.println("flush " + docWriter.getNumBufferedDeleteTerms() + " buffered deleted terms on "
                            + segmentInfos.size() + " segments.");
 
       if (flushedNewSegment) {
@@ -2337,7 +2298,7 @@
         // Apply delete terms to the segment just flushed from ram
         // apply appropriately so that a delete term is only applied to
         // the documents buffered before it, not those buffered after it.
-        applyDeletesSelectively(bufferedDeleteTerms, reader);
+        delCount += applyDeletesSelectively(bufferedDeleteTerms, reader);
       } finally {
         if (reader != null) {
           try {
@@ -2361,7 +2322,7 @@
         // Apply delete terms to disk segments
         // except the one just flushed from ram.
-        applyDeletes(bufferedDeleteTerms, reader);
+        delCount += applyDeletes(bufferedDeleteTerms, reader);
       } finally {
         if (reader != null) {
           try {
@@ -2374,15 +2335,10 @@
       }
 
       // Clean up bufferedDeleteTerms.
-
-      // Rollbacks of buffered deletes are based on restoring the old
-      // map, so don't modify this one. Rare enough that the gc
-      // overhead is almost certainly lower than the alternate, which
-      // would be clone to support rollback.
-
-      bufferedDeleteTerms = new HashMap();
-      numBufferedDeleteTerms = 0;
+      docWriter.clearBufferedDeleteTerms();
     }
+
+    return delCount;
   }
 
   private final boolean checkNonDecreasingLevels(int start) {
@@ -2410,59 +2366,28 @@
 
   // For test purposes.
   final synchronized int getBufferedDeleteTermsSize() {
-    return bufferedDeleteTerms.size();
+    return docWriter.getBufferedDeleteTerms().size();
   }
 
   // For test purposes.
   final synchronized int getNumBufferedDeleteTerms() {
-    return numBufferedDeleteTerms;
-  }
-
-  // Number of ram segments a delete term applies to.
-  private static class Num {
-    private int num;
-
-    Num(int num) {
-      this.num = num;
-    }
-
-    int getNum() {
-      return num;
-    }
-
-    void setNum(int num) {
-      this.num = num;
-    }
-  }
-
-  // Buffer a term in bufferedDeleteTerms, which records the
-  // current number of documents buffered in ram so that the
-  // delete term will be applied to those ram segments as
-  // well as the disk segments.
-  private void bufferDeleteTerm(Term term) {
-    Num num = (Num) bufferedDeleteTerms.get(term);
-    int numDoc = docWriter.getNumDocsInRAM();
-    if (num == null) {
-      bufferedDeleteTerms.put(term, new Num(numDoc));
-    } else {
-      num.setNum(numDoc);
-    }
-    numBufferedDeleteTerms++;
+    return docWriter.getNumBufferedDeleteTerms();
   }
 
   // Apply buffered delete terms to the segment just flushed from ram
   // apply appropriately so that a delete term is only applied to
   // the documents buffered before it, not those buffered after it.
-  private final void applyDeletesSelectively(HashMap deleteTerms,
+  private final int applyDeletesSelectively(HashMap deleteTerms,
       IndexReader reader) throws CorruptIndexException, IOException {
     Iterator iter = deleteTerms.entrySet().iterator();
+    int delCount = 0;
     while (iter.hasNext()) {
       Entry entry = (Entry) iter.next();
       Term term = (Term) entry.getKey();
       TermDocs docs = reader.termDocs(term);
       if (docs != null) {
-        int num = ((Num) entry.getValue()).getNum();
+        int num = ((DocumentsWriter.Num) entry.getValue()).getNum();
         try {
           while (docs.next()) {
             int doc = docs.doc();
@@ -2470,21 +2395,37 @@ public class IndexWriter {
               break;
             }
             reader.deleteDocument(doc);
+            delCount++;
           }
         } finally {
           docs.close();
         }
       }
     }
+    return delCount;
   }
 
   // Apply buffered delete terms to this reader.
-  private final void applyDeletes(HashMap deleteTerms, IndexReader reader)
+  private final int applyDeletes(HashMap deleteTerms, IndexReader reader)
       throws CorruptIndexException, IOException {
     Iterator iter = deleteTerms.entrySet().iterator();
+    int delCount = 0;
     while (iter.hasNext()) {
       Entry entry = (Entry) iter.next();
-      reader.deleteDocuments((Term) entry.getKey());
+      delCount += reader.deleteDocuments((Term) entry.getKey());
     }
+    return delCount;
+  }
+
+  public synchronized String segString() {
+    StringBuffer buffer = new StringBuffer();
+    for(int i = 0; i < segmentInfos.size(); i++) {
+      if (i > 0) {
+        buffer.append(' ');
+      }
+      buffer.append(segmentInfos.info(i).name + ":" + segmentInfos.info(i).docCount);
+    }
+
+    return buffer.toString();
   }
 }
diff --git a/src/test/org/apache/lucene/index/TestAtomicUpdate.java b/src/test/org/apache/lucene/index/TestAtomicUpdate.java
new file mode 100644
index 00000000000..a1de0febd2a
--- /dev/null
+++ b/src/test/org/apache/lucene/index/TestAtomicUpdate.java
@@ -0,0 +1,184 @@
+package org.apache.lucene.index;
+
+/**
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.util.*;
+import org.apache.lucene.store.*;
+import org.apache.lucene.document.*;
+import org.apache.lucene.analysis.*;
+import org.apache.lucene.index.*;
+import org.apache.lucene.search.*;
+import org.apache.lucene.queryParser.*;
+import org.apache.lucene.util._TestUtil;
+
+import junit.framework.TestCase;
+
+import java.util.Random;
+import java.io.File;
+
+public class TestAtomicUpdate extends TestCase {
+  private static final Analyzer ANALYZER = new SimpleAnalyzer();
+  private static final Random RANDOM = new Random();
+
+  private static abstract class TimedThread extends Thread {
+    boolean failed;
+    int count;
+    private static int RUN_TIME_SEC = 3;
+    private TimedThread[] allThreads;
+
+    abstract public void doWork() throws Throwable;
+
+    TimedThread(TimedThread[] threads) {
+      this.allThreads = threads;
+    }
+
+    public void run() {
+      final long stopTime = System.currentTimeMillis() + 1000*RUN_TIME_SEC;
+
+      count = 0;
+
+      try {
+        while(System.currentTimeMillis() < stopTime && !anyErrors()) {
+          doWork();
+          count++;
+        }
+      } catch (Throwable e) {
+        e.printStackTrace(System.out);
+        failed = true;
+      }
+    }
+
+    private boolean anyErrors() {
+      for(int i=0;i
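The easiest way to read the change is through the invariant it restores: because updateDocument now hands the delete term and the new document to DocumentsWriter in one call, a point-in-time reader should always see exactly one live copy of a replaced document, never the delete without the add and never both the old and new copies. The standalone sketch below is not part of the patch; it is a minimal illustration of that invariant in the style of the new TestAtomicUpdate, written against the Lucene API of this era (RAMDirectory, SimpleAnalyzer, IndexWriter.updateDocument, IndexReader.numDocs). The class name AtomicUpdateSketch, the 10-document corpus, and the iteration counts are illustrative assumptions.

import org.apache.lucene.analysis.SimpleAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;

// Hypothetical sketch, not part of the patch: replace documents in one thread
// while another thread verifies that every point-in-time reader sees exactly
// the expected number of live documents.
public class AtomicUpdateSketch {

  public static void main(String[] args) throws Exception {
    final Directory dir = new RAMDirectory();
    final int numDocs = 10;

    // Seed one document per id, "0" .. "9".
    IndexWriter seed = new IndexWriter(dir, new SimpleAnalyzer(), true);
    for (int i = 0; i < numDocs; i++) {
      Document d = new Document();
      d.add(new Field("id", Integer.toString(i), Field.Store.YES, Field.Index.UN_TOKENIZED));
      seed.addDocument(d);
    }
    seed.close();

    // Updater thread: replace every document many times.  Each updateDocument
    // call is a delete-by-term plus an add that must be applied atomically.
    Thread updater = new Thread() {
      public void run() {
        try {
          IndexWriter writer = new IndexWriter(dir, new SimpleAnalyzer(), false);
          for (int iter = 0; iter < 200; iter++) {
            for (int i = 0; i < numDocs; i++) {
              Document d = new Document();
              d.add(new Field("id", Integer.toString(i), Field.Store.YES, Field.Index.UN_TOKENIZED));
              writer.updateDocument(new Term("id", Integer.toString(i)), d);
            }
          }
          writer.close();
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      }
    };
    updater.start();

    // Checker loop: a reader opened at any moment must count exactly numDocs
    // live documents; fewer means a delete was visible without its add, more
    // means the add was visible without its delete.
    while (updater.isAlive()) {
      IndexReader reader = IndexReader.open(dir);
      int n = reader.numDocs();
      reader.close();
      if (n != numDocs)
        throw new IllegalStateException("saw " + n + " docs, expected " + numDocs);
    }
    updater.join();
    System.out.println("all point-in-time readers saw exactly " + numDocs + " documents");
  }
}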