diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt index 225df83ef8c..311b2b24eaf 100644 --- a/lucene/CHANGES.txt +++ b/lucene/CHANGES.txt @@ -144,6 +144,10 @@ Bug Fixes not synced. Instead, it now tracks an 'epoch' version, which is incremented whenever the taxonomy is re-created, or replaced. (Shai Erera) +* LUCENE-4544: Fixed off-by-1 in ConcurrentMergeScheduler that would + allow 1+maxMergeCount merges threads to be created, instead of just + maxMergeCount (Radim Kolar, Mike McCandless) + Optimizations * LUCENE-4536: PackedInts on-disk format is now byte-aligned (it used to be diff --git a/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java b/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java index e3539cf18ae..3e8ad9a4a97 100644 --- a/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java +++ b/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java @@ -302,7 +302,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler { } @Override - public void merge(IndexWriter writer) throws IOException { + public synchronized void merge(IndexWriter writer) throws IOException { assert !Thread.holdsLock(writer); @@ -328,31 +328,34 @@ public class ConcurrentMergeScheduler extends MergeScheduler { // pending merges, until it's empty: while (true) { - synchronized(this) { - long startStallTime = 0; - while (mergeThreadCount() >= 1+maxMergeCount) { - startStallTime = System.currentTimeMillis(); - if (verbose()) { - message(" too many merges; stalling..."); - } - try { - wait(); - } catch (InterruptedException ie) { - throw new ThreadInterruptedException(ie); - } - } - + long startStallTime = 0; + while (writer.hasPendingMerges() && mergeThreadCount() >= maxMergeCount) { + // This means merging has fallen too far behind: we + // have already created maxMergeCount threads, and + // now there's at least one more merge pending. + // Note that only maxThreadCount of + // those created merge threads will actually be + // running; the rest will be paused (see + // updateMergeThreads). We stall this producer + // thread to prevent creation of new segments, + // until merging has caught up: + startStallTime = System.currentTimeMillis(); if (verbose()) { - if (startStallTime != 0) { - message(" stalled for " + (System.currentTimeMillis()-startStallTime) + " msec"); - } + message(" too many merges; stalling..."); + } + try { + wait(); + } catch (InterruptedException ie) { + throw new ThreadInterruptedException(ie); } } + if (verbose()) { + if (startStallTime != 0) { + message(" stalled for " + (System.currentTimeMillis()-startStallTime) + " msec"); + } + } - // TODO: we could be careful about which merges to do in - // the BG (eg maybe the "biggest" ones) vs FG, which - // merges to do first (the easiest ones?), etc. MergePolicy.OneMerge merge = writer.getNextMerge(); if (merge == null) { if (verbose()) { @@ -361,34 +364,28 @@ public class ConcurrentMergeScheduler extends MergeScheduler { return; } - // We do this w/ the primary thread to keep - // deterministic assignment of segment names - writer.mergeInit(merge); - boolean success = false; try { - synchronized(this) { - if (verbose()) { - message(" consider merge " + writer.segString(merge.segments)); - } - - // OK to spawn a new merge thread to handle this - // merge: - final MergeThread merger = getMergeThread(writer, merge); - mergeThreads.add(merger); - if (verbose()) { - message(" launch new thread [" + merger.getName() + "]"); - } - - merger.start(); - - // Must call this after starting the thread else - // the new thread is removed from mergeThreads - // (since it's not alive yet): - updateMergeThreads(); - - success = true; + if (verbose()) { + message(" consider merge " + writer.segString(merge.segments)); } + + // OK to spawn a new merge thread to handle this + // merge: + final MergeThread merger = getMergeThread(writer, merge); + mergeThreads.add(merger); + if (verbose()) { + message(" launch new thread [" + merger.getName() + "]"); + } + + merger.start(); + + // Must call this after starting the thread else + // the new thread is removed from mergeThreads + // (since it's not alive yet): + updateMergeThreads(); + + success = true; } finally { if (!success) { writer.mergeFinish(merge); @@ -482,7 +479,6 @@ public class ConcurrentMergeScheduler extends MergeScheduler { // merge that writer says is necessary: merge = tWriter.getNextMerge(); if (merge != null) { - tWriter.mergeInit(merge); updateMergeThreads(); if (verbose()) { message(" merge thread: do another merge " + tWriter.segString(merge.segments)); @@ -546,4 +542,13 @@ public class ConcurrentMergeScheduler extends MergeScheduler { void clearSuppressExceptions() { suppressExceptions = false; } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(getClass().getSimpleName() + ": "); + sb.append("maxThreadCount=").append(maxThreadCount).append(", "); + sb.append("maxMergeCount=").append(maxMergeCount).append(", "); + sb.append("mergeThreadPriority=").append(mergeThreadPriority); + return sb.toString(); + } } diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java index 04e61fbb4ea..145a50aefd8 100644 --- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java @@ -1890,6 +1890,15 @@ public class IndexWriter implements Closeable, TwoPhaseCommit { } } + /** + * Expert: returns true if there are merges waiting to be scheduled. + * + * @lucene.experimental + */ + public synchronized boolean hasPendingMerges() { + return pendingMerges.size() != 0; + } + /** * Close the IndexWriter without committing * any changes that have occurred since the last commit @@ -2073,7 +2082,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit { // they are aborted. while(runningMerges.size() > 0) { if (infoStream.isEnabled("IW")) { - infoStream.message("IW", "now wait for " + runningMerges.size() + " running merge to abort"); + infoStream.message("IW", "now wait for " + runningMerges.size() + " running merge/s to abort"); } doWait(); } diff --git a/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java b/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java index 9201642a750..bb63a5d5e8d 100755 --- a/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java +++ b/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java @@ -555,7 +555,7 @@ public class LiveIndexWriterConfig { sb.append("commit=").append(commit == null ? "null" : commit).append("\n"); sb.append("openMode=").append(getOpenMode()).append("\n"); sb.append("similarity=").append(getSimilarity().getClass().getName()).append("\n"); - sb.append("mergeScheduler=").append(getMergeScheduler().getClass().getName()).append("\n"); + sb.append("mergeScheduler=").append(getMergeScheduler()).append("\n"); sb.append("default WRITE_LOCK_TIMEOUT=").append(IndexWriterConfig.WRITE_LOCK_TIMEOUT).append("\n"); sb.append("writeLockTimeout=").append(getWriteLockTimeout()).append("\n"); sb.append("codec=").append(getCodec()).append("\n"); diff --git a/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java b/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java index ee1ff1c6703..91d563f4764 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java @@ -18,14 +18,20 @@ package org.apache.lucene.index; */ import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.lucene.analysis.MockAnalyzer; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; +import org.apache.lucene.document.TextField; import org.apache.lucene.index.IndexWriterConfig.OpenMode; import org.apache.lucene.store.Directory; import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.util.LuceneTestCase; +import org.apache.lucene.util._TestUtil; public class TestConcurrentMergeScheduler extends LuceneTestCase { @@ -245,4 +251,74 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase { directory.close(); } + + // LUCENE-4544 + public void testMaxMergeCount() throws Exception { + Directory dir = newDirectory(); + IndexWriterConfig iwc = new IndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + + final int maxMergeCount = _TestUtil.nextInt(random(), 1, 5); + final int maxMergeThreads = _TestUtil.nextInt(random(), 1, maxMergeCount); + final CountDownLatch enoughMergesWaiting = new CountDownLatch(maxMergeCount); + final AtomicInteger runningMergeCount = new AtomicInteger(0); + final AtomicBoolean failed = new AtomicBoolean(); + + if (VERBOSE) { + System.out.println("TEST: maxMergeCount=" + maxMergeCount + " maxMergeThreads=" + maxMergeThreads); + } + + ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler() { + + @Override + protected void doMerge(MergePolicy.OneMerge merge) throws IOException { + try { + // Stall all incoming merges until we see + // maxMergeCount: + int count = runningMergeCount.incrementAndGet(); + try { + assertTrue("count=" + count + " vs maxMergeCount=" + maxMergeCount, count <= maxMergeCount); + enoughMergesWaiting.countDown(); + + // Stall this merge until we see exactly + // maxMergeCount merges waiting + while (true) { + if (enoughMergesWaiting.await(10, TimeUnit.MILLISECONDS) || failed.get()) { + break; + } + } + // Then sleep a bit to give a chance for the bug + // (too many pending merges) to appear: + Thread.sleep(20); + super.doMerge(merge); + } finally { + runningMergeCount.decrementAndGet(); + } + } catch (Throwable t) { + failed.set(true); + writer.mergeFinish(merge); + throw new RuntimeException(t); + } + } + }; + cms.setMaxThreadCount(maxMergeThreads); + cms.setMaxMergeCount(maxMergeCount); + iwc.setMergeScheduler(cms); + iwc.setMaxBufferedDocs(2); + + TieredMergePolicy tmp = new TieredMergePolicy(); + iwc.setMergePolicy(tmp); + tmp.setMaxMergeAtOnce(2); + tmp.setSegmentsPerTier(2); + + IndexWriter w = new IndexWriter(dir, iwc); + Document doc = new Document(); + doc.add(newField("field", "field", TextField.TYPE_NOT_STORED)); + while(enoughMergesWaiting.getCount() != 0 && !failed.get()) { + for(int i=0;i<10;i++) { + w.addDocument(doc); + } + } + w.close(false); + dir.close(); + } } diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java index eba64752323..1642e2aab10 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java @@ -424,7 +424,10 @@ public class TestIndexWriterExceptions extends LuceneTestCase { public void testExceptionOnMergeInit() throws IOException { Directory dir = newDirectory(); IndexWriterConfig conf = newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer(random())) - .setMaxBufferedDocs(2).setMergeScheduler(new ConcurrentMergeScheduler()).setMergePolicy(newLogMergePolicy()); + .setMaxBufferedDocs(2).setMergePolicy(newLogMergePolicy()); + ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler(); + cms.setSuppressExceptions(); + conf.setMergeScheduler(cms); ((LogMergePolicy) conf.getMergePolicy()).setMergeFactor(2); MockIndexWriter3 w = new MockIndexWriter3(dir, conf); w.doFail = true;