LUCENE-2820: fix CMS to stop its threads without deadlocking!

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1051041 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael McCandless 2010-12-20 10:35:30 +00:00
parent 65b27c2adb
commit 765eab4008
3 changed files with 79 additions and 39 deletions

View File

@ -65,7 +65,6 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
protected Directory dir; protected Directory dir;
private boolean closed;
protected IndexWriter writer; protected IndexWriter writer;
protected int mergeThreadCount; protected int mergeThreadCount;
@ -147,18 +146,37 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
* pause & unpause threads. */ * pause & unpause threads. */
protected synchronized void updateMergeThreads() { protected synchronized void updateMergeThreads() {
CollectionUtil.mergeSort(mergeThreads, compareByMergeDocCount); // Only look at threads that are alive & not in the
// process of stopping (ie have an active merge):
final List<MergeThread> activeMerges = new ArrayList<MergeThread>();
int threadIdx = 0;
while (threadIdx < mergeThreads.size()) {
final MergeThread mergeThread = mergeThreads.get(threadIdx);
if (!mergeThread.isAlive()) {
// Prune any dead threads
mergeThreads.remove(threadIdx);
continue;
}
if (mergeThread.getCurrentMerge() != null) {
activeMerges.add(mergeThread);
}
threadIdx++;
}
CollectionUtil.mergeSort(activeMerges, compareByMergeDocCount);
final int count = mergeThreads.size();
int pri = mergeThreadPriority; int pri = mergeThreadPriority;
for(int i=0;i<count;i++) { final int activeMergeCount = activeMerges.size();
final MergeThread mergeThread = mergeThreads.get(i); for (threadIdx=0;threadIdx<activeMergeCount;threadIdx++) {
final MergeThread mergeThread = activeMerges.get(threadIdx);
final MergePolicy.OneMerge merge = mergeThread.getCurrentMerge(); final MergePolicy.OneMerge merge = mergeThread.getCurrentMerge();
if (merge == null) { if (merge == null) {
continue; continue;
} }
final boolean doPause; final boolean doPause;
if (i < count-maxThreadCount) { if (threadIdx < activeMergeCount-maxThreadCount) {
doPause = true; doPause = true;
} else { } else {
doPause = false; doPause = false;
@ -208,23 +226,29 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
@Override @Override
public void close() { public void close() {
closed = true; sync();
} }
public synchronized void sync() { /** Wait for any running merge threads to finish */
while(mergeThreadCount() > 0) { public void sync() {
if (verbose()) while(true) {
message("now wait for threads; currently " + mergeThreads.size() + " still running"); MergeThread toSync = null;
final int count = mergeThreads.size(); synchronized(this) {
if (verbose()) { for(MergeThread t : mergeThreads) {
for(int i=0;i<count;i++) if (t.isAlive()) {
message(" " + i + ": " + mergeThreads.get(i)); toSync = t;
break;
}
}
} }
if (toSync != null) {
try { try {
wait(); toSync.join();
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie); throw new ThreadInterruptedException(ie);
}
} else {
break;
} }
} }
} }
@ -232,9 +256,12 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
private synchronized int mergeThreadCount() { private synchronized int mergeThreadCount() {
int count = 0; int count = 0;
final int numThreads = mergeThreads.size(); final int numThreads = mergeThreads.size();
for(int i=0;i<numThreads;i++) for(int i=0;i<numThreads;i++) {
if (mergeThreads.get(i).isAlive()) final MergeThread t = mergeThreads.get(i);
if (t.isAlive() && t.getCurrentMerge() != null) {
count++; count++;
}
}
return count; return count;
} }
@ -311,11 +338,17 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
// merge: // merge:
merger = getMergeThread(writer, merge); merger = getMergeThread(writer, merge);
mergeThreads.add(merger); mergeThreads.add(merger);
updateMergeThreads(); if (verbose()) {
if (verbose())
message(" launch new thread [" + merger.getName() + "]"); message(" launch new thread [" + merger.getName() + "]");
}
merger.start(); merger.start();
// Must call this after starting the thread else
// the new thread is removed from mergeThreads
// (since it's not alive yet):
updateMergeThreads();
success = true; success = true;
} }
} finally { } finally {
@ -408,8 +441,6 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
if (verbose()) if (verbose())
message(" merge thread: do another merge " + merge.segString(dir)); message(" merge thread: do another merge " + merge.segString(dir));
} else { } else {
done = true;
updateMergeThreads();
break; break;
} }
} }
@ -428,11 +459,10 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
} }
} }
} finally { } finally {
done = true;
synchronized(ConcurrentMergeScheduler.this) { synchronized(ConcurrentMergeScheduler.this) {
ConcurrentMergeScheduler.this.notifyAll();
boolean removed = mergeThreads.remove(this);
assert removed;
updateMergeThreads(); updateMergeThreads();
ConcurrentMergeScheduler.this.notifyAll();
} }
} }
} }

View File

@ -851,10 +851,14 @@ public class TestIndexWriterExceptions extends LuceneTestCase {
w.close(); w.close();
for(int i=0;i<200;i++) { for(int i=0;i<200;i++) {
if (VERBOSE) {
System.out.println("TEST: iter " + i);
}
MockDirectoryWrapper dir = new MockDirectoryWrapper(random, new RAMDirectory(startDir)); MockDirectoryWrapper dir = new MockDirectoryWrapper(random, new RAMDirectory(startDir));
conf = newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer()).setMergeScheduler(new ConcurrentMergeScheduler()); conf = newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer()).setMergeScheduler(new ConcurrentMergeScheduler());
((ConcurrentMergeScheduler) conf.getMergeScheduler()).setSuppressExceptions(); ((ConcurrentMergeScheduler) conf.getMergeScheduler()).setSuppressExceptions();
w = new IndexWriter(dir, conf); w = new IndexWriter(dir, conf);
w.setInfoStream(VERBOSE ? System.out : null);
dir.setRandomIOExceptionRate(0.5); dir.setRandomIOExceptionRate(0.5);
try { try {
w.optimize(); w.optimize();

View File

@ -18,18 +18,20 @@ package org.apache.lucene.store;
*/ */
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.Random;
import java.util.Map;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.ArrayList;
import org.apache.lucene.util.LuceneTestCase;
/** /**
* This is a Directory Wrapper that adds methods * This is a Directory Wrapper that adds methods
@ -220,6 +222,10 @@ public class MockDirectoryWrapper extends Directory {
if (randomIOExceptionRate > 0.0) { if (randomIOExceptionRate > 0.0) {
int number = Math.abs(randomState.nextInt() % 1000); int number = Math.abs(randomState.nextInt() % 1000);
if (number < randomIOExceptionRate*1000) { if (number < randomIOExceptionRate*1000) {
if (LuceneTestCase.VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": MockDirectoryWrapper: now throw random exception");
new Throwable().printStackTrace(System.out);
}
throw new IOException("a random IOException"); throw new IOException("a random IOException");
} }
} }