Fix waitForMerges to allow merge schedule to run one last time

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1618667 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Ryan Ernst 2014-08-18 17:16:04 +00:00
parent 235a886169
commit 52b2a30bcc
2 changed files with 62 additions and 67 deletions

View File

@ -908,7 +908,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
infoStream.message("IW", "now flush at close"); infoStream.message("IW", "now flush at close");
} }
flush(true, true); flush(true, true);
finishMerges(true); waitForMerges();
commitInternal(config.getMergePolicy()); commitInternal(config.getMergePolicy());
rollbackInternal(); // ie close, since we just committed rollbackInternal(); // ie close, since we just committed
success = true; success = true;
@ -1999,7 +1999,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
try { try {
synchronized(this) { synchronized(this) {
finishMerges(false); abortMerges();
stopMerges = true; stopMerges = true;
} }
@ -2136,7 +2136,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
synchronized (this) { synchronized (this) {
try { try {
// Abort any running merges // Abort any running merges
finishMerges(false); abortMerges();
// Remove all segments // Remove all segments
segmentInfos.clear(); segmentInfos.clear();
// Ask deleter to locate unreferenced files & remove them: // Ask deleter to locate unreferenced files & remove them:
@ -2173,60 +2173,45 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
/** Aborts running merges. Be careful when using this /** Aborts running merges. Be careful when using this
* method: when you abort a long-running merge, you lose * method: when you abort a long-running merge, you lose
* a lot of work that must later be redone. */ * a lot of work that must later be redone. */
public void abortMerges() { public synchronized void abortMerges() {
finishMerges(false); stopMerges = true;
}
private synchronized void finishMerges(boolean waitForMerges) {
if (!waitForMerges) {
stopMerges = true;
// Abort all pending & running merges:
for (final MergePolicy.OneMerge merge : pendingMerges) {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "now abort pending merge " + segString(merge.segments));
}
merge.abort();
mergeFinish(merge);
}
pendingMerges.clear();
for (final MergePolicy.OneMerge merge : runningMerges) {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "now abort running merge " + segString(merge.segments));
}
merge.abort();
}
// These merges periodically check whether they have
// been aborted, and stop if so. We wait here to make
// sure they all stop. It should not take very long
// because the merge threads periodically check if
// they are aborted.
while(runningMerges.size() > 0) {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "now wait for " + runningMerges.size() + " running merge/s to abort");
}
doWait();
}
stopMerges = false;
notifyAll();
assert 0 == mergingSegments.size();
// Abort all pending & running merges:
for (final MergePolicy.OneMerge merge : pendingMerges) {
if (infoStream.isEnabled("IW")) { if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "all running merges have aborted"); infoStream.message("IW", "now abort pending merge " + segString(merge.segments));
} }
merge.abort();
mergeFinish(merge);
}
pendingMerges.clear();
} else { for (final MergePolicy.OneMerge merge : runningMerges) {
// waitForMerges() will ensure any running addIndexes finishes. if (infoStream.isEnabled("IW")) {
// It's fine if a new one attempts to start because from our infoStream.message("IW", "now abort running merge " + segString(merge.segments));
// caller above the call will see that we are in the }
// process of closing, and will throw an merge.abort();
// AlreadyClosedException. }
waitForMerges();
// These merges periodically check whether they have
// been aborted, and stop if so. We wait here to make
// sure they all stop. It should not take very long
// because the merge threads periodically check if
// they are aborted.
while(runningMerges.size() > 0) {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "now wait for " + runningMerges.size() + " running merge/s to abort");
}
doWait();
}
stopMerges = false;
notifyAll();
assert 0 == mergingSegments.size();
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "all running merges have aborted");
} }
} }
@ -2236,20 +2221,30 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* <p>It is guaranteed that any merges started prior to calling this method * <p>It is guaranteed that any merges started prior to calling this method
* will have completed once this method completes.</p> * will have completed once this method completes.</p>
*/ */
public synchronized void waitForMerges() { public void waitForMerges() throws IOException {
ensureOpen(false);
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "waitForMerges");
}
while(pendingMerges.size() > 0 || runningMerges.size() > 0) {
doWait();
}
// sanity check // Give merge scheduler last chance to run, in case
assert 0 == mergingSegments.size(); // any pending merges are waiting. We can't hold IW's lock
// when going into merge because it can lead to deadlock.
mergeScheduler.merge(this, MergeTrigger.CLOSING, false);
if (infoStream.isEnabled("IW")) { synchronized (this) {
infoStream.message("IW", "waitForMerges done"); ensureOpen(false);
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "waitForMerges");
}
while (pendingMerges.size() > 0 || runningMerges.size() > 0) {
doWait();
}
// sanity check
assert 0 == mergingSegments.size();
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "waitForMerges done");
}
} }
} }
@ -3835,7 +3830,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* the synchronized lock on IndexWriter instance. */ * the synchronized lock on IndexWriter instance. */
final synchronized void mergeFinish(MergePolicy.OneMerge merge) { final synchronized void mergeFinish(MergePolicy.OneMerge merge) {
// forceMerge, addIndexes or finishMerges may be waiting // forceMerge, addIndexes or waitForMerges may be waiting
// on merges to finish. // on merges to finish.
notifyAll(); notifyAll();

View File

@ -233,7 +233,7 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
writer.addDocument(doc); writer.addDocument(doc);
} }
private void checkInvariants(IndexWriter writer) { private void checkInvariants(IndexWriter writer) throws IOException {
writer.waitForMerges(); writer.waitForMerges();
int maxBufferedDocs = writer.getConfig().getMaxBufferedDocs(); int maxBufferedDocs = writer.getConfig().getMaxBufferedDocs();
int mergeFactor = ((LogMergePolicy) writer.getConfig().getMergePolicy()).getMergeFactor(); int mergeFactor = ((LogMergePolicy) writer.getConfig().getMergePolicy()).getMergeFactor();