LUCENE-1097: change IndexWriter.close(false) to ask merge threads to abort, and wait for them to finish

git-svn-id: https://svn.apache.org/repos/asf/lucene/java/trunk@606441 13f79535-47bb-0310-9956-ffa450edef68
Michael McCandless 2007-12-22 10:06:28 +00:00
parent 6e20e41418
commit 89fe185d71
8 changed files with 297 additions and 82 deletions
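From the caller's perspective, the headline change is the behavior of close(false). A minimal sketch of the new contract (not part of the patch; assumes the 2.3-era IndexWriter API):

import org.apache.lucene.analysis.WhitespaceAnalyzer;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.RAMDirectory;

public class CloseNoWaitSketch {
  public static void main(String[] args) throws Exception {
    RAMDirectory dir = new RAMDirectory();
    IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true);
    // ... add enough documents that the merge scheduler starts
    // background merges ...

    // Previously close(false) returned while merge threads could still
    // be running; after this change it asks each running merge to abort
    // and blocks until those threads have actually finished (normally a
    // few seconds at most).
    writer.close(false);
  }
}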


@@ -68,18 +68,22 @@ final class CompoundFileWriter {
private HashSet ids;
private LinkedList entries;
private boolean merged = false;
private SegmentMerger.CheckAbort checkAbort;
/** Create the compound stream in the specified file. The file name is the
* entire name (no extensions are added).
* @throws NullPointerException if <code>dir</code> or <code>name</code> is null
*/
public CompoundFileWriter(Directory dir, String name) {
this(dir, name, null);
}
CompoundFileWriter(Directory dir, String name, SegmentMerger.CheckAbort checkAbort) {
if (dir == null)
throw new NullPointerException("directory cannot be null");
if (name == null)
throw new NullPointerException("name cannot be null");
this.checkAbort = checkAbort;
directory = dir;
fileName = name;
ids = new HashSet();
@@ -211,6 +215,10 @@ final class CompoundFileWriter {
is.readBytes(buffer, 0, len);
os.writeBytes(buffer, len);
remainder -= len;
if (checkAbort != null)
// Roughly every 2 MB we will check if
// it's time to abort
checkAbort.work(80);
}
// Verify that remainder is 0
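For scale: CheckAbort (added to SegmentMerger below) only consults the merge once 10,000 work units have accumulated, so charging 80 units per copied chunk means an abort check about every 125 chunks. The "roughly every 2 MB" comment then implies a copy buffer of about 16 KB; the buffer size is outside this hunk, so that figure is an inference:

// 10000-unit threshold / 80 units per chunk = 125 chunks per check
// 125 chunks * ~16 KB buffer (inferred) ~= 2 MB between abort checks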


@@ -251,18 +251,15 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
message(" merge thread: done");
} catch (Throwable exc) {
// When a merge was aborted & IndexWriter closed,
// it's possible to get various IOExceptions,
// NullPointerExceptions, AlreadyClosedExceptions:
} catch (IOException exc) {
if (merge != null) {
merge.setException(exc);
writer.addMergeException(merge);
}
if (merge == null || !merge.isAborted()) {
// If the merge was not aborted then the exception
// is real
// Ignore the exception if it was due to abort:
if (!(exc instanceof MergePolicy.MergeAbortedException)) {
synchronized(ConcurrentMergeScheduler.this) {
exceptions.add(exc);
}
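Condensed, the merge thread's new error handling reads: record every exception on the merge, but count it as a real failure only when it is not the abort signal. A sketch of the resulting shape (doMerge stands in for the thread's actual merge call; the surrounding thread plumbing is assumed):

try {
  doMerge(merge);
} catch (Throwable exc) {
  // A merge aborted while IndexWriter closes can surface as
  // IOException, NullPointerException or AlreadyClosedException.
  if (merge != null) {
    merge.setException(exc);
    writer.addMergeException(merge);
  }
  // Only a non-abort exception counts as a real failure:
  if (!(exc instanceof MergePolicy.MergeAbortedException)) {
    synchronized (ConcurrentMergeScheduler.this) {
      exceptions.add(exc);
    }
  }
}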


@@ -27,6 +27,7 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.RAMOutputStream;
import org.apache.lucene.store.AlreadyClosedException;
import java.io.IOException;
import java.io.PrintStream;
@@ -148,6 +149,8 @@ final class DocumentsWriter {
// non-zero we will flush by RAM usage instead.
private int maxBufferedDocs = IndexWriter.DEFAULT_MAX_BUFFERED_DOCS;
private boolean closed;
// Coarse estimates used to measure RAM usage of buffered deletes
private static int OBJECT_HEADER_BYTES = 12;
private static int OBJECT_POINTER_BYTES = 4; // TODO: should be 8 on 64-bit platform
@@ -2168,6 +2171,10 @@ final class DocumentsWriter {
}
}
synchronized void close() {
closed = true;
}
/** Returns a free (idle) ThreadState that may be used for
* indexing this one document. This call also pauses if a
* flush is pending. If delTerm is non-null then we
@@ -2211,6 +2218,9 @@ final class DocumentsWriter {
Thread.currentThread().interrupt();
}
if (closed)
throw new AlreadyClosedException("this IndexWriter is closed");
if (segment == null)
segment = writer.newSegmentName();
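The guard added here is a plain closed flag checked after any wait: close() sets it under the monitor, and a thread that was parked waiting for a ThreadState re-checks the flag on wakeup rather than indexing into a closed writer. A self-contained sketch of the same pattern (class and names are illustrative, not from the patch):

final class ClosableResource {
  private boolean closed;
  private boolean busy;

  synchronized void close() {
    closed = true;
    notifyAll(); // wake waiters so they observe the flag
  }

  synchronized void acquire() throws InterruptedException {
    while (busy && !closed)
      wait();
    if (closed)
      throw new IllegalStateException("already closed");
    busy = true;
  }
}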


@@ -291,6 +291,7 @@ public class IndexWriter {
private Set runningMerges = new HashSet();
private List mergeExceptions = new ArrayList();
private long mergeGen;
private boolean stopMerges;
/**
* Used internally to throw an {@link
@@ -1150,8 +1151,10 @@ public class IndexWriter {
* using a MergeScheduler that runs merges in background
* threads.
* @param waitForMerges if true, this call will block
* until all merges complete; else, it will abort all
* running merges and return right away
* until all merges complete; else, it will ask all
* running merges to abort, wait until those merges have
* finished (which should be at most a few seconds), and
* then return.
*/
public void close(boolean waitForMerges) throws CorruptIndexException, IOException {
boolean doClose;
@@ -1186,6 +1189,8 @@ public class IndexWriter {
if (infoStream != null)
message("now flush at close");
docWriter.close();
// Only allow a new merge to be triggered if we are
// going to wait for merges:
flush(waitForMerges, true);
@@ -1196,6 +1201,7 @@ public class IndexWriter {
mergeScheduler.close();
synchronized(this) {
if (commitPending) {
boolean success = false;
try {
@@ -1210,9 +1216,9 @@ public class IndexWriter {
}
if (infoStream != null)
message("close: wrote segments file \"" + segmentInfos.getCurrentSegmentFileName() + "\"");
synchronized(this) {
deleter.checkpoint(segmentInfos, true);
}
commitPending = false;
rollbackSegmentInfos = null;
}
@@ -1222,7 +1228,6 @@ public class IndexWriter {
docWriter = null;
synchronized(this) {
deleter.close();
}
@@ -1440,12 +1445,14 @@ public class IndexWriter {
synchronized (this) {
// If docWriter has some aborted files that were
// never incref'd, then we clean them up here
if (docWriter != null) {
final List files = docWriter.abortedFiles();
if (files != null)
deleter.deleteNewFiles(files);
}
}
}
}
if ((status & 1) != 0)
flush(true, false);
checkMaxTermLength(status);
@@ -1799,6 +1806,9 @@ public class IndexWriter {
throws CorruptIndexException, IOException {
assert !optimize || maxNumSegmentsOptimize > 0;
if (stopMerges)
return;
final MergePolicy.MergeSpecification spec;
if (optimize) {
spec = mergePolicy.findMergesForOptimize(segmentInfos, this, maxNumSegmentsOptimize, segmentsToOptimize);
@@ -1861,6 +1871,7 @@ public class IndexWriter {
localRollbackSegmentInfos = (SegmentInfos) segmentInfos.clone();
localAutoCommit = autoCommit;
if (localAutoCommit) {
if (infoStream != null)
@@ -1905,6 +1916,7 @@ public class IndexWriter {
deleter.refresh();
finishMerges(false);
stopMerges = false;
}
/*
@@ -1995,7 +2007,6 @@ public class IndexWriter {
// them:
deleter.checkpoint(segmentInfos, false);
deleter.refresh();
finishMerges(false);
}
commitPending = false;
@@ -2004,8 +2015,11 @@ public class IndexWriter {
waitForClose();
}
private synchronized void finishMerges(boolean waitForMerges) {
private synchronized void finishMerges(boolean waitForMerges) throws IOException {
if (!waitForMerges) {
stopMerges = true;
// Abort all pending & running merges:
Iterator it = pendingMerges.iterator();
while(it.hasNext()) {
@@ -2013,6 +2027,7 @@ public class IndexWriter {
if (infoStream != null)
message("now abort pending merge " + merge.segString(directory));
merge.abort();
mergeFinish(merge);
}
pendingMerges.clear();
@@ -2023,10 +2038,27 @@ public class IndexWriter {
message("now abort running merge " + merge.segString(directory));
merge.abort();
}
runningMerges.clear();
mergingSegments.clear();
notifyAll();
// These merges periodically check whether they have
// been aborted, and stop if so.  We wait here to make
// sure they all stop; it should not take long because
// each merge checks for abort frequently.
while(runningMerges.size() > 0) {
if (infoStream != null)
message("now wait for " + runningMerges.size() + " running merge to abort");
try {
wait();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
assert 0 == mergingSegments.size();
if (infoStream != null)
message("all running merges have aborted");
} else {
while(pendingMerges.size() > 0 || runningMerges.size() > 0) {
try {
@@ -2263,7 +2295,7 @@ public class IndexWriter {
optimize(); // start with zero or 1 seg
final String mergedName = newSegmentName();
SegmentMerger merger = new SegmentMerger(this, mergedName);
SegmentMerger merger = new SegmentMerger(this, mergedName, null);
SegmentInfo info;
@@ -2688,6 +2720,8 @@ public class IndexWriter {
} else
// No deletes before or after
docUpto += currentInfo.docCount;
merge.checkAborted(directory);
}
if (deletes != null) {
@@ -2783,6 +2817,7 @@ public class IndexWriter {
try {
try {
if (merge.info == null)
mergeInit(merge);
@@ -2790,8 +2825,18 @@ public class IndexWriter {
message("now merge\n merge=" + merge.segString(directory) + "\n index=" + segString());
mergeMiddle(merge);
success = true;
} catch (MergePolicy.MergeAbortedException e) {
merge.setException(e);
addMergeException(merge);
// We can ignore this exception, unless the merge
// involves segments from external directories, in
// which case we must throw it so, for example, the
// rollbackTransaction code in addIndexes* is
// executed.
if (merge.isExternal)
throw e;
}
} finally {
synchronized(this) {
try {
@@ -2863,11 +2908,11 @@ public class IndexWriter {
* the synchronized lock on IndexWriter instance. */
final synchronized void mergeInit(MergePolicy.OneMerge merge) throws IOException {
if (merge.isAborted())
throw new IOException("merge is aborted");
assert merge.registerDone;
if (merge.isAborted())
return;
final SegmentInfos sourceSegments = merge.segments;
final int end = sourceSegments.size();
@@ -3011,6 +3056,8 @@ public class IndexWriter {
final private int mergeMiddle(MergePolicy.OneMerge merge)
throws CorruptIndexException, IOException {
merge.checkAborted(directory);
final String mergedName = merge.info.name;
SegmentMerger merger = null;
@@ -3024,7 +3071,7 @@ public class IndexWriter {
if (infoStream != null)
message("merging " + merge.segString(directory));
merger = new SegmentMerger(this, mergedName);
merger = new SegmentMerger(this, mergedName, merge);
// This is try/finally to make sure merger's readers are
// closed:
@@ -3044,8 +3091,7 @@ public class IndexWriter {
message("merge: total "+totDocCount+" docs");
}
if (merge.isAborted())
throw new IOException("merge is aborted");
merge.checkAborted(directory);
mergedDocCount = merge.info.docCount = merger.merge(merge.mergeDocStores);
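The shutdown handshake in finishMerges(false) is standard wait/notifyAll: the closing thread aborts every pending and running merge, then waits on the IndexWriter monitor; each merge thread, as it unwinds through mergeFinish, unregisters its merge and (assumed here, since that side is not shown in this diff) calls notifyAll() so the waiter re-checks runningMerges. Pulling the fragments above together, in the raw-types style of this codebase:

private synchronized void finishMerges(boolean waitForMerges) throws IOException {
  if (!waitForMerges) {
    stopMerges = true;  // blocks registration of new merges
    Iterator it = pendingMerges.iterator();
    while (it.hasNext()) {
      MergePolicy.OneMerge merge = (MergePolicy.OneMerge) it.next();
      merge.abort();
      mergeFinish(merge);
    }
    pendingMerges.clear();
    it = runningMerges.iterator();
    while (it.hasNext())
      ((MergePolicy.OneMerge) it.next()).abort();
    // Running merges poll checkAborted() and exit; wait until all
    // have unregistered themselves:
    while (runningMerges.size() > 0) {
      try {
        wait();
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
      }
    }
  }
}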


@@ -86,29 +86,34 @@ public abstract class MergePolicy {
/** Record that an exception occurred while executing
* this merge */
public synchronized void setException(Throwable error) {
synchronized void setException(Throwable error) {
this.error = error;
}
/** Retrieve previous exception set by {@link
* #setException}. */
public synchronized Throwable getException() {
synchronized Throwable getException() {
return error;
}
/** Mark this merge as aborted. If this is called
* before the merge is committed then the merge will
* not be committed. */
public synchronized void abort() {
synchronized void abort() {
aborted = true;
}
/** Returns true if this merge was aborted. */
public synchronized boolean isAborted() {
synchronized boolean isAborted() {
return aborted;
}
public String segString(Directory dir) {
synchronized void checkAborted(Directory dir) throws MergeAbortedException {
if (aborted)
throw new MergeAbortedException("merge is aborted: " + segString(dir));
}
String segString(Directory dir) {
StringBuffer b = new StringBuffer();
final int numSegments = segments.size();
for(int i=0;i<numSegments;i++) {
@@ -162,6 +167,15 @@ public abstract class MergePolicy {
}
}
public static class MergeAbortedException extends IOException {
public MergeAbortedException() {
super("merge is aborted");
}
public MergeAbortedException(String message) {
super(message);
}
}
/**
* Determine what set of merge operations are now
* necessary on the index. The IndexWriter calls this
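checkAborted() and MergeAbortedException give long-running merge code a single idiom for cooperative cancellation: do a bounded chunk of work, then poll. Since both members are package-private, the idiom lives inside org.apache.lucene.index; a sketch with a hypothetical workload:

import java.io.IOException;
import org.apache.lucene.store.Directory;

class ChunkedWork {
  private static final int NUM_CHUNKS = 128; // hypothetical workload size

  void run(MergePolicy.OneMerge merge, Directory dir) throws IOException {
    for (int i = 0; i < NUM_CHUNKS; i++) {
      copyOneChunk(i);
      // Throws MergePolicy.MergeAbortedException (an IOException) once
      // abort() has been called, unwinding the merge promptly:
      merge.checkAborted(dir);
    }
  }

  private void copyOneChunk(int i) { /* hypothetical per-chunk work */ }
}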


@@ -53,6 +53,8 @@ final class SegmentMerger {
private int mergedDocs;
private CheckAbort checkAbort;
// Whether we should merge doc stores (stored fields and
// vectors files). When all segments we are merging
// already share the same doc store files, we don't need
@@ -61,7 +63,7 @@ final class SegmentMerger {
/** Maximum number of contiguous documents to bulk-copy
when merging stored fields */
private final static int MAX_RAW_MERGE_DOCS = 16384;
private final static int MAX_RAW_MERGE_DOCS = 4192;
/** This ctor used only by test code.
*
@@ -73,9 +75,11 @@ final class SegmentMerger {
segment = name;
}
SegmentMerger(IndexWriter writer, String name) {
SegmentMerger(IndexWriter writer, String name, MergePolicy.OneMerge merge) {
directory = writer.getDirectory();
segment = name;
if (merge != null)
checkAbort = new CheckAbort(merge, directory);
termIndexInterval = writer.getTermIndexInterval();
}
@@ -119,6 +123,13 @@ final class SegmentMerger {
this.mergeDocStores = mergeDocStores;
// NOTE: it's important to add calls to
// checkAbort.work(...) if you make any changes to this
// method that will spend a lot of time.  The frequency
// of this check impacts how long
// IndexWriter.close(false) takes to actually stop the
// threads.
mergedDocs = mergeFields();
mergeTerms();
mergeNorms();
@@ -144,7 +155,7 @@ final class SegmentMerger {
final Vector createCompoundFile(String fileName)
throws IOException {
CompoundFileWriter cfsWriter =
new CompoundFileWriter(directory, fileName);
new CompoundFileWriter(directory, fileName, checkAbort);
Vector files =
new Vector(IndexFileNames.COMPOUND_EXTENSIONS.length + 1);
@@ -265,9 +276,6 @@ final class SegmentMerger {
// Used for bulk-reading raw bytes for stored fields
final int[] rawDocLengths = new int[MAX_RAW_MERGE_DOCS];
// merge field values
final FieldsWriter fieldsWriter = new FieldsWriter(directory, segment, fieldInfos);
// for merging we don't want to compress/uncompress the data, so to tell the FieldsReader that we're
// in merge mode, we use this FieldSelector
FieldSelector fieldSelectorMerge = new FieldSelector() {
@@ -276,6 +284,9 @@ final class SegmentMerger {
}
};
// merge field values
final FieldsWriter fieldsWriter = new FieldsWriter(directory, segment, fieldInfos);
try {
for (int i = 0; i < readers.size(); i++) {
final IndexReader reader = (IndexReader) readers.elementAt(i);
@@ -302,10 +313,14 @@ final class SegmentMerger {
IndexInput stream = matchingFieldsReader.rawDocs(rawDocLengths, start, numDocs);
fieldsWriter.addRawDocuments(stream, rawDocLengths, numDocs);
docCount += numDocs;
if (checkAbort != null)
checkAbort.work(300*numDocs);
} else {
fieldsWriter.addDocument(reader.document(j, fieldSelectorMerge));
j++;
docCount++;
if (checkAbort != null)
checkAbort.work(300);
}
} else
j++;
@@ -342,6 +357,8 @@ final class SegmentMerger {
if (reader.isDeleted(docNum))
continue;
termVectorsWriter.addAllDocVectors(reader.getTermFreqVectors(docNum));
if (checkAbort != null)
checkAbort.work(300);
}
}
} finally {
@@ -405,7 +422,10 @@ final class SegmentMerger {
top = (SegmentMergeInfo) queue.top();
}
mergeTermInfo(match, matchSize); // add new TermInfo
final int df = mergeTermInfo(match, matchSize); // add new TermInfo
if (checkAbort != null)
checkAbort.work(df/3.0);
while (matchSize > 0) {
SegmentMergeInfo smi = match[--matchSize];
@@ -428,7 +448,7 @@ final class SegmentMerger {
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
*/
private final void mergeTermInfo(SegmentMergeInfo[] smis, int n)
private final int mergeTermInfo(SegmentMergeInfo[] smis, int n)
throws CorruptIndexException, IOException {
long freqPointer = freqOutput.getFilePointer();
long proxPointer = proxOutput.getFilePointer();
@@ -442,6 +462,8 @@ final class SegmentMerger {
termInfo.set(df, freqPointer, proxPointer, (int) (skipPointer - freqPointer));
termInfosWriter.add(smis[0].term, termInfo);
}
return df;
}
private byte[] payloadBuffer = null;
@@ -562,6 +584,8 @@ final class SegmentMerger {
}
}
}
if (checkAbort != null)
checkAbort.work(maxDoc);
}
}
}
@@ -572,4 +596,29 @@ final class SegmentMerger {
}
}
final static class CheckAbort {
private double workCount;
private MergePolicy.OneMerge merge;
private Directory dir;
public CheckAbort(MergePolicy.OneMerge merge, Directory dir) {
this.merge = merge;
this.dir = dir;
}
/**
* Records that roughly <code>units</code> units of work
* have been done since this method was last called.
* When adding time-consuming code to SegmentMerger,
* test different values for <code>units</code> to
* ensure that the time between calls to
* merge.checkAborted() is at most ~1 second.
*/
public void work(double units) throws MergePolicy.MergeAbortedException {
workCount += units;
if (workCount >= 10000.0) {
merge.checkAborted(dir);
workCount = 0;
}
}
}
}
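Worked numbers for the accounting above: with the 10,000-unit threshold in work(), the stored-fields and term-vectors loops (300 units per document) poll the merge about every 33 documents, the norms loop (maxDoc units per segment) about every 10,000 documents' worth of norms, and the term merge (df/3 units per term) roughly every 30,000 postings. A usage sketch inside the index package, where merge, directory, and the per-document copy are assumed:

CheckAbort checkAbort = new CheckAbort(merge, directory);
for (int doc = 0; doc < numDocs; doc++) {
  copyStoredFields(doc);  // hypothetical per-document work
  checkAbort.work(300);   // calls merge.checkAborted(directory)
}                         // about once every 33 documents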


@@ -219,12 +219,6 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
writer.close();
}
try {
directory.close();
} catch (RuntimeException re) {
// MockRAMDirectory will throw RuntimeExceptions when there
// are still open files, which is OK since some merge
// threads may still be running at this point.
}
}
}


@@ -21,6 +21,7 @@ import java.io.IOException;
import java.io.Reader;
import java.io.File;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Random;
import org.apache.lucene.util.LuceneTestCase;
@@ -1981,4 +1982,100 @@ public class TestIndexWriter extends LuceneTestCase
}
}
}
public void testNoWaitClose() throws Throwable {
RAMDirectory directory = new MockRAMDirectory();
final Document doc = new Document();
Field idField = new Field("id", "", Field.Store.YES, Field.Index.UN_TOKENIZED);
doc.add(idField);
for(int pass=0;pass<3;pass++) {
boolean autoCommit = pass%2 == 0;
IndexWriter writer = new IndexWriter(directory, autoCommit, new WhitespaceAnalyzer(), true);
//System.out.println("TEST: pass=" + pass + " ac=" + autoCommit + " cms=" + (pass >= 2));
for(int iter=0;iter<10;iter++) {
//System.out.println("TEST: iter=" + iter);
MergeScheduler ms;
if (pass >= 2)
ms = new ConcurrentMergeScheduler();
else
ms = new SerialMergeScheduler();
writer.setMergeScheduler(ms);
writer.setMaxBufferedDocs(2);
writer.setMergeFactor(100);
for(int j=0;j<199;j++) {
idField.setValue(Integer.toString(iter*201+j));
writer.addDocument(doc);
}
int delID = iter*199;
for(int j=0;j<20;j++) {
writer.deleteDocuments(new Term("id", Integer.toString(delID)));
delID += 5;
}
// Force a bunch of merge threads to kick off so we
// stress out aborting them on close:
writer.setMergeFactor(2);
final IndexWriter finalWriter = writer;
final ArrayList failure = new ArrayList();
Thread t1 = new Thread() {
public void run() {
boolean done = false;
while(!done) {
for(int i=0;i<100;i++) {
try {
finalWriter.addDocument(doc);
} catch (AlreadyClosedException e) {
done = true;
break;
} catch (NullPointerException e) {
done = true;
break;
} catch (Throwable e) {
e.printStackTrace(System.out);
failure.add(e);
done = true;
break;
}
}
Thread.yield();
}
}
};
t1.start();
writer.close(false);
while(true) {
try {
t1.join();
break;
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
if (failure.size() > 0)
throw (Throwable) failure.get(0);
// Make sure reader can read
IndexReader reader = IndexReader.open(directory);
reader.close();
// Reopen
writer = new IndexWriter(directory, autoCommit, new WhitespaceAnalyzer(), false);
}
writer.close();
}
directory.close();
}
}