diff --git a/CHANGES.txt b/CHANGES.txt
index 52cd40b7b35..8bfe9da0181 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -119,6 +119,14 @@ Optimizations
* LUCENE-2161: Improve concurrency of IndexReader, especially in the
context of near real-time readers. (Mike McCandless)
+* LUCENE-2164: ConcurrentMergeScheduler has more control over merge
+ threads. First, it gives smaller merges higher thread priority than
+ larges ones. Second, a new set/getMaxMergeCount setting will pause
+ the larger merges to allow smaller ones to finish. The defaults for
+ these settings are now dynamic, depending the number CPU cores as
+ reported by Runtime.getRuntime().availableProcessors() (Mike
+ McCandless)
+
Build
* LUCENE-2124: Moved the JDK-based collation support from contrib/collation
diff --git a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CreateIndexTask.java b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CreateIndexTask.java
index 5c8cfc92719..91c0feb1436 100644
--- a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CreateIndexTask.java
+++ b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CreateIndexTask.java
@@ -22,6 +22,7 @@ import org.apache.lucene.benchmark.byTask.utils.Config;
import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.MergeScheduler;
+import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.MergePolicy;
import java.io.BufferedOutputStream;
@@ -33,9 +34,15 @@ import java.io.PrintStream;
/**
* Create an index.
* Other side effects: index writer object in perfRunData is set.
- * Relevant properties: merge.factor, max.buffered,
- * max.field.length, ram.flush.mb [default 0],
- * [default true]
.
+ * Relevant properties: merge.factor (default 10),
+ * max.buffered (default no flush), max.field.length (default
+ * 10,000 tokens), max.field.length, compound (default true), ram.flush.mb [default 0],
+ * merge.policy (default org.apache.lucene.index.LogByteSizeMergePolicy),
+ * merge.scheduler (default
+ * org.apache.lucene.index.ConcurrentMergeScheduler),
+ * concurrent.merge.scheduler.max.thread.count and
+ * concurrent.merge.scheduler.max.merge.count (defaults per
+ * ConcurrentMergeScheduler)
.
*
* This task also supports a "writer.info.stream" property with the following
* values:
@@ -66,6 +73,18 @@ public class CreateIndexTask extends PerfTask {
throw new RuntimeException("unable to instantiate class '" + mergeScheduler + "' as merge scheduler", e);
}
+ if (mergeScheduler.equals("org.apache.lucene.index.ConcurrentMergeScheduler")) {
+ ConcurrentMergeScheduler cms = (ConcurrentMergeScheduler) writer.getMergeScheduler();
+ int v = config.get("concurrent.merge.scheduler.max.thread.count", -1);
+ if (v != -1) {
+ cms.setMaxThreadCount(v);
+ }
+ v = config.get("concurrent.merge.scheduler.max.merge.count", -1);
+ if (v != -1) {
+ cms.setMaxMergeCount(v);
+ }
+ }
+
final String mergePolicy = config.get("merge.policy",
"org.apache.lucene.index.LogByteSizeMergePolicy");
try {
diff --git a/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java b/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
index 25acbbb297c..af322b299d2 100644
--- a/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
+++ b/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
@@ -23,24 +23,45 @@ import org.apache.lucene.util.ThreadInterruptedException;
import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Collections;
/** A {@link MergeScheduler} that runs each merge using a
- * separate thread, up until a maximum number of threads
- * ({@link #setMaxThreadCount}) at which when a merge is
- * needed, the thread(s) that are updating the index will
- * pause until one or more merges completes. This is a
- * simple way to use concurrency in the indexing process
- * without having to create and manage application level
- * threads. */
-
+ * separate thread.
+ *
+ *
Specify the max number of threads that may run at
+ * once with {@link #setMaxThreadCount}.
+ *
+ * Separately specify the maximum number of simultaneous
+ * merges with {@link #setMaxMergeCount}. If the number of
+ * merges exceeds the max number of threads then the
+ * largest merges are paused until one of the smaller
+ * merges completes.
+ *
+ * If more than {@link #getMaxMergeCount} merges are
+ * requested then this class will forcefully throttle the
+ * incoming threads by pausing until one more more merges
+ * complete.
+ */
public class ConcurrentMergeScheduler extends MergeScheduler {
private int mergeThreadPriority = -1;
protected List mergeThreads = new ArrayList();
- // Max number of threads allowed to be merging at once
- private int maxThreadCount = 1;
+ // Max number of merge threads allowed to be running at
+ // once. When there are more merges then this, we
+ // forcefully pause the larger ones, letting the smaller
+ // ones run, up until maxMergeCount merges at which point
+ // we forcefully pause incoming threads (that presumably
+ // are the ones causing so much merging). We dynamically
+ // default this from 1 to 3, depending on how many cores
+ // you have:
+ private int maxThreadCount = Math.max(1, Math.min(3, Runtime.getRuntime().availableProcessors()/2));
+
+ // Max number of merges we accept before forcefully
+ // throttling the incoming threads
+ private int maxMergeCount = maxThreadCount+2;
protected Directory dir;
@@ -55,23 +76,45 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
}
}
- /** Sets the max # simultaneous threads that may be
- * running. If a merge is necessary yet we already have
- * this many threads running, the incoming thread (that
- * is calling add/updateDocument) will block until
- * a merge thread has completed. */
+ /** Sets the max # simultaneous merge threads that should
+ * be running at once. This must be <= {@link
+ * #setMaxMergeCount}. */
public void setMaxThreadCount(int count) {
- if (count < 1)
+ if (count < 1) {
throw new IllegalArgumentException("count should be at least 1");
+ }
+ if (count > maxMergeCount) {
+ throw new IllegalArgumentException("count should be <= maxMergeCount (= " + maxMergeCount + ")");
+ }
maxThreadCount = count;
}
- /** Get the max # simultaneous threads that may be
- * running. @see #setMaxThreadCount. */
+ /** @see #setMaxThreadCount. */
public int getMaxThreadCount() {
return maxThreadCount;
}
+ /** Sets the max # simultaneous merges that are allowed.
+ * If a merge is necessary yet we already have this many
+ * threads running, the incoming thread (that is calling
+ * add/updateDocument) will block until a merge thread
+ * has completed. Note that we will only run the
+ * smallest {@link #setMaxThreadCount} merges at a time. */
+ public void setMaxMergeCount(int count) {
+ if (count < 1) {
+ throw new IllegalArgumentException("count should be at least 1");
+ }
+ if (count < maxThreadCount) {
+ throw new IllegalArgumentException("count should be >= maxThreadCount (= " + maxThreadCount + ")");
+ }
+ maxMergeCount = count;
+ }
+
+ /** See {@link #setMaxMergeCount}. */
+ public int getMaxMergeCount() {
+ return maxMergeCount;
+ }
+
/** Return the priority that merge threads run at. By
* default the priority is 1 plus the priority of (ie,
* slightly higher priority than) the first thread that
@@ -81,16 +124,73 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
return mergeThreadPriority;
}
- /** Set the priority that merge threads run at. */
+ /** Set the base priority that merge threads run at.
+ * Note that CMS may increase priority of some merge
+ * threads beyond this base priority. It's best not to
+ * set this any higher than
+ * Thread.MAX_PRIORITY-maxThreadCount, so that CMS has
+ * room to set relative priority among threads. */
public synchronized void setMergeThreadPriority(int pri) {
if (pri > Thread.MAX_PRIORITY || pri < Thread.MIN_PRIORITY)
throw new IllegalArgumentException("priority must be in range " + Thread.MIN_PRIORITY + " .. " + Thread.MAX_PRIORITY + " inclusive");
mergeThreadPriority = pri;
+ updateMergeThreads();
+ }
- final int numThreads = mergeThreadCount();
- for(int i=0;i {
+ public int compare(MergeThread t1, MergeThread t2) {
+ final MergePolicy.OneMerge m1 = t1.getCurrentMerge();
+ final MergePolicy.OneMerge m2 = t2.getCurrentMerge();
+
+ final int c1 = m1 == null ? Integer.MAX_VALUE : m1.segments.totalDocCount();
+ final int c2 = m2 == null ? Integer.MAX_VALUE : m2.segments.totalDocCount();
+
+ return c2 - c1;
+ }
+ }
+
+ /** Called whenever the running merges have changed, to
+ * pause & unpause threads. */
+ protected synchronized void updateMergeThreads() {
+
+ Collections.sort(mergeThreads, new CompareByMergeDocCount());
+
+ final int count = mergeThreads.size();
+ int pri = mergeThreadPriority;
+ for(int i=0;i= maxThreadCount) {
- if (verbose())
- message(" too many merge threads running; stalling...");
+ long startStallTime = 0;
+ while (mergeThreadCount() >= maxMergeCount) {
+ startStallTime = System.currentTimeMillis();
+ if (verbose()) {
+ message(" too many merges; stalling...");
+ }
try {
wait();
} catch (InterruptedException ie) {
@@ -202,15 +305,20 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
}
}
- if (verbose())
+ if (verbose()) {
+ if (startStallTime != 0) {
+ message(" stalled for " + (System.currentTimeMillis()-startStallTime) + " msec");
+ }
message(" consider merge " + merge.segString(dir));
-
- assert mergeThreadCount() < maxThreadCount;
+ }
+
+ assert mergeThreadCount() < maxMergeCount;
// OK to spawn a new merge thread to handle this
// merge:
merger = getMergeThread(writer, merge);
mergeThreads.add(merger);
+ updateMergeThreads();
if (verbose())
message(" launch new thread [" + merger.getName() + "]");
@@ -245,6 +353,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
IndexWriter writer;
MergePolicy.OneMerge startMerge;
MergePolicy.OneMerge runningMerge;
+ private volatile boolean done;
public MergeThread(IndexWriter writer, MergePolicy.OneMerge startMerge) throws IOException {
this.writer = writer;
@@ -259,6 +368,16 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
return runningMerge;
}
+ public synchronized MergePolicy.OneMerge getCurrentMerge() {
+ if (done) {
+ return null;
+ } else if (runningMerge != null) {
+ return runningMerge;
+ } else {
+ return startMerge;
+ }
+ }
+
public void setThreadPriority(int pri) {
try {
setPriority(pri);
@@ -292,10 +411,14 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
merge = writer.getNextMerge();
if (merge != null) {
writer.mergeInit(merge);
+ updateMergeThreads();
if (verbose())
message(" merge thread: do another merge " + merge.segString(dir));
- } else
+ } else {
+ done = true;
+ updateMergeThreads();
break;
+ }
}
if (verbose())
@@ -317,6 +440,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
ConcurrentMergeScheduler.this.notifyAll();
boolean removed = mergeThreads.remove(this);
assert removed;
+ updateMergeThreads();
}
}
}
diff --git a/src/java/org/apache/lucene/index/IndexWriter.java b/src/java/org/apache/lucene/index/IndexWriter.java
index d5661b0085d..de92b194ca2 100644
--- a/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/src/java/org/apache/lucene/index/IndexWriter.java
@@ -3944,7 +3944,7 @@ public class IndexWriter implements Closeable {
handleOOM(oom, "merge");
}
if (infoStream != null) {
- message("merge time " + (System.currentTimeMillis()-t0) + " msec");
+ message("merge time " + (System.currentTimeMillis()-t0) + " msec for " + merge.info.docCount + " docs");
}
}
diff --git a/src/java/org/apache/lucene/index/MergePolicy.java b/src/java/org/apache/lucene/index/MergePolicy.java
index 0ffc729363d..bb2808999e9 100644
--- a/src/java/org/apache/lucene/index/MergePolicy.java
+++ b/src/java/org/apache/lucene/index/MergePolicy.java
@@ -85,6 +85,7 @@ public abstract class MergePolicy implements java.io.Closeable {
final boolean useCompoundFile;
boolean aborted;
Throwable error;
+ boolean paused;
public OneMerge(SegmentInfos segments, boolean useCompoundFile) {
if (0 == segments.size())
@@ -110,6 +111,7 @@ public abstract class MergePolicy implements java.io.Closeable {
* not be committed. */
synchronized void abort() {
aborted = true;
+ notifyAll();
}
/** Returns true if this merge was aborted. */
@@ -118,8 +120,34 @@ public abstract class MergePolicy implements java.io.Closeable {
}
synchronized void checkAborted(Directory dir) throws MergeAbortedException {
- if (aborted)
+ if (aborted) {
throw new MergeAbortedException("merge is aborted: " + segString(dir));
+ }
+
+ while (paused) {
+ try {
+ // In theory we could wait() indefinitely, but we
+ // do 1000 msec, defensively
+ wait(1000);
+ } catch (InterruptedException ie) {
+ throw new RuntimeException(ie);
+ }
+ if (aborted) {
+ throw new MergeAbortedException("merge is aborted: " + segString(dir));
+ }
+ }
+ }
+
+ synchronized public void setPause(boolean paused) {
+ this.paused = paused;
+ if (!paused) {
+ // Wakeup merge thread, if it's waiting
+ notifyAll();
+ }
+ }
+
+ synchronized public boolean getPause() {
+ return paused;
}
String segString(Directory dir) {
@@ -262,5 +290,4 @@ public abstract class MergePolicy implements java.io.Closeable {
* compound file format.
*/
public abstract boolean useCompoundDocStore(SegmentInfos segments);
-
}
diff --git a/src/java/org/apache/lucene/index/SegmentInfos.java b/src/java/org/apache/lucene/index/SegmentInfos.java
index 1fa6b86d915..15cb2d85e59 100644
--- a/src/java/org/apache/lucene/index/SegmentInfos.java
+++ b/src/java/org/apache/lucene/index/SegmentInfos.java
@@ -911,4 +911,14 @@ public final class SegmentInfos extends Vector {
return true;
return false;
}
+
+ /** Returns sum of all segment's docCounts. Note that
+ * this does not include deletions */
+ public int totalDocCount() {
+ int count = 0;
+ for(SegmentInfo info : this) {
+ count += info.docCount;
+ }
+ return count;
+ }
}