From 52b2a30bccb6ef4e50124559bf734d5cf0df7b37 Mon Sep 17 00:00:00 2001 From: Ryan Ernst Date: Mon, 18 Aug 2014 17:16:04 +0000 Subject: [PATCH] Fix waitForMerges to allow merge schedule to run one last time git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1618667 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/lucene/index/IndexWriter.java | 127 +++++++++--------- .../index/TestIndexWriterMergePolicy.java | 2 +- 2 files changed, 62 insertions(+), 67 deletions(-) diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java index e9252e4b163..d454b2e5ac1 100644 --- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java @@ -908,7 +908,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { infoStream.message("IW", "now flush at close"); } flush(true, true); - finishMerges(true); + waitForMerges(); commitInternal(config.getMergePolicy()); rollbackInternal(); // ie close, since we just committed success = true; @@ -1999,7 +1999,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { try { synchronized(this) { - finishMerges(false); + abortMerges(); stopMerges = true; } @@ -2136,7 +2136,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { synchronized (this) { try { // Abort any running merges - finishMerges(false); + abortMerges(); // Remove all segments segmentInfos.clear(); // Ask deleter to locate unreferenced files & remove them: @@ -2173,60 +2173,45 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { /** Aborts running merges. Be careful when using this * method: when you abort a long-running merge, you lose * a lot of work that must later be redone. */ - public void abortMerges() { - finishMerges(false); - } - - private synchronized void finishMerges(boolean waitForMerges) { - if (!waitForMerges) { - - stopMerges = true; - - // Abort all pending & running merges: - for (final MergePolicy.OneMerge merge : pendingMerges) { - if (infoStream.isEnabled("IW")) { - infoStream.message("IW", "now abort pending merge " + segString(merge.segments)); - } - merge.abort(); - mergeFinish(merge); - } - pendingMerges.clear(); - - for (final MergePolicy.OneMerge merge : runningMerges) { - if (infoStream.isEnabled("IW")) { - infoStream.message("IW", "now abort running merge " + segString(merge.segments)); - } - merge.abort(); - } - - // These merges periodically check whether they have - // been aborted, and stop if so. We wait here to make - // sure they all stop. It should not take very long - // because the merge threads periodically check if - // they are aborted. - while(runningMerges.size() > 0) { - if (infoStream.isEnabled("IW")) { - infoStream.message("IW", "now wait for " + runningMerges.size() + " running merge/s to abort"); - } - doWait(); - } - - stopMerges = false; - notifyAll(); - - assert 0 == mergingSegments.size(); + public synchronized void abortMerges() { + stopMerges = true; + // Abort all pending & running merges: + for (final MergePolicy.OneMerge merge : pendingMerges) { if (infoStream.isEnabled("IW")) { - infoStream.message("IW", "all running merges have aborted"); + infoStream.message("IW", "now abort pending merge " + segString(merge.segments)); } + merge.abort(); + mergeFinish(merge); + } + pendingMerges.clear(); - } else { - // waitForMerges() will ensure any running addIndexes finishes. - // It's fine if a new one attempts to start because from our - // caller above the call will see that we are in the - // process of closing, and will throw an - // AlreadyClosedException. - waitForMerges(); + for (final MergePolicy.OneMerge merge : runningMerges) { + if (infoStream.isEnabled("IW")) { + infoStream.message("IW", "now abort running merge " + segString(merge.segments)); + } + merge.abort(); + } + + // These merges periodically check whether they have + // been aborted, and stop if so. We wait here to make + // sure they all stop. It should not take very long + // because the merge threads periodically check if + // they are aborted. + while(runningMerges.size() > 0) { + if (infoStream.isEnabled("IW")) { + infoStream.message("IW", "now wait for " + runningMerges.size() + " running merge/s to abort"); + } + doWait(); + } + + stopMerges = false; + notifyAll(); + + assert 0 == mergingSegments.size(); + + if (infoStream.isEnabled("IW")) { + infoStream.message("IW", "all running merges have aborted"); } } @@ -2236,20 +2221,30 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { *

It is guaranteed that any merges started prior to calling this method * will have completed once this method completes.

*/ - public synchronized void waitForMerges() { - ensureOpen(false); - if (infoStream.isEnabled("IW")) { - infoStream.message("IW", "waitForMerges"); - } - while(pendingMerges.size() > 0 || runningMerges.size() > 0) { - doWait(); - } + public void waitForMerges() throws IOException { - // sanity check - assert 0 == mergingSegments.size(); + // Give merge scheduler last chance to run, in case + // any pending merges are waiting. We can't hold IW's lock + // when going into merge because it can lead to deadlock. + mergeScheduler.merge(this, MergeTrigger.CLOSING, false); - if (infoStream.isEnabled("IW")) { - infoStream.message("IW", "waitForMerges done"); + synchronized (this) { + ensureOpen(false); + if (infoStream.isEnabled("IW")) { + infoStream.message("IW", "waitForMerges"); + } + + + while (pendingMerges.size() > 0 || runningMerges.size() > 0) { + doWait(); + } + + // sanity check + assert 0 == mergingSegments.size(); + + if (infoStream.isEnabled("IW")) { + infoStream.message("IW", "waitForMerges done"); + } } } @@ -3835,7 +3830,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { * the synchronized lock on IndexWriter instance. */ final synchronized void mergeFinish(MergePolicy.OneMerge merge) { - // forceMerge, addIndexes or finishMerges may be waiting + // forceMerge, addIndexes or waitForMerges may be waiting // on merges to finish. notifyAll(); diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java index bf83ee82587..f10de0e9e74 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java @@ -233,7 +233,7 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase { writer.addDocument(doc); } - private void checkInvariants(IndexWriter writer) { + private void checkInvariants(IndexWriter writer) throws IOException { writer.waitForMerges(); int maxBufferedDocs = writer.getConfig().getMaxBufferedDocs(); int mergeFactor = ((LogMergePolicy) writer.getConfig().getMergePolicy()).getMergeFactor();