LUCENE-2164: make CMS smarter about prioritizing its threads

git-svn-id: https://svn.apache.org/repos/asf/lucene/java/trunk@892992 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael McCandless 2009-12-21 21:38:24 +00:00
parent 6e6acefb05
commit 7d35aafe71
6 changed files with 224 additions and 36 deletions

View File

@ -119,6 +119,14 @@ Optimizations
* LUCENE-2161: Improve concurrency of IndexReader, especially in the
context of near real-time readers. (Mike McCandless)
* LUCENE-2164: ConcurrentMergeScheduler has more control over merge
threads. First, it gives smaller merges higher thread priority than
larges ones. Second, a new set/getMaxMergeCount setting will pause
the larger merges to allow smaller ones to finish. The defaults for
these settings are now dynamic, depending the number CPU cores as
reported by Runtime.getRuntime().availableProcessors() (Mike
McCandless)
Build
* LUCENE-2124: Moved the JDK-based collation support from contrib/collation

View File

@ -22,6 +22,7 @@ import org.apache.lucene.benchmark.byTask.utils.Config;
import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.MergeScheduler;
import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.MergePolicy;
import java.io.BufferedOutputStream;
@ -33,9 +34,15 @@ import java.io.PrintStream;
/**
* Create an index. <br>
* Other side effects: index writer object in perfRunData is set. <br>
* Relevant properties: <code>merge.factor, max.buffered,
* max.field.length, ram.flush.mb [default 0],
* [default true]</code>.
* Relevant properties: <code>merge.factor (default 10),
* max.buffered (default no flush), max.field.length (default
* 10,000 tokens), max.field.length, compound (default true), ram.flush.mb [default 0],
* merge.policy (default org.apache.lucene.index.LogByteSizeMergePolicy),
* merge.scheduler (default
* org.apache.lucene.index.ConcurrentMergeScheduler),
* concurrent.merge.scheduler.max.thread.count and
* concurrent.merge.scheduler.max.merge.count (defaults per
* ConcurrentMergeScheduler) </code>.
* <p>
* This task also supports a "writer.info.stream" property with the following
* values:
@ -66,6 +73,18 @@ public class CreateIndexTask extends PerfTask {
throw new RuntimeException("unable to instantiate class '" + mergeScheduler + "' as merge scheduler", e);
}
if (mergeScheduler.equals("org.apache.lucene.index.ConcurrentMergeScheduler")) {
ConcurrentMergeScheduler cms = (ConcurrentMergeScheduler) writer.getMergeScheduler();
int v = config.get("concurrent.merge.scheduler.max.thread.count", -1);
if (v != -1) {
cms.setMaxThreadCount(v);
}
v = config.get("concurrent.merge.scheduler.max.merge.count", -1);
if (v != -1) {
cms.setMaxMergeCount(v);
}
}
final String mergePolicy = config.get("merge.policy",
"org.apache.lucene.index.LogByteSizeMergePolicy");
try {

View File

@ -23,24 +23,45 @@ import org.apache.lucene.util.ThreadInterruptedException;
import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Collections;
/** A {@link MergeScheduler} that runs each merge using a
* separate thread, up until a maximum number of threads
* ({@link #setMaxThreadCount}) at which when a merge is
* needed, the thread(s) that are updating the index will
* pause until one or more merges completes. This is a
* simple way to use concurrency in the indexing process
* without having to create and manage application level
* threads. */
* separate thread.
*
* <p>Specify the max number of threads that may run at
* once with {@link #setMaxThreadCount}.</p>
*
* <p>Separately specify the maximum number of simultaneous
* merges with {@link #setMaxMergeCount}. If the number of
* merges exceeds the max number of threads then the
* largest merges are paused until one of the smaller
* merges completes.</p>
*
* <p>If more than {@link #getMaxMergeCount} merges are
* requested then this class will forcefully throttle the
* incoming threads by pausing until one more more merges
* complete.</p>
*/
public class ConcurrentMergeScheduler extends MergeScheduler {
private int mergeThreadPriority = -1;
protected List<MergeThread> mergeThreads = new ArrayList<MergeThread>();
// Max number of threads allowed to be merging at once
private int maxThreadCount = 1;
// Max number of merge threads allowed to be running at
// once. When there are more merges then this, we
// forcefully pause the larger ones, letting the smaller
// ones run, up until maxMergeCount merges at which point
// we forcefully pause incoming threads (that presumably
// are the ones causing so much merging). We dynamically
// default this from 1 to 3, depending on how many cores
// you have:
private int maxThreadCount = Math.max(1, Math.min(3, Runtime.getRuntime().availableProcessors()/2));
// Max number of merges we accept before forcefully
// throttling the incoming threads
private int maxMergeCount = maxThreadCount+2;
protected Directory dir;
@ -55,23 +76,45 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
}
}
/** Sets the max # simultaneous threads that may be
* running. If a merge is necessary yet we already have
* this many threads running, the incoming thread (that
* is calling add/updateDocument) will block until
* a merge thread has completed. */
/** Sets the max # simultaneous merge threads that should
* be running at once. This must be <= {@link
* #setMaxMergeCount}. */
public void setMaxThreadCount(int count) {
if (count < 1)
if (count < 1) {
throw new IllegalArgumentException("count should be at least 1");
}
if (count > maxMergeCount) {
throw new IllegalArgumentException("count should be <= maxMergeCount (= " + maxMergeCount + ")");
}
maxThreadCount = count;
}
/** Get the max # simultaneous threads that may be
* running. @see #setMaxThreadCount. */
/** @see #setMaxThreadCount. */
public int getMaxThreadCount() {
return maxThreadCount;
}
/** Sets the max # simultaneous merges that are allowed.
* If a merge is necessary yet we already have this many
* threads running, the incoming thread (that is calling
* add/updateDocument) will block until a merge thread
* has completed. Note that we will only run the
* smallest {@link #setMaxThreadCount} merges at a time. */
public void setMaxMergeCount(int count) {
if (count < 1) {
throw new IllegalArgumentException("count should be at least 1");
}
if (count < maxThreadCount) {
throw new IllegalArgumentException("count should be >= maxThreadCount (= " + maxThreadCount + ")");
}
maxMergeCount = count;
}
/** See {@link #setMaxMergeCount}. */
public int getMaxMergeCount() {
return maxMergeCount;
}
/** Return the priority that merge threads run at. By
* default the priority is 1 plus the priority of (ie,
* slightly higher priority than) the first thread that
@ -81,16 +124,73 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
return mergeThreadPriority;
}
/** Set the priority that merge threads run at. */
/** Set the base priority that merge threads run at.
* Note that CMS may increase priority of some merge
* threads beyond this base priority. It's best not to
* set this any higher than
* Thread.MAX_PRIORITY-maxThreadCount, so that CMS has
* room to set relative priority among threads. */
public synchronized void setMergeThreadPriority(int pri) {
if (pri > Thread.MAX_PRIORITY || pri < Thread.MIN_PRIORITY)
throw new IllegalArgumentException("priority must be in range " + Thread.MIN_PRIORITY + " .. " + Thread.MAX_PRIORITY + " inclusive");
mergeThreadPriority = pri;
updateMergeThreads();
}
final int numThreads = mergeThreadCount();
for(int i=0;i<numThreads;i++) {
MergeThread merge = mergeThreads.get(i);
merge.setThreadPriority(pri);
// Larger merges come first
protected static class CompareByMergeDocCount implements Comparator<MergeThread> {
public int compare(MergeThread t1, MergeThread t2) {
final MergePolicy.OneMerge m1 = t1.getCurrentMerge();
final MergePolicy.OneMerge m2 = t2.getCurrentMerge();
final int c1 = m1 == null ? Integer.MAX_VALUE : m1.segments.totalDocCount();
final int c2 = m2 == null ? Integer.MAX_VALUE : m2.segments.totalDocCount();
return c2 - c1;
}
}
/** Called whenever the running merges have changed, to
* pause & unpause threads. */
protected synchronized void updateMergeThreads() {
Collections.sort(mergeThreads, new CompareByMergeDocCount());
final int count = mergeThreads.size();
int pri = mergeThreadPriority;
for(int i=0;i<count;i++) {
final MergeThread mergeThread = mergeThreads.get(i);
final MergePolicy.OneMerge merge = mergeThread.getCurrentMerge();
if (merge == null) {
continue;
}
final boolean doPause;
if (i < count-maxThreadCount) {
doPause = true;
} else {
doPause = false;
}
if (verbose()) {
if (doPause != merge.getPause()) {
if (doPause) {
message("pause thread " + mergeThread.getName());
} else {
message("unpause thread " + mergeThread.getName());
}
}
}
if (doPause != merge.getPause()) {
merge.setPause(doPause);
}
if (!doPause) {
if (verbose()) {
message("set priority of merge thread " + mergeThread.getName() + " to " + pri);
}
mergeThread.setThreadPriority(pri);
pri = Math.min(Thread.MAX_PRIORITY, 1+pri);
}
}
}
@ -192,9 +292,12 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
try {
synchronized(this) {
final MergeThread merger;
while (mergeThreadCount() >= maxThreadCount) {
if (verbose())
message(" too many merge threads running; stalling...");
long startStallTime = 0;
while (mergeThreadCount() >= maxMergeCount) {
startStallTime = System.currentTimeMillis();
if (verbose()) {
message(" too many merges; stalling...");
}
try {
wait();
} catch (InterruptedException ie) {
@ -202,15 +305,20 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
}
}
if (verbose())
if (verbose()) {
if (startStallTime != 0) {
message(" stalled for " + (System.currentTimeMillis()-startStallTime) + " msec");
}
message(" consider merge " + merge.segString(dir));
assert mergeThreadCount() < maxThreadCount;
}
assert mergeThreadCount() < maxMergeCount;
// OK to spawn a new merge thread to handle this
// merge:
merger = getMergeThread(writer, merge);
mergeThreads.add(merger);
updateMergeThreads();
if (verbose())
message(" launch new thread [" + merger.getName() + "]");
@ -245,6 +353,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
IndexWriter writer;
MergePolicy.OneMerge startMerge;
MergePolicy.OneMerge runningMerge;
private volatile boolean done;
public MergeThread(IndexWriter writer, MergePolicy.OneMerge startMerge) throws IOException {
this.writer = writer;
@ -259,6 +368,16 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
return runningMerge;
}
public synchronized MergePolicy.OneMerge getCurrentMerge() {
if (done) {
return null;
} else if (runningMerge != null) {
return runningMerge;
} else {
return startMerge;
}
}
public void setThreadPriority(int pri) {
try {
setPriority(pri);
@ -292,10 +411,14 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
merge = writer.getNextMerge();
if (merge != null) {
writer.mergeInit(merge);
updateMergeThreads();
if (verbose())
message(" merge thread: do another merge " + merge.segString(dir));
} else
} else {
done = true;
updateMergeThreads();
break;
}
}
if (verbose())
@ -317,6 +440,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
ConcurrentMergeScheduler.this.notifyAll();
boolean removed = mergeThreads.remove(this);
assert removed;
updateMergeThreads();
}
}
}

View File

@ -3944,7 +3944,7 @@ public class IndexWriter implements Closeable {
handleOOM(oom, "merge");
}
if (infoStream != null) {
message("merge time " + (System.currentTimeMillis()-t0) + " msec");
message("merge time " + (System.currentTimeMillis()-t0) + " msec for " + merge.info.docCount + " docs");
}
}

View File

@ -85,6 +85,7 @@ public abstract class MergePolicy implements java.io.Closeable {
final boolean useCompoundFile;
boolean aborted;
Throwable error;
boolean paused;
public OneMerge(SegmentInfos segments, boolean useCompoundFile) {
if (0 == segments.size())
@ -110,6 +111,7 @@ public abstract class MergePolicy implements java.io.Closeable {
* not be committed. */
synchronized void abort() {
aborted = true;
notifyAll();
}
/** Returns true if this merge was aborted. */
@ -118,8 +120,34 @@ public abstract class MergePolicy implements java.io.Closeable {
}
synchronized void checkAborted(Directory dir) throws MergeAbortedException {
if (aborted)
if (aborted) {
throw new MergeAbortedException("merge is aborted: " + segString(dir));
}
while (paused) {
try {
// In theory we could wait() indefinitely, but we
// do 1000 msec, defensively
wait(1000);
} catch (InterruptedException ie) {
throw new RuntimeException(ie);
}
if (aborted) {
throw new MergeAbortedException("merge is aborted: " + segString(dir));
}
}
}
synchronized public void setPause(boolean paused) {
this.paused = paused;
if (!paused) {
// Wakeup merge thread, if it's waiting
notifyAll();
}
}
synchronized public boolean getPause() {
return paused;
}
String segString(Directory dir) {
@ -262,5 +290,4 @@ public abstract class MergePolicy implements java.io.Closeable {
* compound file format.
*/
public abstract boolean useCompoundDocStore(SegmentInfos segments);
}

View File

@ -911,4 +911,14 @@ public final class SegmentInfos extends Vector<SegmentInfo> {
return true;
return false;
}
/** Returns sum of all segment's docCounts. Note that
* this does not include deletions */
public int totalDocCount() {
int count = 0;
for(SegmentInfo info : this) {
count += info.docCount;
}
return count;
}
}