LUCENE-1018: fix intermittent exception in TestConcurrentMergeScheduler

git-svn-id: https://svn.apache.org/repos/asf/lucene/java/trunk@582508 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael McCandless 2007-10-06 15:12:43 +00:00
parent 9de7367279
commit ceebb95d63
4 changed files with 59 additions and 27 deletions

View File

@ -107,10 +107,8 @@ public class ConcurrentMergeScheduler implements MergeScheduler {
while(mergeThreads.size() > 0) { while(mergeThreads.size() > 0) {
if (VERBOSE) { if (VERBOSE) {
message("now wait for threads; currently " + mergeThreads.size() + " still running"); message("now wait for threads; currently " + mergeThreads.size() + " still running");
for(int i=0;i<mergeThreads.size();i++) { for(int i=0;i<mergeThreads.size();i++)
final MergeThread mergeThread = ((MergeThread) mergeThreads.get(i)); message(" " + i + ": " + ((MergeThread) mergeThreads.get(i)));
message(" " + i + ": " + mergeThread.merge.segString(dir));
}
} }
try { try {
@ -210,24 +208,35 @@ public class ConcurrentMergeScheduler implements MergeScheduler {
private class MergeThread extends Thread { private class MergeThread extends Thread {
IndexWriter writer; IndexWriter writer;
MergePolicy.OneMerge merge; MergePolicy.OneMerge startMerge;
MergePolicy.OneMerge runningMerge;
public MergeThread(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException { public MergeThread(IndexWriter writer, MergePolicy.OneMerge startMerge) throws IOException {
this.writer = writer; this.writer = writer;
this.merge = merge; this.startMerge = startMerge;
}
public synchronized void setRunningMerge(MergePolicy.OneMerge merge) {
runningMerge = merge;
}
public synchronized MergePolicy.OneMerge getRunningMerge() {
return runningMerge;
} }
public void run() { public void run() {
// First time through the while loop we do the merge
// that we were started with:
MergePolicy.OneMerge merge = this.startMerge;
try { try {
if (VERBOSE) if (VERBOSE)
message(" merge thread: start"); message(" merge thread: start");
// First time through the while loop we do the merge
// that we were started with:
MergePolicy.OneMerge merge = this.merge;
while(true) { while(true) {
setRunningMerge(merge);
writer.merge(merge); writer.merge(merge);
// Subsequent times through the loop we do any new // Subsequent times through the loop we do any new
@ -248,13 +257,17 @@ public class ConcurrentMergeScheduler implements MergeScheduler {
// When a merge was aborted & IndexWriter closed, // When a merge was aborted & IndexWriter closed,
// it's possible to get various IOExceptions, // it's possible to get various IOExceptions,
// NullPointerExceptions, AlreadyClosedExceptions: // NullPointerExceptions, AlreadyClosedExceptions:
merge.setException(exc); if (merge != null) {
writer.addMergeException(merge); merge.setException(exc);
writer.addMergeException(merge);
}
if (!merge.isAborted()) { if (merge == null || !merge.isAborted()) {
// If the merge was not aborted then the exception // If the merge was not aborted then the exception
// is real // is real
exceptions.add(exc); synchronized(ConcurrentMergeScheduler.this) {
exceptions.add(exc);
}
if (!suppressExceptions) if (!suppressExceptions)
// suppressExceptions is normally only set during // suppressExceptions is normally only set during
@ -270,6 +283,9 @@ public class ConcurrentMergeScheduler implements MergeScheduler {
} }
public String toString() { public String toString() {
MergePolicy.OneMerge merge = getRunningMerge();
if (merge == null)
merge = startMerge;
return "merge thread: " + merge.segString(dir); return "merge thread: " + merge.segString(dir);
} }
} }

View File

@ -100,6 +100,10 @@ final class IndexFileDeleter {
private IndexDeletionPolicy policy; private IndexDeletionPolicy policy;
private DocumentsWriter docWriter; private DocumentsWriter docWriter;
/** Change to true to see details of reference counts when
* infoStream != null */
public static boolean VERBOSE_REF_COUNTS = false;
void setInfoStream(PrintStream infoStream) { void setInfoStream(PrintStream infoStream) {
this.infoStream = infoStream; this.infoStream = infoStream;
if (infoStream != null) if (infoStream != null)
@ -342,6 +346,8 @@ final class IndexFileDeleter {
deletable = null; deletable = null;
int size = oldDeletable.size(); int size = oldDeletable.size();
for(int i=0;i<size;i++) { for(int i=0;i<size;i++) {
if (infoStream != null)
message("delete pending file " + oldDeletable.get(i));
deleteFile((String) oldDeletable.get(i)); deleteFile((String) oldDeletable.get(i));
} }
} }
@ -441,7 +447,7 @@ final class IndexFileDeleter {
for(int i=0;i<size;i++) { for(int i=0;i<size;i++) {
String fileName = (String) files.get(i); String fileName = (String) files.get(i);
RefCount rc = getRefCount(fileName); RefCount rc = getRefCount(fileName);
if (infoStream != null) { if (infoStream != null && VERBOSE_REF_COUNTS) {
message(" IncRef \"" + fileName + "\": pre-incr count is " + rc.count); message(" IncRef \"" + fileName + "\": pre-incr count is " + rc.count);
} }
rc.IncRef(); rc.IncRef();
@ -457,7 +463,7 @@ final class IndexFileDeleter {
private void decRef(String fileName) throws IOException { private void decRef(String fileName) throws IOException {
RefCount rc = getRefCount(fileName); RefCount rc = getRefCount(fileName);
if (infoStream != null) { if (infoStream != null && VERBOSE_REF_COUNTS) {
message(" DecRef \"" + fileName + "\": pre-decr count is " + rc.count); message(" DecRef \"" + fileName + "\": pre-decr count is " + rc.count);
} }
if (0 == rc.DecRef()) { if (0 == rc.DecRef()) {

View File

@ -1172,7 +1172,9 @@ public class IndexWriter {
if (infoStream != null) if (infoStream != null)
message("now flush at close"); message("now flush at close");
flush(true, true); // Only allow a new merge to be triggered if we are
// going to wait for merges:
flush(waitForMerges, true);
mergePolicy.close(); mergePolicy.close();
@ -1947,15 +1949,23 @@ public class IndexWriter {
if (!waitForMerges) { if (!waitForMerges) {
// Abort all pending & running merges: // Abort all pending & running merges:
Iterator it = pendingMerges.iterator(); Iterator it = pendingMerges.iterator();
while(it.hasNext()) while(it.hasNext()) {
((MergePolicy.OneMerge) it.next()).abort(); final MergePolicy.OneMerge merge = (MergePolicy.OneMerge) it.next();
if (infoStream != null)
message("now abort pending merge " + merge.segString(directory));
merge.abort();
}
pendingMerges.clear(); pendingMerges.clear();
it = runningMerges.iterator();
while(it.hasNext())
((MergePolicy.OneMerge) it.next()).abort();
it = runningMerges.iterator();
while(it.hasNext()) {
final MergePolicy.OneMerge merge = (MergePolicy.OneMerge) it.next();
if (infoStream != null)
message("now abort running merge " + merge.segString(directory));
merge.abort();
}
runningMerges.clear(); runningMerges.clear();
mergingSegments.clear(); mergingSegments.clear();
notifyAll(); notifyAll();
} else { } else {
@ -3078,7 +3088,7 @@ public class IndexWriter {
return mergedDocCount; return mergedDocCount;
} }
void addMergeException(MergePolicy.OneMerge merge) { synchronized void addMergeException(MergePolicy.OneMerge merge) {
if (!mergeExceptions.contains(merge) && mergeGen == merge.mergeGen) if (!mergeExceptions.contains(merge) && mergeGen == merge.mergeGen)
mergeExceptions.add(merge); mergeExceptions.add(merge);
} }

View File

@ -225,8 +225,8 @@ public class TestConcurrentMergeScheduler extends TestCase {
try { try {
directory.close(); directory.close();
} catch (RuntimeException ioe) { } catch (RuntimeException re) {
// MockRAMDirectory will throw IOExceptions when there // MockRAMDirectory will throw RuntimeExceptions when there
// are still open files, which is OK since some merge // are still open files, which is OK since some merge
// threads may still be running at this point. // threads may still be running at this point.
} }