LUCENE-9337: Ensure CMS updates its thread accounting data structures consistently (#1443)

CMS today releases its lock after finishing a merge and re-acquires it separately to update
the thread accounting data structures. This causes threading issues where concurrently
finishing threads fail to pick up pending merges, which can lead to thread starvation
on forceMerge calls.
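
For orientation, here is a minimal, hypothetical sketch of the locking pattern involved; the class and method names below are invented for illustration and are not the actual Lucene code. It contrasts the old two-block pattern (lock released between the follow-up merge decision and the thread accounting update) with the single synchronized section this commit introduces via runOnMergeFinished.

import java.util.ArrayList;
import java.util.List;

class SketchMergeScheduler {
  private final List<Thread> mergeThreads = new ArrayList<>();
  private final List<String> pendingMerges = new ArrayList<>();
  private final int maxThreadCount = 1;

  // Old pattern: two separate synchronized sections. In the unlocked gap between
  // them another finishing thread can act on stale thread accounting and decide
  // not to start the pending merge.
  void onMergeFinishedRacy() {
    synchronized (this) {
      maybeStartPendingMerge();
    }
    // <-- unlocked window: stalling/scheduling decisions made here can miss merges
    synchronized (this) {
      mergeThreads.remove(Thread.currentThread());
      notifyAll(); // wake anyone waiting, e.g. a forceMerge
    }
  }

  // New pattern (what runOnMergeFinished in the diff below does): both steps happen
  // under one monitor, so no thread observes the intermediate state.
  synchronized void onMergeFinishedConsistent() {
    try {
      maybeStartPendingMerge();
    } finally {
      mergeThreads.remove(Thread.currentThread());
      notifyAll();
    }
  }

  private void maybeStartPendingMerge() {
    if (!pendingMerges.isEmpty() && mergeThreads.size() < maxThreadCount) {
      // start a merge thread for pendingMerges.remove(0) here
    }
  }
}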
Simon Willnauer 2020-04-22 14:30:14 +02:00 committed by GitHub
parent e0c06ee6a6
commit 2b6ae53cd9
4 changed files with 120 additions and 25 deletions


@@ -185,6 +185,12 @@ Bug Fixes
* LUCENE-9309: Wait for #addIndexes merges when aborting merges. (Simon Willnauer)
* LUCENE-9337: Ensure CMS updates its thread accounting data structures consistently.
CMS today releases its lock after finishing a merge and re-acquires it separately to update
the thread accounting data structures. This causes threading issues where concurrently
finishing threads fail to pick up pending merges, which can lead to thread starvation on
forceMerge calls. (Simon Willnauer)
Other
---------------------


@@ -18,6 +18,7 @@ package org.apache.lucene.index;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
@@ -633,6 +634,27 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
return thread;
}
synchronized void runOnMergeFinished(IndexWriter writer) {
// the merge call as well as the merge thread handling in the finally
// block must be sync'd on CMS otherwise stalling decisions might cause
// us to miss pending merges
assert mergeThreads.contains(Thread.currentThread()) : "caller is not a merge thread";
// Let CMS run new merges if necessary:
try {
merge(writer, MergeTrigger.MERGE_FINISHED, true);
} catch (AlreadyClosedException ace) {
// OK
} catch (IOException ioe) {
throw new UncheckedIOException(ioe);
} finally {
removeMergeThread();
updateMergeThreads();
// In case we had stalled indexing, we can now wake up
// and possibly unstall:
notifyAll();
}
}
/** Runs a merge thread to execute a single merge, then exits. */
protected class MergeThread extends Thread implements Comparable<MergeThread> {
final IndexWriter writer;
@@ -664,18 +686,8 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
if (verbose()) {
message(" merge thread: done");
}
// Let CMS run new merges if necessary:
try {
merge(writer, MergeTrigger.MERGE_FINISHED, true);
} catch (AlreadyClosedException ace) {
// OK
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
runOnMergeFinished(writer);
} catch (Throwable exc) {
if (exc instanceof MergePolicy.MergeAbortedException) {
// OK to ignore
} else if (suppressExceptions == false) {
@@ -683,17 +695,6 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
// testing.
handleMergeException(writer.getDirectory(), exc);
}
} finally {
synchronized(ConcurrentMergeScheduler.this) {
removeMergeThread();
updateMergeThreads();
// In case we had stalled indexing, we can now wake up
// and possibly unstall:
ConcurrentMergeScheduler.this.notifyAll();
}
}
}
}


@@ -1990,7 +1990,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
if (doWait) {
synchronized(this) {
while(true) {
if (tragedy.get() != null) {
throw new IllegalStateException("this writer hit an unrecoverable error; cannot complete forceMerge", tragedy.get());
}
@@ -2007,12 +2006,14 @@
}
}
if (maxNumSegmentsMergesPending())
if (maxNumSegmentsMergesPending()) {
testPoint("forceMergeBeforeWait");
doWait();
else
} else {
break;
}
}
}
// If close is called while we are still
// running, throw an exception so the calling
@@ -4377,6 +4378,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
* but without holding synchronized lock on IndexWriter
* instance */
private int mergeMiddle(MergePolicy.OneMerge merge, MergePolicy mergePolicy) throws IOException {
testPoint("mergeMiddleStart");
merge.checkAborted();
Directory mergeDirectory = config.getMergeScheduler().wrapForMerge(merge, directory);


@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import com.carrotsearch.randomizedtesting.generators.RandomStrings;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
@@ -32,6 +33,7 @@ import org.apache.lucene.index.IndexWriterConfig.OpenMode;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.TestUtil;
@@ -657,4 +659,88 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
assertFalse(failed.get());
}
/*
* This test tries to produce 2 merges running concurrently with 2 segments per merge. While these
* merges run we kick off a forceMerge that puts a pending merge in the queue and then waits.
* While it waits we reduce maxMergeCount to 1. If concurrency in CMS is not handled correctly, the
* forceMerge will wait forever since none of the currently running merges picks up the pending
* merge; in that case this test fails every time.
*/
public void testChangeMaxMergeCountyWhileForceMerge() throws IOException, InterruptedException {
int numIters = TEST_NIGHTLY ? 100 : 10;
for (int iters = 0; iters < numIters; iters++) {
LogDocMergePolicy mp = new LogDocMergePolicy();
mp.setMergeFactor(2);
CountDownLatch forceMergeWaits = new CountDownLatch(1);
CountDownLatch mergeThreadsStartAfterWait = new CountDownLatch(1);
CountDownLatch mergeThreadsArrived = new CountDownLatch(2);
InfoStream stream = new InfoStream() {
@Override
public void message(String component, String message) {
if ("TP".equals(component) && "mergeMiddleStart".equals(message)) {
mergeThreadsArrived.countDown();
try {
mergeThreadsStartAfterWait.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
} else if ("TP".equals(component) && "forceMergeBeforeWait".equals(message)) {
forceMergeWaits.countDown();
}
}
@Override
public boolean isEnabled(String component) {
return "TP".equals(component);
}
@Override
public void close() {
}
};
try (Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir,
new IndexWriterConfig().setMergeScheduler(new ConcurrentMergeScheduler())
.setMergePolicy(mp).setInfoStream(stream)) {
@Override
protected boolean isEnableTestPoints() {
return true;
}
}) {
Thread t = new Thread(() -> {
try {
writer.forceMerge(1);
} catch (IOException e) {
throw new AssertionError(e);
}
});
ConcurrentMergeScheduler cms = (ConcurrentMergeScheduler) writer.getConfig().getMergeScheduler();
cms.setMaxMergesAndThreads(2, 2);
try {
for (int i = 0; i < 4; i++) {
Document document = new Document();
document.add(new TextField("foo", "the quick brown fox jumps over the lazy dog", Field.Store.YES));
document.add(new TextField("bar", RandomStrings.randomRealisticUnicodeOfLength(random(), 20), Field.Store.YES));
writer.addDocument(document);
writer.flush();
}
assertEquals(writer.cloneSegmentInfos().toString(), 4, writer.getSegmentCount());
mergeThreadsArrived.await();
t.start();
forceMergeWaits.await();
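// The forceMerge thread is now registered as waiting; reduce maxMergeCount/maxThreadCount to 1 so
// the pending merge can only be picked up by the already-running merge threads as they finish.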
cms.setMaxMergesAndThreads(1, 1);
} finally {
mergeThreadsStartAfterWait.countDown();
}
while (t.isAlive()) {
t.join(10);
if (cms.mergeThreadCount() == 0 && writer.hasPendingMerges()) {
fail("writer has pending merges but no CMS threads are running");
}
}
assertEquals(1, writer.getSegmentCount());
}
}
}
}