mirror of https://github.com/apache/lucene.git
LUCENE-1164: when too many merge threads are running, pause until one or more finishes, instead of doing the merge with the foreground thread
git-svn-id: https://svn.apache.org/repos/asf/lucene/java/trunk@619890 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0d2df7b785
commit
7965c59942
|
@ -25,10 +25,11 @@ import java.util.ArrayList;
|
||||||
|
|
||||||
/** A {@link MergeScheduler} that runs each merge using a
|
/** A {@link MergeScheduler} that runs each merge using a
|
||||||
* separate thread, up until a maximum number of threads
|
* separate thread, up until a maximum number of threads
|
||||||
* ({@link #setMaxThreadCount}) at which points merges are
|
* ({@link #setMaxThreadCount}) at which when a merge is
|
||||||
* run in the foreground, serially. This is a simple way
|
* needed, the thread(s) that are updating the index will
|
||||||
* to use concurrency in the indexing process without
|
* pause until one or more merges completes. This is a
|
||||||
* having to create and manage application level
|
* simple way to use concurrency in the indexing process
|
||||||
|
* without having to create and manage application level
|
||||||
* threads. */
|
* threads. */
|
||||||
|
|
||||||
public class ConcurrentMergeScheduler extends MergeScheduler {
|
public class ConcurrentMergeScheduler extends MergeScheduler {
|
||||||
|
@ -36,6 +37,8 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
|
||||||
private int mergeThreadPriority = -1;
|
private int mergeThreadPriority = -1;
|
||||||
|
|
||||||
protected List mergeThreads = new ArrayList();
|
protected List mergeThreads = new ArrayList();
|
||||||
|
|
||||||
|
// Max number of threads allowed to be merging at once
|
||||||
private int maxThreadCount = 3;
|
private int maxThreadCount = 3;
|
||||||
|
|
||||||
private List exceptions = new ArrayList();
|
private List exceptions = new ArrayList();
|
||||||
|
@ -53,8 +56,9 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
|
||||||
|
|
||||||
/** Sets the max # simultaneous threads that may be
|
/** Sets the max # simultaneous threads that may be
|
||||||
* running. If a merge is necessary yet we already have
|
* running. If a merge is necessary yet we already have
|
||||||
* this many threads running, the merge is returned back
|
* this many threads running, the incoming thread (that
|
||||||
* to IndexWriter so that it runs in the "foreground". */
|
* is calling add/updateDocument) will block until
|
||||||
|
* a merge thread has completed. */
|
||||||
public void setMaxThreadCount(int count) {
|
public void setMaxThreadCount(int count) {
|
||||||
if (count < 1)
|
if (count < 1)
|
||||||
throw new IllegalArgumentException("count should be at least 1");
|
throw new IllegalArgumentException("count should be at least 1");
|
||||||
|
@ -150,7 +154,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
|
||||||
message(" index: " + writer.segString());
|
message(" index: " + writer.segString());
|
||||||
|
|
||||||
// Iterate, pulling from the IndexWriter's queue of
|
// Iterate, pulling from the IndexWriter's queue of
|
||||||
// pending merges, until its empty:
|
// pending merges, until it's empty:
|
||||||
while(true) {
|
while(true) {
|
||||||
|
|
||||||
// TODO: we could be careful about which merges to do in
|
// TODO: we could be careful about which merges to do in
|
||||||
|
@ -167,27 +171,35 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
|
||||||
// deterministic assignment of segment names
|
// deterministic assignment of segment names
|
||||||
writer.mergeInit(merge);
|
writer.mergeInit(merge);
|
||||||
|
|
||||||
message(" consider merge " + merge.segString(dir));
|
synchronized(this) {
|
||||||
|
while (mergeThreadCount() >= maxThreadCount) {
|
||||||
|
message(" too many merge threads running; stalling...");
|
||||||
|
try {
|
||||||
|
wait();
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (merge.isExternal) {
|
message(" consider merge " + merge.segString(dir));
|
||||||
message(" merge involves segments from an external directory; now run in foreground");
|
|
||||||
} else {
|
if (merge.isExternal) {
|
||||||
synchronized(this) {
|
message(" merge involves segments from an external directory; now run in foreground");
|
||||||
if (mergeThreadCount() < maxThreadCount) {
|
} else {
|
||||||
// OK to spawn a new merge thread to handle this
|
assert mergeThreadCount() < maxThreadCount;
|
||||||
// merge:
|
|
||||||
final MergeThread merger = getMergeThread(writer, merge);
|
// OK to spawn a new merge thread to handle this
|
||||||
mergeThreads.add(merger);
|
// merge:
|
||||||
message(" launch new thread [" + merger.getName() + "]");
|
final MergeThread merger = getMergeThread(writer, merge);
|
||||||
merger.start();
|
mergeThreads.add(merger);
|
||||||
continue;
|
message(" launch new thread [" + merger.getName() + "]");
|
||||||
} else
|
merger.start();
|
||||||
message(" too many merge threads running; run merge in foreground");
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Too many merge threads already running, so we do
|
// This merge involves segments outside our index
|
||||||
// this in the foreground of the calling thread
|
// Directory so we must merge in foreground
|
||||||
doMerge(merge);
|
doMerge(merge);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -285,7 +297,8 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
synchronized(ConcurrentMergeScheduler.this) {
|
synchronized(ConcurrentMergeScheduler.this) {
|
||||||
mergeThreads.remove(this);
|
boolean removed = mergeThreads.remove(this);
|
||||||
|
assert removed;
|
||||||
ConcurrentMergeScheduler.this.notifyAll();
|
ConcurrentMergeScheduler.this.notifyAll();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue