diff --git a/src/java/org/apache/lucene/index/CompoundFileWriter.java b/src/java/org/apache/lucene/index/CompoundFileWriter.java
index c882f892bfe..fc8d2c82aaf 100644
--- a/src/java/org/apache/lucene/index/CompoundFileWriter.java
+++ b/src/java/org/apache/lucene/index/CompoundFileWriter.java
@@ -68,18 +68,22 @@ final class CompoundFileWriter {
   private HashSet ids;
   private LinkedList entries;
   private boolean merged = false;
-
+  private SegmentMerger.CheckAbort checkAbort;
 
   /** Create the compound stream in the specified file. The file name is the
    *  entire name (no extensions are added).
    *  @throws NullPointerException if dir or name is null
    */
   public CompoundFileWriter(Directory dir, String name) {
+    this(dir, name, null);
+  }
+
+  CompoundFileWriter(Directory dir, String name, SegmentMerger.CheckAbort checkAbort) {
     if (dir == null)
       throw new NullPointerException("directory cannot be null");
     if (name == null)
       throw new NullPointerException("name cannot be null");
-
+    this.checkAbort = checkAbort;
     directory = dir;
     fileName = name;
     ids = new HashSet();
@@ -211,6 +215,10 @@ final class CompoundFileWriter {
         is.readBytes(buffer, 0, len);
         os.writeBytes(buffer, len);
         remainder -= len;
+        if (checkAbort != null)
+          // Roughly every 2 MB we will check if
+          // it's time to abort
+          checkAbort.work(80);
       }
 
       // Verify that remainder is 0
diff --git a/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java b/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
index 1683bac5c53..e3e0b60f0ab 100644
--- a/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
+++ b/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
@@ -251,18 +251,15 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
 
           message("  merge thread: done");
 
-        } catch (Throwable exc) {
-          // When a merge was aborted & IndexWriter closed,
-          // it's possible to get various IOExceptions,
-          // NullPointerExceptions, AlreadyClosedExceptions:
+        } catch (IOException exc) {
+
           if (merge != null) {
             merge.setException(exc);
             writer.addMergeException(merge);
           }
 
-          if (merge == null || !merge.isAborted()) {
-            // If the merge was not aborted then the exception
-            // is real
+          // Ignore the exception if it was due to abort:
+          if (!(exc instanceof MergePolicy.MergeAbortedException)) {
             synchronized(ConcurrentMergeScheduler.this) {
               exceptions.add(exc);
             }
diff --git a/src/java/org/apache/lucene/index/DocumentsWriter.java b/src/java/org/apache/lucene/index/DocumentsWriter.java
index 930a99aadc5..495570776ff 100644
--- a/src/java/org/apache/lucene/index/DocumentsWriter.java
+++ b/src/java/org/apache/lucene/index/DocumentsWriter.java
@@ -27,6 +27,7 @@ import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.RAMOutputStream;
+import org.apache.lucene.store.AlreadyClosedException;
 
 import java.io.IOException;
 import java.io.PrintStream;
@@ -148,6 +149,8 @@ final class DocumentsWriter {
   // non-zero we will flush by RAM usage instead.
   private int maxBufferedDocs = IndexWriter.DEFAULT_MAX_BUFFERED_DOCS;
 
+  private boolean closed;
+
   // Coarse estimates used to measure RAM usage of buffered deletes
   private static int OBJECT_HEADER_BYTES = 12;
   private static int OBJECT_POINTER_BYTES = 4;    // TODO: should be 8 on 64-bit platform
@@ -2168,6 +2171,10 @@ final class DocumentsWriter {
     }
   }
 
+  synchronized void close() {
+    closed = true;
+  }
+
   /** Returns a free (idle) ThreadState that may be used for
    * indexing this one document.  This call also pauses if a
    * flush is pending.  If delTerm is non-null then we
@@ -2211,6 +2218,9 @@ final class DocumentsWriter {
       Thread.currentThread().interrupt();
     }
 
+    if (closed)
+      throw new AlreadyClosedException("this IndexWriter is closed");
+
     if (segment == null)
       segment = writer.newSegmentName();
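
Note on the DocumentsWriter change above: close() does nothing but set a flag, and that flag is checked at the top of the indexing path (getThreadState) under the same monitor, so an indexing thread that loses the race with close() fails fast with AlreadyClosedException rather than touching state that is being torn down. A minimal standalone sketch of that pattern follows; the class and method names are illustrative, not part of the patch:

    import org.apache.lucene.store.AlreadyClosedException;

    // Sketch of the closed-flag pattern used by DocumentsWriter: close()
    // only flips a flag; the check happens on the work-submission path,
    // under the same monitor, so no new work can start after close.
    final class ClosableWorkQueue {
      private boolean closed;

      synchronized void close() {
        closed = true;
      }

      synchronized void addWork(Runnable work) {
        if (closed)
          throw new AlreadyClosedException("this queue is closed");
        work.run();   // stand-in for actually queueing/indexing the work
      }
    }
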
diff --git a/src/java/org/apache/lucene/index/IndexWriter.java b/src/java/org/apache/lucene/index/IndexWriter.java
index 5cc4dc9336e..71035a26d76 100644
--- a/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/src/java/org/apache/lucene/index/IndexWriter.java
@@ -291,6 +291,7 @@ public class IndexWriter {
   private Set runningMerges = new HashSet();
   private List mergeExceptions = new ArrayList();
   private long mergeGen;
+  private boolean stopMerges;
 
   /**
    * Used internally to throw an {@link
@@ -1150,8 +1151,10 @@ public class IndexWriter {
    * using a MergeScheduler that runs merges in background
    * threads.
    * @param waitForMerges if true, this call will block
-   * until all merges complete; else, it will abort all
-   * running merges and return right away
+   * until all merges complete; else, it will ask all
+   * running merges to abort, wait until those merges have
+   * finished (which should be at most a few seconds), and
+   * then return.
    */
   public void close(boolean waitForMerges) throws CorruptIndexException, IOException {
     boolean doClose;
@@ -1186,6 +1189,8 @@ public class IndexWriter {
       if (infoStream != null)
         message("now flush at close");
 
+      docWriter.close();
+
       // Only allow a new merge to be triggered if we are
       // going to wait for merges:
       flush(waitForMerges, true);
@@ -1196,33 +1201,33 @@ public class IndexWriter {
 
       mergeScheduler.close();
 
-      if (commitPending) {
-        boolean success = false;
-        try {
-          segmentInfos.write(directory);         // now commit changes
-          success = true;
-        } finally {
-          if (!success) {
-            if (infoStream != null)
-              message("hit exception committing segments file during close");
-            deletePartialSegmentsFile();
-          }
-        }
-        if (infoStream != null)
-          message("close: wrote segments file \"" + segmentInfos.getCurrentSegmentFileName() + "\"");
-        synchronized(this) {
-          deleter.checkpoint(segmentInfos, true);
-        }
-        commitPending = false;
-        rollbackSegmentInfos = null;
-      }
-
-      if (infoStream != null)
-        message("at close: " + segString());
-
-      docWriter = null;
-
       synchronized(this) {
+        if (commitPending) {
+          boolean success = false;
+          try {
+            segmentInfos.write(directory);         // now commit changes
+            success = true;
+          } finally {
+            if (!success) {
+              if (infoStream != null)
+                message("hit exception committing segments file during close");
+              deletePartialSegmentsFile();
+            }
+          }
+          if (infoStream != null)
+            message("close: wrote segments file \"" + segmentInfos.getCurrentSegmentFileName() + "\"");
+
+          deleter.checkpoint(segmentInfos, true);
+
+          commitPending = false;
+          rollbackSegmentInfos = null;
+        }
+
+        if (infoStream != null)
+          message("at close: " + segString());
+
+        docWriter = null;
+
         deleter.close();
       }
@@ -1440,9 +1445,11 @@ public class IndexWriter {
       synchronized (this) {
         // If docWriter has some aborted files that were
        // never incref'd, then we clean them up here
-        final List files = docWriter.abortedFiles();
-        if (files != null)
-          deleter.deleteNewFiles(files);
+        if (docWriter != null) {
+          final List files = docWriter.abortedFiles();
+          if (files != null)
+            deleter.deleteNewFiles(files);
+        }
       }
     }
   }
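
From the caller's side, the new close(false) contract is the interesting part: it no longer returns while merge threads may still be writing into the directory. A hedged usage sketch against the API of this era; everything around the close(false) call is illustrative setup:

    import org.apache.lucene.analysis.WhitespaceAnalyzer;
    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.store.RAMDirectory;

    public class CloseNoWait {
      public static void main(String[] args) throws Exception {
        RAMDirectory dir = new RAMDirectory();
        IndexWriter writer = new IndexWriter(dir, true, new WhitespaceAnalyzer(), true);

        Document doc = new Document();
        doc.add(new Field("body", "some text", Field.Store.NO, Field.Index.TOKENIZED));
        for (int i = 0; i < 1000; i++)
          writer.addDocument(doc);

        // With this patch, close(false) asks pending and running merges to
        // abort and waits for them to actually stop, so no merge thread is
        // still writing into dir after this call returns:
        writer.close(false);
        dir.close();
      }
    }
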
@@ -1799,6 +1806,9 @@ public class IndexWriter {
     throws CorruptIndexException, IOException {
     assert !optimize || maxNumSegmentsOptimize > 0;
 
+    if (stopMerges)
+      return;
+
     final MergePolicy.MergeSpecification spec;
     if (optimize) {
       spec = mergePolicy.findMergesForOptimize(segmentInfos, this, maxNumSegmentsOptimize, segmentsToOptimize);
@@ -1861,6 +1871,7 @@ public class IndexWriter {
 
     localRollbackSegmentInfos = (SegmentInfos) segmentInfos.clone();
     localAutoCommit = autoCommit;
+
     if (localAutoCommit) {
 
       if (infoStream != null)
@@ -1905,6 +1916,7 @@ public class IndexWriter {
       deleter.refresh();
 
       finishMerges(false);
+      stopMerges = false;
     }
 
   /*
@@ -1995,7 +2007,6 @@ public class IndexWriter {
         // them:
         deleter.checkpoint(segmentInfos, false);
         deleter.refresh();
-        finishMerges(false);
       }
 
       commitPending = false;
@@ -2004,8 +2015,11 @@ public class IndexWriter {
     waitForClose();
   }
 
-  private synchronized void finishMerges(boolean waitForMerges) {
+  private synchronized void finishMerges(boolean waitForMerges) throws IOException {
     if (!waitForMerges) {
+
+      stopMerges = true;
+
       // Abort all pending & running merges:
       Iterator it = pendingMerges.iterator();
       while(it.hasNext()) {
@@ -2013,9 +2027,10 @@ public class IndexWriter {
         if (infoStream != null)
           message("now abort pending merge " + merge.segString(directory));
         merge.abort();
+        mergeFinish(merge);
       }
       pendingMerges.clear();
-
+
       it = runningMerges.iterator();
       while(it.hasNext()) {
         final MergePolicy.OneMerge merge = (MergePolicy.OneMerge) it.next();
@@ -2023,10 +2038,27 @@ public class IndexWriter {
         message("now abort running merge " + merge.segString(directory));
         merge.abort();
       }
 
-      runningMerges.clear();
-      mergingSegments.clear();
-      notifyAll();
+      // These merges periodically check whether they have
+      // been aborted, and stop if so.  We wait here to make
+      // sure they all stop.  It should not take very long
+      // because the merge threads periodically check if
+      // they are aborted.
+      while(runningMerges.size() > 0) {
+        if (infoStream != null)
+          message("now wait for " + runningMerges.size() + " running merge to abort");
+        try {
+          wait();
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+        }
+      }
+
+      assert 0 == mergingSegments.size();
+
+      if (infoStream != null)
+        message("all running merges have aborted");
+
     } else {
       while(pendingMerges.size() > 0 || runningMerges.size() > 0) {
         try {
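
The wait loop added to finishMerges depends on merge threads notifying the IndexWriter monitor as each aborted merge finishes (mergeFinish runs under the same lock). A self-contained sketch of that handshake, with illustrative names; unlike the patch, it defers re-asserting the interrupt status until after the loop so an interrupt cannot turn the wait into a busy spin:

    import java.util.HashSet;
    import java.util.Set;

    // Sketch of the abort handshake: the closing thread waits on the shared
    // monitor until the set of running tasks drains; worker threads call
    // notifyAll() from their finish path.
    final class MergeTracker {
      private final Set running = new HashSet();

      synchronized void start(Object merge) {
        running.add(merge);
      }

      synchronized void finish(Object merge) {
        running.remove(merge);
        notifyAll();   // wake anyone blocked in waitForAll()
      }

      synchronized void waitForAll() {
        boolean interrupted = false;
        while (running.size() > 0) {
          try {
            wait();
          } catch (InterruptedException ie) {
            interrupted = true;   // keep waiting; restore the status below
          }
        }
        if (interrupted)
          Thread.currentThread().interrupt();
      }
    }
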
@@ -2263,7 +2295,7 @@ public class IndexWriter {
       optimize();                               // start with zero or 1 seg
 
       final String mergedName = newSegmentName();
-      SegmentMerger merger = new SegmentMerger(this, mergedName);
+      SegmentMerger merger = new SegmentMerger(this, mergedName, null);
 
       SegmentInfo info;
 
@@ -2684,10 +2716,12 @@ public class IndexWriter {
             deletes.set(docUpto);
           docUpto++;
         }
-
+
       } else
         // No deletes before or after
        docUpto += currentInfo.docCount;
+
+      merge.checkAborted(directory);
     }
 
     if (deletes != null) {
@@ -2783,15 +2817,26 @@ public class IndexWriter {
 
     try {
 
-      if (merge.info == null)
-        mergeInit(merge);
+      try {
+        if (merge.info == null)
+          mergeInit(merge);
 
-      if (infoStream != null)
-        message("now merge\n  merge=" + merge.segString(directory) + "\n  index=" + segString());
+        if (infoStream != null)
+          message("now merge\n  merge=" + merge.segString(directory) + "\n  index=" + segString());
 
-      mergeMiddle(merge);
-
-      success = true;
+        mergeMiddle(merge);
+        success = true;
+      } catch (MergePolicy.MergeAbortedException e) {
+        merge.setException(e);
+        addMergeException(merge);
+        // We can ignore this exception, unless the merge
+        // involves segments from external directories, in
+        // which case we must throw it so, for example, the
+        // rollbackTransaction code in addIndexes* is
+        // executed.
+        if (merge.isExternal)
+          throw e;
+      }
     } finally {
       synchronized(this) {
         try {
@@ -2863,11 +2908,11 @@ public class IndexWriter {
    *  the synchronized lock on IndexWriter instance. */
   final synchronized void mergeInit(MergePolicy.OneMerge merge) throws IOException {
 
-    if (merge.isAborted())
-      throw new IOException("merge is aborted");
-
     assert merge.registerDone;
 
+    if (merge.isAborted())
+      return;
+
     final SegmentInfos sourceSegments = merge.segments;
     final int end = sourceSegments.size();
 
@@ -3010,6 +3055,8 @@ public class IndexWriter {
    *  instance */
   final private int mergeMiddle(MergePolicy.OneMerge merge)
     throws CorruptIndexException, IOException {
+
+    merge.checkAborted(directory);
 
     final String mergedName = merge.info.name;
 
@@ -3024,8 +3071,8 @@ public class IndexWriter {
     if (infoStream != null)
       message("merging " + merge.segString(directory));
 
-    merger = new SegmentMerger(this, mergedName);
-
+    merger = new SegmentMerger(this, mergedName, merge);
+
     // This is try/finally to make sure merger's readers are
     // closed:
@@ -3044,8 +3091,7 @@ public class IndexWriter {
           message("merge: total "+totDocCount+" docs");
       }
 
-      if (merge.isAborted())
-        throw new IOException("merge is aborted");
+      merge.checkAborted(directory);
 
      mergedDocCount = merge.info.docCount = merger.merge(merge.mergeDocStores);
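
Taken together, the IndexWriter changes make merge cancellation cooperative: mergeMiddle polls checkAborted() at its top and again before the actual merge, commitMergedDeletes polls once per source segment, and an abort surfaces as an exception so the existing finally blocks perform the cleanup. Reduced to a standalone sketch (the names and the plain IOException are stand-ins, not the patch's types):

    import java.io.IOException;

    // Cooperative cancellation: long-running work polls an aborted flag and
    // bails out via an exception, unwinding through its normal cleanup path.
    final class CancellableTask {
      private volatile boolean aborted;

      void abort() {
        aborted = true;
      }

      void checkAborted() throws IOException {
        if (aborted)
          throw new IOException("task aborted");   // stand-in for MergeAbortedException
      }

      void run() throws IOException {
        for (int i = 0; i < 1000000; i++) {
          // ... one unit of work ...
          if (i % 1000 == 0)
            checkAborted();   // poll often enough to abort within ~a second
        }
      }
    }
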
diff --git a/src/java/org/apache/lucene/index/MergePolicy.java b/src/java/org/apache/lucene/index/MergePolicy.java
index c3800dacd38..34c93f2b751 100644
--- a/src/java/org/apache/lucene/index/MergePolicy.java
+++ b/src/java/org/apache/lucene/index/MergePolicy.java
@@ -86,29 +86,34 @@ public abstract class MergePolicy {
 
     /** Record that an exception occurred while executing
      *  this merge */
-    public synchronized void setException(Throwable error) {
+    synchronized void setException(Throwable error) {
       this.error = error;
     }
 
     /** Retrieve previous exception set by {@link
      *  #setException}. */
-    public synchronized Throwable getException() {
+    synchronized Throwable getException() {
       return error;
     }
 
     /** Mark this merge as aborted.  If this is called
      *  before the merge is committed then the merge will
      *  not be committed. */
-    public synchronized void abort() {
+    synchronized void abort() {
       aborted = true;
     }
 
     /** Returns true if this merge was aborted. */
-    public synchronized boolean isAborted() {
+    synchronized boolean isAborted() {
       return aborted;
     }
 
-    public String segString(Directory dir) {
+    synchronized void checkAborted(Directory dir) throws MergeAbortedException {
+      if (aborted)
+        throw new MergeAbortedException("merge is aborted: " + segString(dir));
+    }
+
+    String segString(Directory dir) {
       StringBuffer b = new StringBuffer();
       final int numSegments = segments.size();
       for(int i=0;i<numSegments;i++) {
@@ -134,6 +139,20 @@ public abstract class MergePolicy {
       return b.toString();
     }
   }
 
+  /** Thrown when a merge was explicitly aborted because
+   *  {@link IndexWriter#close(boolean)} was called with
+   *  <code>false</code>.  Normally this exception is
+   *  privately caught and suppressed by {@link
+   *  IndexWriter}. */
+  public static class MergeAbortedException extends IOException {
+    public MergeAbortedException() {
+      super("merge is aborted");
+    }
+    public MergeAbortedException(String message) {
+      super(message);
+    }
+  }
+
   /**
    * A MergeSpecification instance provides the information
diff --git a/src/java/org/apache/lucene/index/SegmentMerger.java b/src/java/org/apache/lucene/index/SegmentMerger.java
--- a/src/java/org/apache/lucene/index/SegmentMerger.java
+++ b/src/java/org/apache/lucene/index/SegmentMerger.java
@@ -412,8 +429,11 @@ final class SegmentMerger {
         match[matchSize++] = (SegmentMergeInfo) queue.pop();
         top = (SegmentMergeInfo) queue.top();
       }
 
-      mergeTermInfo(match, matchSize);          // add new TermInfo
+      final int df = mergeTermInfo(match, matchSize);     // add new TermInfo
+
+      if (checkAbort != null)
+        checkAbort.work(df/3.0);
 
       while (matchSize > 0) {
         SegmentMergeInfo smi = match[--matchSize];
@@ -428,7 +448,7 @@ final class SegmentMerger {
    * @throws CorruptIndexException if the index is corrupt
    * @throws IOException if there is a low-level IO error
    */
-  private final void mergeTermInfo(SegmentMergeInfo[] smis, int n)
+  private final int mergeTermInfo(SegmentMergeInfo[] smis, int n)
           throws CorruptIndexException, IOException {
     long freqPointer = freqOutput.getFilePointer();
     long proxPointer = proxOutput.getFilePointer();
@@ -442,6 +462,8 @@ final class SegmentMerger {
       termInfo.set(df, freqPointer, proxPointer, (int) (skipPointer - freqPointer));
       termInfosWriter.add(smis[0].term, termInfo);
     }
+
+    return df;
   }
 
   private byte[] payloadBuffer = null;
@@ -562,6 +584,8 @@ final class SegmentMerger {
             }
           }
         }
+        if (checkAbort != null)
+          checkAbort.work(maxDoc);
       }
     }
   }
@@ -572,4 +596,29 @@ final class SegmentMerger {
       }
     }
   }
 
+  final static class CheckAbort {
+    private double workCount;
+    private MergePolicy.OneMerge merge;
+    private Directory dir;
+    public CheckAbort(MergePolicy.OneMerge merge, Directory dir) {
+      this.merge = merge;
+      this.dir = dir;
+    }
+
+    /**
+     * Records the fact that roughly units amount of work
+     * have been done since this method was last called.
+     * When adding time-consuming code into SegmentMerger,
+     * you should test different values for units to ensure
+     * that the time in between calls to merge.checkAborted
+     * is up to ~ 1 second.
+     */
+    public void work(double units) throws MergePolicy.MergeAbortedException {
+      workCount += units;
+      if (workCount >= 10000.0) {
+        merge.checkAborted(dir);
+        workCount = 0;
+      }
+    }
+  }
 }
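
The patch threads a CheckAbort from the merge into SegmentMerger (the new SegmentMerger(this, mergedName, merge) call above) and on into CompoundFileWriter's new three-argument constructor. With that plumbing in place, the work-unit bookkeeping is worth making concrete: CheckAbort.work() only calls merge.checkAborted() after 10000 units accumulate, so work(80) per copied buffer means one abort check every 125 buffers. Assuming a 16 KB copy buffer (an assumption; the buffer size is not shown in this patch), that is the "roughly every 2 MB" from the copyFile comment:

    // Back-of-the-envelope for the CheckAbort cadence.
    public class CheckAbortMath {
      public static void main(String[] args) {
        double unitsPerChunk = 80.0;      // CompoundFileWriter passes work(80)
        double threshold = 10000.0;       // CheckAbort polls every 10000 units
        int chunkBytes = 16 * 1024;       // assumed copy-buffer size

        double chunksPerCheck = threshold / unitsPerChunk;    // 125 buffers
        double bytesPerCheck = chunksPerCheck * chunkBytes;   // ~2 MB
        System.out.println("checkAborted() roughly every "
                           + (bytesPerCheck / (1024 * 1024)) + " MB");
      }
    }
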
diff --git a/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java b/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java
index 2661673c857..22a6c5d4369 100644
--- a/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java
+++ b/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java
@@ -219,12 +219,6 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
       writer.close();
     }
 
-    try {
-      directory.close();
-    } catch (RuntimeException re) {
-      // MockRAMDirectory will throw RuntimeExceptions when there
-      // are still open files, which is OK since some merge
-      // threads may still be running at this point.
-    }
+    directory.close();
   }
 }
diff --git a/src/test/org/apache/lucene/index/TestIndexWriter.java b/src/test/org/apache/lucene/index/TestIndexWriter.java
index 3efcc1bcc2e..137a508215c 100644
--- a/src/test/org/apache/lucene/index/TestIndexWriter.java
+++ b/src/test/org/apache/lucene/index/TestIndexWriter.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.io.Reader;
 import java.io.File;
 import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Random;
 
 import org.apache.lucene.util.LuceneTestCase;
@@ -1981,4 +1982,100 @@ public class TestIndexWriter extends LuceneTestCase
       }
     }
   }
+
+  public void testNoWaitClose() throws Throwable {
+    RAMDirectory directory = new MockRAMDirectory();
+
+    final Document doc = new Document();
+    Field idField = new Field("id", "", Field.Store.YES, Field.Index.UN_TOKENIZED);
+    doc.add(idField);
+
+    for(int pass=0;pass<3;pass++) {
+      boolean autoCommit = pass%2 == 0;
+      IndexWriter writer = new IndexWriter(directory, autoCommit, new WhitespaceAnalyzer(), true);
+
+      //System.out.println("TEST: pass=" + pass + " ac=" + autoCommit + " cms=" + (pass >= 2));
+      for(int iter=0;iter<10;iter++) {
+        //System.out.println("TEST: iter=" + iter);
+        MergeScheduler ms;
+        if (pass >= 2)
+          ms = new ConcurrentMergeScheduler();
+        else
+          ms = new SerialMergeScheduler();
+
+        writer.setMergeScheduler(ms);
+        writer.setMaxBufferedDocs(2);
+        writer.setMergeFactor(100);
+
+        for(int j=0;j<199;j++) {
+          idField.setValue(Integer.toString(iter*201+j));
+          writer.addDocument(doc);
+        }
+
+        int delID = iter*199;
+        for(int j=0;j<20;j++) {
+          writer.deleteDocuments(new Term("id", Integer.toString(delID)));
+          delID += 5;
+        }
+
+        // Force a bunch of merge threads to kick off so we
+        // stress out aborting them on close:
+        writer.setMergeFactor(2);
+
+        final IndexWriter finalWriter = writer;
+        final ArrayList failure = new ArrayList();
+        Thread t1 = new Thread() {
+            public void run() {
+              boolean done = false;
+              while(!done) {
+                for(int i=0;i<100;i++) {
+                  try {
+                    finalWriter.addDocument(doc);
+                  } catch (AlreadyClosedException e) {
+                    done = true;
+                    break;
+                  } catch (NullPointerException e) {
+                    done = true;
+                    break;
+                  } catch (Throwable e) {
+                    e.printStackTrace(System.out);
+                    failure.add(e);
+                    done = true;
+                    break;
+                  }
+                }
+                Thread.yield();
+              }
+            }
+          };
+
+        t1.start();
+
+        writer.close(false);
+        while(true) {
+          try {
+            t1.join();
+            break;
+          } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+          }
+        }
+
+        if (failure.size() > 0)
+          throw (Throwable) failure.get(0);
+
+        // Make sure reader can read
+        IndexReader reader = IndexReader.open(directory);
+        reader.close();
+
+        // Reopen
+        writer = new IndexWriter(directory, autoCommit, new WhitespaceAnalyzer(), false);
+      }
+      writer.close();
+    }
+
+    directory.close();
+  }
 }
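
testNoWaitClose drives the same abort path that ConcurrentMergeScheduler now filters explicitly: an aborted merge surfaces as MergePolicy.MergeAbortedException, which is recorded on the merge but not treated as a real failure. That filtering idiom as a standalone sketch; AbortedException here is a hypothetical stand-in for MergePolicy.MergeAbortedException:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    // Sketch of "record, but only report real failures": every IOException
    // is kept on the task for inspection, but abort-driven ones are not
    // added to the list of failures the caller will re-throw.
    public class AbortFilter {
      // Hypothetical stand-in for MergePolicy.MergeAbortedException:
      static class AbortedException extends IOException {
        AbortedException(String msg) { super(msg); }
      }

      private final List exceptions = new ArrayList();   // raw types, as in this era

      synchronized void handle(IOException exc) {
        // Ignore the exception if it was due to abort:
        if (!(exc instanceof AbortedException))
          exceptions.add(exc);
      }

      synchronized List realFailures() {
        return new ArrayList(exceptions);
      }
    }
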