From 90a64497dd0a903eb0797ed0d95d47969d640387 Mon Sep 17 00:00:00 2001 From: Michael McCandless Date: Wed, 19 Nov 2014 00:03:16 +0000 Subject: [PATCH] LUCENE-6063: allow overriding whether/how ConcurrentMergeScheduler stalls incoming threads when merges are falling behind git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1640456 13f79535-47bb-0310-9956-ffa450edef68 --- lucene/CHANGES.txt | 4 ++ .../index/ConcurrentMergeScheduler.java | 66 +++++++++++-------- .../index/TestConcurrentMergeScheduler.java | 21 +++++- .../apache/lucene/util/LuceneTestCase.java | 11 +++- 4 files changed, 73 insertions(+), 29 deletions(-) diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt index bcd916bae82..73576493174 100644 --- a/lucene/CHANGES.txt +++ b/lucene/CHANGES.txt @@ -98,6 +98,10 @@ New Features * LUCENE-5929: Also extract terms to highlight from block join queries. (Julie Tibshirani via Mike McCandless) +* LUCENE-6063: Allow overriding whether/how ConcurrentMergeScheduler + stalls incoming threads when merges are falling behind (Mike + McCandless) + API Changes * LUCENE-5900: Deprecated more constructors taking Version in *InfixSuggester and diff --git a/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java b/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java index 2dbe25f4545..256002a160b 100644 --- a/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java +++ b/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java @@ -334,33 +334,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler { // pending merges, until it's empty: while (true) { - long startStallTime = 0; - while (writer.hasPendingMerges() && mergeThreadCount() >= maxMergeCount) { - // This means merging has fallen too far behind: we - // have already created maxMergeCount threads, and - // now there's at least one more merge pending. - // Note that only maxThreadCount of - // those created merge threads will actually be - // running; the rest will be paused (see - // updateMergeThreads). We stall this producer - // thread to prevent creation of new segments, - // until merging has caught up: - startStallTime = System.currentTimeMillis(); - if (verbose()) { - message(" too many merges; stalling..."); - } - try { - wait(); - } catch (InterruptedException ie) { - throw new ThreadInterruptedException(ie); - } - } - - if (verbose()) { - if (startStallTime != 0) { - message(" stalled for " + (System.currentTimeMillis()-startStallTime) + " msec"); - } - } + maybeStall(); MergePolicy.OneMerge merge = writer.getNextMerge(); if (merge == null) { @@ -400,6 +374,44 @@ public class ConcurrentMergeScheduler extends MergeScheduler { } } + /** This is invoked by {@link #merge} to possibly stall the incoming + * thread when there are too many merges running or pending. The + * default behavior is to force this thread, which is producing too + * many segments for merging to keep up, to wait until merges catch + * up. Applications that can take other less drastic measures, such + * as limiting how many threads are allowed to index, can do nothing + * here and throttle elsewhere. */ + + protected synchronized void maybeStall() { + long startStallTime = 0; + while (writer.hasPendingMerges() && mergeThreadCount() >= maxMergeCount) { + // This means merging has fallen too far behind: we + // have already created maxMergeCount threads, and + // now there's at least one more merge pending. + // Note that only maxThreadCount of + // those created merge threads will actually be + // running; the rest will be paused (see + // updateMergeThreads). We stall this producer + // thread to prevent creation of new segments, + // until merging has caught up: + startStallTime = System.currentTimeMillis(); + if (verbose()) { + message(" too many merges; stalling..."); + } + try { + wait(); + } catch (InterruptedException ie) { + throw new ThreadInterruptedException(ie); + } + } + + if (verbose()) { + if (startStallTime != 0) { + message(" stalled for " + (System.currentTimeMillis()-startStallTime) + " msec"); + } + } + } + /** Does the actual merge, by calling {@link IndexWriter#merge} */ protected void doMerge(MergePolicy.OneMerge merge) throws IOException { writer.merge(merge); diff --git a/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java b/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java index f691ed006be..7935db70751 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java @@ -339,7 +339,6 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase { dir.close(); } - private static class TrackingCMS extends ConcurrentMergeScheduler { long totMergedBytes; CountDownLatch atLeastOneMerge; @@ -454,4 +453,24 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase { w.close(); d.close(); } + + // LUCENE-6063 + public void testMaybeStallCalled() throws Exception { + final AtomicBoolean wasCalled = new AtomicBoolean(); + Directory dir = newDirectory(); + IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random())); + iwc.setMergeScheduler(new ConcurrentMergeScheduler() { + @Override + protected void maybeStall() { + wasCalled.set(true); + } + }); + IndexWriter w = new IndexWriter(dir, iwc); + w.addDocument(new Document()); + w.forceMerge(1); + assertTrue(wasCalled.get()); + + w.close(); + dir.close(); + } } diff --git a/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java b/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java index 970be09863f..0f64def08f4 100644 --- a/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java +++ b/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java @@ -886,7 +886,16 @@ public abstract class LuceneTestCase extends Assert { } else if (rarely(r)) { int maxThreadCount = TestUtil.nextInt(r, 1, 4); int maxMergeCount = TestUtil.nextInt(r, maxThreadCount, maxThreadCount + 4); - ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler(); + ConcurrentMergeScheduler cms; + if (r.nextBoolean()) { + cms = new ConcurrentMergeScheduler(); + } else { + cms = new ConcurrentMergeScheduler() { + @Override + protected synchronized void maybeStall() { + } + }; + } cms.setMaxMergesAndThreads(maxMergeCount, maxThreadCount); c.setMergeScheduler(cms); }