mirror of https://github.com/apache/lucene.git
LUCENE-4245: Make IndexWriter#close() and MergeScheduler#close() non-interruptible
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1364896 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
9878f2f5f7
commit
22a8a7cde9
|
@ -100,6 +100,9 @@ Bug Fixes
|
|||
* LUCENE-4234: Exception when FacetsCollector is used with ScoreFacetRequest,
|
||||
and the number of matching documents is too large. (Gilad Barkai via Shai Erera)
|
||||
|
||||
* LUCENE-4245: Make IndexWriter#close() and MergeScheduler#close()
|
||||
non-interruptible. (Mark Miller, Uwe Schindler)
|
||||
|
||||
Build
|
||||
|
||||
* LUCENE-4094: Support overriding file.encoding on forked test JVMs
|
||||
|
|
|
@ -243,27 +243,34 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
|
|||
sync();
|
||||
}
|
||||
|
||||
/** Wait for any running merge threads to finish */
|
||||
/** Wait for any running merge threads to finish. This call is not interruptible as used by {@link #close()}. */
|
||||
public void sync() {
|
||||
while (true) {
|
||||
MergeThread toSync = null;
|
||||
synchronized (this) {
|
||||
for (MergeThread t : mergeThreads) {
|
||||
if (t.isAlive()) {
|
||||
toSync = t;
|
||||
break;
|
||||
boolean interrupted = false;
|
||||
try {
|
||||
while (true) {
|
||||
MergeThread toSync = null;
|
||||
synchronized (this) {
|
||||
for (MergeThread t : mergeThreads) {
|
||||
if (t.isAlive()) {
|
||||
toSync = t;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (toSync != null) {
|
||||
try {
|
||||
toSync.join();
|
||||
} catch (InterruptedException ie) {
|
||||
throw new ThreadInterruptedException(ie);
|
||||
if (toSync != null) {
|
||||
try {
|
||||
toSync.join();
|
||||
} catch (InterruptedException ie) {
|
||||
// ignore this Exception, we will retry until all threads are dead
|
||||
interrupted = true;
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
} finally {
|
||||
// finally, restore interrupt status:
|
||||
if (interrupted) Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -842,7 +842,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
|
|||
if (hitOOM) {
|
||||
rollbackInternal();
|
||||
} else {
|
||||
closeInternal(waitForMerges, !hitOOM);
|
||||
closeInternal(waitForMerges, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -870,7 +870,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
|
|||
}
|
||||
|
||||
private void closeInternal(boolean waitForMerges, boolean doFlush) throws IOException {
|
||||
|
||||
boolean interrupted = Thread.interrupted();
|
||||
try {
|
||||
|
||||
if (pendingCommit != null) {
|
||||
|
@ -891,17 +891,35 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
|
|||
docWriter.abort(); // already closed
|
||||
}
|
||||
|
||||
if (waitForMerges)
|
||||
// Give merge scheduler last chance to run, in case
|
||||
// any pending merges are waiting:
|
||||
mergeScheduler.merge(this);
|
||||
|
||||
if (waitForMerges) {
|
||||
try {
|
||||
// Give merge scheduler last chance to run, in case
|
||||
// any pending merges are waiting:
|
||||
mergeScheduler.merge(this);
|
||||
} catch (ThreadInterruptedException tie) {
|
||||
// ignore any interruption, does not matter
|
||||
interrupted = true;
|
||||
}
|
||||
}
|
||||
|
||||
mergePolicy.close();
|
||||
|
||||
synchronized(this) {
|
||||
finishMerges(waitForMerges);
|
||||
for (;;) {
|
||||
try {
|
||||
finishMerges(waitForMerges && !interrupted);
|
||||
break;
|
||||
} catch (ThreadInterruptedException tie) {
|
||||
// by setting the interrupted status, the
|
||||
// next call to finishMerges will pass false,
|
||||
// so it will not wait
|
||||
interrupted = true;
|
||||
}
|
||||
}
|
||||
stopMerges = true;
|
||||
}
|
||||
|
||||
// shutdown scheduler and all threads (this call is not interruptible):
|
||||
mergeScheduler.close();
|
||||
|
||||
if (infoStream.isEnabled("IW")) {
|
||||
|
@ -943,6 +961,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
|
|||
}
|
||||
}
|
||||
}
|
||||
// finally, restore interrupt status:
|
||||
if (interrupted) Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -17,6 +17,7 @@ package org.apache.lucene.index;
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
|
||||
/** <p>Expert: {@link IndexWriter} uses an instance
|
||||
|
@ -26,7 +27,7 @@ import java.io.IOException;
|
|||
*
|
||||
* @lucene.experimental
|
||||
*/
|
||||
public abstract class MergeScheduler {
|
||||
public abstract class MergeScheduler implements Closeable {
|
||||
|
||||
/** Run the merges provided by {@link IndexWriter#getNextMerge()}. */
|
||||
public abstract void merge(IndexWriter writer) throws IOException;
|
||||
|
|
Loading…
Reference in New Issue