LUCENE-4544: fix off-by-one in max number of CMS merge threads

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1408089 13f79535-47bb-0310-9956-ffa450edef68
Author: Michael McCandless
Date:   2012-11-11 19:19:55 +00:00
Commit: 89cbd54e08 (parent e92475e10e)

6 changed files with 147 additions and 50 deletions

File: CHANGES.txt

@@ -144,6 +144,10 @@ Bug Fixes
not synced. Instead, it now tracks an 'epoch' version, which is incremented
whenever the taxonomy is re-created, or replaced. (Shai Erera)
* LUCENE-4544: Fixed off-by-1 in ConcurrentMergeScheduler that would
  allow 1+maxMergeCount merge threads to be created, instead of just
  maxMergeCount (Radim Kolar, Mike McCandless)

Optimizations
* LUCENE-4536: PackedInts on-disk format is now byte-aligned (it used to be
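
For context: the off-by-one was in the producer-side stall check in ConcurrentMergeScheduler. The old condition compared the live merge-thread count against 1+maxMergeCount, so one thread more than the configured cap could be created before the indexing thread stalled; the fixed condition (in the diff below) stalls as soon as maxMergeCount threads exist and another merge is pending. A minimal sketch of how an application caps merge concurrency with these two knobs, assuming the 4.x setters used later in this commit; the helper class and the Directory/Version/Analyzer wiring are illustrative, not part of this commit:

import java.io.IOException;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.Version;

// Hypothetical helper, not part of this commit: open a writer whose merge
// concurrency is capped via ConcurrentMergeScheduler.
public final class CappedMergeWriter {
  public static IndexWriter open(Directory dir, Version version, Analyzer analyzer) throws IOException {
    IndexWriterConfig iwc = new IndexWriterConfig(version, analyzer);

    ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
    cms.setMaxThreadCount(1); // at most one merge actually runs at a time
    cms.setMaxMergeCount(2);  // at most two merge threads alive; with the fix, the thread
                              // calling merge() stalls at this cap instead of at 1+maxMergeCount
    iwc.setMergeScheduler(cms);
    return new IndexWriter(dir, iwc);
  }
}

Only maxThreadCount of the live merge threads actually run; the rest are paused by the scheduler, and beyond maxMergeCount the indexing thread itself is stalled until merging catches up.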

File: ConcurrentMergeScheduler.java

@@ -302,7 +302,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
}
@Override
public void merge(IndexWriter writer) throws IOException {
public synchronized void merge(IndexWriter writer) throws IOException {
assert !Thread.holdsLock(writer);
@@ -328,31 +328,34 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
// pending merges, until it's empty:
while (true) {
synchronized(this) {
long startStallTime = 0;
while (mergeThreadCount() >= 1+maxMergeCount) {
startStallTime = System.currentTimeMillis();
if (verbose()) {
message(" too many merges; stalling...");
}
try {
wait();
} catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie);
}
}
long startStallTime = 0;
while (writer.hasPendingMerges() && mergeThreadCount() >= maxMergeCount) {
// This means merging has fallen too far behind: we
// have already created maxMergeCount threads, and
// now there's at least one more merge pending.
// Note that only maxThreadCount of
// those created merge threads will actually be
// running; the rest will be paused (see
// updateMergeThreads). We stall this producer
// thread to prevent creation of new segments,
// until merging has caught up:
startStallTime = System.currentTimeMillis();
if (verbose()) {
if (startStallTime != 0) {
message(" stalled for " + (System.currentTimeMillis()-startStallTime) + " msec");
}
message(" too many merges; stalling...");
}
try {
wait();
} catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie);
}
}
if (verbose()) {
if (startStallTime != 0) {
message(" stalled for " + (System.currentTimeMillis()-startStallTime) + " msec");
}
}
// TODO: we could be careful about which merges to do in
// the BG (eg maybe the "biggest" ones) vs FG, which
// merges to do first (the easiest ones?), etc.
MergePolicy.OneMerge merge = writer.getNextMerge();
if (merge == null) {
if (verbose()) {
@@ -361,34 +364,28 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
return;
}
// We do this w/ the primary thread to keep
// deterministic assignment of segment names
writer.mergeInit(merge);
boolean success = false;
try {
synchronized(this) {
if (verbose()) {
message(" consider merge " + writer.segString(merge.segments));
}
// OK to spawn a new merge thread to handle this
// merge:
final MergeThread merger = getMergeThread(writer, merge);
mergeThreads.add(merger);
if (verbose()) {
message(" launch new thread [" + merger.getName() + "]");
}
merger.start();
// Must call this after starting the thread else
// the new thread is removed from mergeThreads
// (since it's not alive yet):
updateMergeThreads();
success = true;
if (verbose()) {
message(" consider merge " + writer.segString(merge.segments));
}
// OK to spawn a new merge thread to handle this
// merge:
final MergeThread merger = getMergeThread(writer, merge);
mergeThreads.add(merger);
if (verbose()) {
message(" launch new thread [" + merger.getName() + "]");
}
merger.start();
// Must call this after starting the thread else
// the new thread is removed from mergeThreads
// (since it's not alive yet):
updateMergeThreads();
success = true;
} finally {
if (!success) {
writer.mergeFinish(merge);
@@ -482,7 +479,6 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
// merge that writer says is necessary:
merge = tWriter.getNextMerge();
if (merge != null) {
tWriter.mergeInit(merge);
updateMergeThreads();
if (verbose()) {
message(" merge thread: do another merge " + tWriter.segString(merge.segments));
@@ -546,4 +542,13 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
void clearSuppressExceptions() {
suppressExceptions = false;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder(getClass().getSimpleName() + ": ");
sb.append("maxThreadCount=").append(maxThreadCount).append(", ");
sb.append("maxMergeCount=").append(maxMergeCount).append(", ");
sb.append("mergeThreadPriority=").append(mergeThreadPriority);
return sb.toString();
}
}
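
Taken together, the change turns merge() into a classic bounded-producer loop: the method is now synchronized on the scheduler, wait()s while maxMergeCount merge threads are alive and more merges are queued, and is woken when a merge thread finishes. The stand-alone sketch below shows that pattern in isolation; it is plain Java with illustrative names, not Lucene's actual classes, and it assumes, as in CMS, that a finishing worker thread notifies on the shared monitor:

import java.util.ArrayDeque;
import java.util.Queue;

// Illustrative sketch of the bounded-producer pattern behind the fixed merge().
public class BoundedMergeProducer {
  private final int maxMergeCount;
  private int liveMergeThreads;                                        // analogous to mergeThreadCount()
  private final Queue<Runnable> pending = new ArrayDeque<Runnable>();  // analogous to the writer's pending merges

  public BoundedMergeProducer(int maxMergeCount) {
    this.maxMergeCount = maxMergeCount;
  }

  public synchronized void submit(Runnable mergeWork) {
    pending.add(mergeWork);
  }

  // Spawn one thread per queued merge, but stall while maxMergeCount threads are
  // already alive AND more work is queued (the buggy code stalled at 1+maxMergeCount).
  public synchronized void drain() throws InterruptedException {
    while (true) {
      while (!pending.isEmpty() && liveMergeThreads >= maxMergeCount) {
        wait();                                                        // stalled; a finishing thread wakes us
      }
      final Runnable work = pending.poll();
      if (work == null) {
        return;                                                        // nothing left to schedule
      }
      liveMergeThreads++;
      new Thread(new Runnable() {
        @Override
        public void run() {
          try {
            work.run();
          } finally {
            synchronized (BoundedMergeProducer.this) {
              liveMergeThreads--;
              BoundedMergeProducer.this.notifyAll();                   // wake a stalled drain()
            }
          }
        }
      }).start();
    }
  }
}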

File: IndexWriter.java

@@ -1890,6 +1890,15 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
}
}
/**
* Expert: returns true if there are merges waiting to be scheduled.
*
* @lucene.experimental
*/
public synchronized boolean hasPendingMerges() {
return pendingMerges.size() != 0;
}
/**
* Close the <code>IndexWriter</code> without committing
* any changes that have occurred since the last commit
@@ -2073,7 +2082,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
// they are aborted.
while(runningMerges.size() > 0) {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "now wait for " + runningMerges.size() + " running merge to abort");
infoStream.message("IW", "now wait for " + runningMerges.size() + " running merge/s to abort");
}
doWait();
}
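
The new IndexWriter.hasPendingMerges() is what lets the scheduler stall only when another merge is actually queued (see the new while condition above). Since it is public and marked experimental, application or test code can also poll it; a small hypothetical helper, assuming simple sleep-based polling and keeping in mind that it reflects queued merges only, not merges already running:

import org.apache.lucene.index.IndexWriter;

// Hypothetical helper, not part of this commit.
final class MergeWait {
  // Spin-wait until the writer has no merges waiting to be scheduled.
  static void awaitNoQueuedMerges(IndexWriter writer) throws InterruptedException {
    while (writer.hasPendingMerges()) {
      Thread.sleep(10);
    }
  }
}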

File: LiveIndexWriterConfig.java

@@ -555,7 +555,7 @@ public class LiveIndexWriterConfig {
sb.append("commit=").append(commit == null ? "null" : commit).append("\n");
sb.append("openMode=").append(getOpenMode()).append("\n");
sb.append("similarity=").append(getSimilarity().getClass().getName()).append("\n");
sb.append("mergeScheduler=").append(getMergeScheduler().getClass().getName()).append("\n");
sb.append("mergeScheduler=").append(getMergeScheduler()).append("\n");
sb.append("default WRITE_LOCK_TIMEOUT=").append(IndexWriterConfig.WRITE_LOCK_TIMEOUT).append("\n");
sb.append("writeLockTimeout=").append(getWriteLockTimeout()).append("\n");
sb.append("codec=").append(getCodec()).append("\n");

File: TestConcurrentMergeScheduler.java

@@ -18,14 +18,20 @@ package org.apache.lucene.index;
*/
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexWriterConfig.OpenMode;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util._TestUtil;
public class TestConcurrentMergeScheduler extends LuceneTestCase {
@@ -245,4 +251,74 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
directory.close();
}
// LUCENE-4544
public void testMaxMergeCount() throws Exception {
Directory dir = newDirectory();
IndexWriterConfig iwc = new IndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random()));
final int maxMergeCount = _TestUtil.nextInt(random(), 1, 5);
final int maxMergeThreads = _TestUtil.nextInt(random(), 1, maxMergeCount);
final CountDownLatch enoughMergesWaiting = new CountDownLatch(maxMergeCount);
final AtomicInteger runningMergeCount = new AtomicInteger(0);
final AtomicBoolean failed = new AtomicBoolean();
if (VERBOSE) {
System.out.println("TEST: maxMergeCount=" + maxMergeCount + " maxMergeThreads=" + maxMergeThreads);
}
ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler() {
@Override
protected void doMerge(MergePolicy.OneMerge merge) throws IOException {
try {
// Stall all incoming merges until we see
// maxMergeCount:
int count = runningMergeCount.incrementAndGet();
try {
assertTrue("count=" + count + " vs maxMergeCount=" + maxMergeCount, count <= maxMergeCount);
enoughMergesWaiting.countDown();
// Stall this merge until we see exactly
// maxMergeCount merges waiting
while (true) {
if (enoughMergesWaiting.await(10, TimeUnit.MILLISECONDS) || failed.get()) {
break;
}
}
// Then sleep a bit to give a chance for the bug
// (too many pending merges) to appear:
Thread.sleep(20);
super.doMerge(merge);
} finally {
runningMergeCount.decrementAndGet();
}
} catch (Throwable t) {
failed.set(true);
writer.mergeFinish(merge);
throw new RuntimeException(t);
}
}
};
cms.setMaxThreadCount(maxMergeThreads);
cms.setMaxMergeCount(maxMergeCount);
iwc.setMergeScheduler(cms);
iwc.setMaxBufferedDocs(2);
TieredMergePolicy tmp = new TieredMergePolicy();
iwc.setMergePolicy(tmp);
tmp.setMaxMergeAtOnce(2);
tmp.setSegmentsPerTier(2);
IndexWriter w = new IndexWriter(dir, iwc);
Document doc = new Document();
doc.add(newField("field", "field", TextField.TYPE_NOT_STORED));
while(enoughMergesWaiting.getCount() != 0 && !failed.get()) {
for(int i=0;i<10;i++) {
w.addDocument(doc);
}
}
w.close(false);
dir.close();
}
}
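
The test drives the scheduler through its protected doMerge() hook by subclassing ConcurrentMergeScheduler; the same hook works outside tests for instrumenting merges. A short sketch, with the class name and logging purely illustrative:

import java.io.IOException;

import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.MergePolicy;

// Illustrative subclass: time each merge as it runs.
public class TimingMergeScheduler extends ConcurrentMergeScheduler {
  @Override
  protected void doMerge(MergePolicy.OneMerge merge) throws IOException {
    final long t0 = System.currentTimeMillis();
    try {
      super.doMerge(merge);
    } finally {
      System.out.println("merge finished in " + (System.currentTimeMillis() - t0) + " msec");
    }
  }
}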

File: TestIndexWriterExceptions.java

@@ -424,7 +424,10 @@ public class TestIndexWriterExceptions extends LuceneTestCase {
public void testExceptionOnMergeInit() throws IOException {
Directory dir = newDirectory();
IndexWriterConfig conf = newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer(random()))
.setMaxBufferedDocs(2).setMergeScheduler(new ConcurrentMergeScheduler()).setMergePolicy(newLogMergePolicy());
.setMaxBufferedDocs(2).setMergePolicy(newLogMergePolicy());
ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
cms.setSuppressExceptions();
conf.setMergeScheduler(cms);
((LogMergePolicy) conf.getMergePolicy()).setMergeFactor(2);
MockIndexWriter3 w = new MockIndexWriter3(dir, conf);
w.doFail = true;