LUCENE-2820: revert until I find the cause of the deadlock

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1050899 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael McCandless 2010-12-19 17:07:24 +00:00
parent 5f9bb3f0a8
commit 65b27c2adb
1 changed files with 27 additions and 54 deletions

View File

@ -65,6 +65,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
protected Directory dir; protected Directory dir;
private boolean closed;
protected IndexWriter writer; protected IndexWriter writer;
protected int mergeThreadCount; protected int mergeThreadCount;
@ -146,37 +147,18 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
* pause & unpause threads. */ * pause & unpause threads. */
protected synchronized void updateMergeThreads() { protected synchronized void updateMergeThreads() {
// Only look at threads that are alive & not in the CollectionUtil.mergeSort(mergeThreads, compareByMergeDocCount);
// process of stopping (ie have an active merge):
final List<MergeThread> activeMerges = new ArrayList<MergeThread>();
int threadIdx = 0;
while (threadIdx < mergeThreads.size()) {
final MergeThread mergeThread = mergeThreads.get(threadIdx);
if (!mergeThread.isAlive()) {
// Prune any dead threads
mergeThreads.remove(threadIdx);
continue;
}
if (mergeThread.getCurrentMerge() != null) {
activeMerges.add(mergeThread);
}
threadIdx++;
}
CollectionUtil.mergeSort(activeMerges, compareByMergeDocCount);
final int count = mergeThreads.size();
int pri = mergeThreadPriority; int pri = mergeThreadPriority;
final int activeMergeCount = activeMerges.size(); for(int i=0;i<count;i++) {
for (threadIdx=0;threadIdx<activeMergeCount;threadIdx++) { final MergeThread mergeThread = mergeThreads.get(i);
final MergeThread mergeThread = activeMerges.get(threadIdx);
final MergePolicy.OneMerge merge = mergeThread.getCurrentMerge(); final MergePolicy.OneMerge merge = mergeThread.getCurrentMerge();
if (merge == null) { if (merge == null) {
continue; continue;
} }
final boolean doPause; final boolean doPause;
if (threadIdx < activeMergeCount-maxThreadCount) { if (i < count-maxThreadCount) {
doPause = true; doPause = true;
} else { } else {
doPause = false; doPause = false;
@ -226,29 +208,23 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
@Override @Override
public void close() { public void close() {
sync(); closed = true;
} }
/** Wait for any running merge threads to finish */ public synchronized void sync() {
public void sync() { while(mergeThreadCount() > 0) {
while(true) { if (verbose())
MergeThread toSync = null; message("now wait for threads; currently " + mergeThreads.size() + " still running");
synchronized(this) { final int count = mergeThreads.size();
for(MergeThread t : mergeThreads) { if (verbose()) {
if (t.isAlive()) { for(int i=0;i<count;i++)
toSync = t; message(" " + i + ": " + mergeThreads.get(i));
break;
}
}
} }
if (toSync != null) {
try { try {
toSync.join(); wait();
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie); throw new ThreadInterruptedException(ie);
}
} else {
break;
} }
} }
} }
@ -335,17 +311,11 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
// merge: // merge:
merger = getMergeThread(writer, merge); merger = getMergeThread(writer, merge);
mergeThreads.add(merger); mergeThreads.add(merger);
if (verbose()) { updateMergeThreads();
if (verbose())
message(" launch new thread [" + merger.getName() + "]"); message(" launch new thread [" + merger.getName() + "]");
}
merger.start(); merger.start();
// Must call this after starting the thread else
// the new thread is removed from mergeThreads
// (since it's not alive yet):
updateMergeThreads();
success = true; success = true;
} }
} finally { } finally {
@ -439,6 +409,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
message(" merge thread: do another merge " + merge.segString(dir)); message(" merge thread: do another merge " + merge.segString(dir));
} else { } else {
done = true; done = true;
updateMergeThreads();
break; break;
} }
} }
@ -458,8 +429,10 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
} }
} finally { } finally {
synchronized(ConcurrentMergeScheduler.this) { synchronized(ConcurrentMergeScheduler.this) {
updateMergeThreads();
ConcurrentMergeScheduler.this.notifyAll(); ConcurrentMergeScheduler.this.notifyAll();
boolean removed = mergeThreads.remove(this);
assert removed;
updateMergeThreads();
} }
} }
} }