LUCENE-6063: allow overriding whether/how ConcurrentMergeScheduler stalls incoming threads when merges are falling behind

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1640456 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael McCandless 2014-11-19 00:03:16 +00:00
parent ad79399db1
commit 90a64497dd
4 changed files with 73 additions and 29 deletions

View File

@ -98,6 +98,10 @@ New Features
* LUCENE-5929: Also extract terms to highlight from block join
queries. (Julie Tibshirani via Mike McCandless)
* LUCENE-6063: Allow overriding whether/how ConcurrentMergeScheduler
stalls incoming threads when merges are falling behind (Mike
McCandless)
API Changes
* LUCENE-5900: Deprecated more constructors taking Version in *InfixSuggester and

View File

@ -334,33 +334,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
// pending merges, until it's empty:
while (true) {
long startStallTime = 0;
while (writer.hasPendingMerges() && mergeThreadCount() >= maxMergeCount) {
// This means merging has fallen too far behind: we
// have already created maxMergeCount threads, and
// now there's at least one more merge pending.
// Note that only maxThreadCount of
// those created merge threads will actually be
// running; the rest will be paused (see
// updateMergeThreads). We stall this producer
// thread to prevent creation of new segments,
// until merging has caught up:
startStallTime = System.currentTimeMillis();
if (verbose()) {
message(" too many merges; stalling...");
}
try {
wait();
} catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie);
}
}
if (verbose()) {
if (startStallTime != 0) {
message(" stalled for " + (System.currentTimeMillis()-startStallTime) + " msec");
}
}
maybeStall();
MergePolicy.OneMerge merge = writer.getNextMerge();
if (merge == null) {
@ -400,6 +374,44 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
}
}
/** This is invoked by {@link #merge} to possibly stall the incoming
* thread when there are too many merges running or pending. The
* default behavior is to force this thread, which is producing too
* many segments for merging to keep up, to wait until merges catch
* up. Applications that can take other less drastic measures, such
* as limiting how many threads are allowed to index, can do nothing
* here and throttle elsewhere. */
protected synchronized void maybeStall() {
long startStallTime = 0;
while (writer.hasPendingMerges() && mergeThreadCount() >= maxMergeCount) {
// This means merging has fallen too far behind: we
// have already created maxMergeCount threads, and
// now there's at least one more merge pending.
// Note that only maxThreadCount of
// those created merge threads will actually be
// running; the rest will be paused (see
// updateMergeThreads). We stall this producer
// thread to prevent creation of new segments,
// until merging has caught up:
startStallTime = System.currentTimeMillis();
if (verbose()) {
message(" too many merges; stalling...");
}
try {
wait();
} catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie);
}
}
if (verbose()) {
if (startStallTime != 0) {
message(" stalled for " + (System.currentTimeMillis()-startStallTime) + " msec");
}
}
}
/** Does the actual merge, by calling {@link IndexWriter#merge} */
protected void doMerge(MergePolicy.OneMerge merge) throws IOException {
writer.merge(merge);

View File

@ -339,7 +339,6 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
dir.close();
}
private static class TrackingCMS extends ConcurrentMergeScheduler {
long totMergedBytes;
CountDownLatch atLeastOneMerge;
@ -454,4 +453,24 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
w.close();
d.close();
}
// LUCENE-6063
public void testMaybeStallCalled() throws Exception {
final AtomicBoolean wasCalled = new AtomicBoolean();
Directory dir = newDirectory();
IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random()));
iwc.setMergeScheduler(new ConcurrentMergeScheduler() {
@Override
protected void maybeStall() {
wasCalled.set(true);
}
});
IndexWriter w = new IndexWriter(dir, iwc);
w.addDocument(new Document());
w.forceMerge(1);
assertTrue(wasCalled.get());
w.close();
dir.close();
}
}

View File

@ -886,7 +886,16 @@ public abstract class LuceneTestCase extends Assert {
} else if (rarely(r)) {
int maxThreadCount = TestUtil.nextInt(r, 1, 4);
int maxMergeCount = TestUtil.nextInt(r, maxThreadCount, maxThreadCount + 4);
ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
ConcurrentMergeScheduler cms;
if (r.nextBoolean()) {
cms = new ConcurrentMergeScheduler();
} else {
cms = new ConcurrentMergeScheduler() {
@Override
protected synchronized void maybeStall() {
}
};
}
cms.setMaxMergesAndThreads(maxMergeCount, maxThreadCount);
c.setMergeScheduler(cms);
}