diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt index 205576638af..a9301eb9f1c 100644 --- a/lucene/CHANGES.txt +++ b/lucene/CHANGES.txt @@ -185,6 +185,12 @@ Bug Fixes * LUCENE-9309: Wait for #addIndexes merges when aborting merges. (Simon Willnauer) +* LUCENE-9337: Ensure CMS updates it's thread accounting datastructures consistently. + CMS today releases it's lock after finishing a merge before it re-acquires it to update + the thread accounting datastructures. This causes threading issues where concurrently + finishing threads fail to pick up pending merges causing potential thread starvation on + forceMerge calls. (Simon Willnauer) + Other --------------------- diff --git a/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java b/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java index f20ae318560..0324cd3a191 100644 --- a/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java +++ b/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java @@ -18,6 +18,7 @@ package org.apache.lucene.index; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.List; import java.util.Locale; @@ -633,6 +634,27 @@ public class ConcurrentMergeScheduler extends MergeScheduler { return thread; } + synchronized void runOnMergeFinished(IndexWriter writer) { + // the merge call as well as the merge thread handling in the finally + // block must be sync'd on CMS otherwise stalling decisions might cause + // us to miss pending merges + assert mergeThreads.contains(Thread.currentThread()) : "caller is not a merge thread"; + // Let CMS run new merges if necessary: + try { + merge(writer, MergeTrigger.MERGE_FINISHED, true); + } catch (AlreadyClosedException ace) { + // OK + } catch (IOException ioe) { + throw new UncheckedIOException(ioe); + } finally { + removeMergeThread(); + updateMergeThreads(); + // In case we had stalled indexing, we can now wake up + // and possibly unstall: + notifyAll(); + } + } + /** Runs a merge thread to execute a single merge, then exits. */ protected class MergeThread extends Thread implements Comparable { final IndexWriter writer; @@ -664,18 +686,8 @@ public class ConcurrentMergeScheduler extends MergeScheduler { if (verbose()) { message(" merge thread: done"); } - - // Let CMS run new merges if necessary: - try { - merge(writer, MergeTrigger.MERGE_FINISHED, true); - } catch (AlreadyClosedException ace) { - // OK - } catch (IOException ioe) { - throw new RuntimeException(ioe); - } - + runOnMergeFinished(writer); } catch (Throwable exc) { - if (exc instanceof MergePolicy.MergeAbortedException) { // OK to ignore } else if (suppressExceptions == false) { @@ -683,17 +695,6 @@ public class ConcurrentMergeScheduler extends MergeScheduler { // testing. handleMergeException(writer.getDirectory(), exc); } - - } finally { - synchronized(ConcurrentMergeScheduler.this) { - removeMergeThread(); - - updateMergeThreads(); - - // In case we had stalled indexing, we can now wake up - // and possibly unstall: - ConcurrentMergeScheduler.this.notifyAll(); - } } } } diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java index ad002b88d68..d7cedda6d4e 100644 --- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java @@ -1990,7 +1990,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable, if (doWait) { synchronized(this) { while(true) { - if (tragedy.get() != null) { throw new IllegalStateException("this writer hit an unrecoverable error; cannot complete forceMerge", tragedy.get()); } @@ -2007,10 +2006,12 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable, } } - if (maxNumSegmentsMergesPending()) + if (maxNumSegmentsMergesPending()) { + testPoint("forceMergeBeforeWait"); doWait(); - else + } else { break; + } } } @@ -4377,6 +4378,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable, * but without holding synchronized lock on IndexWriter * instance */ private int mergeMiddle(MergePolicy.OneMerge merge, MergePolicy mergePolicy) throws IOException { + testPoint("mergeMiddleStart"); merge.checkAborted(); Directory mergeDirectory = config.getMergeScheduler().wrapForMerge(merge, directory); diff --git a/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java b/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java index f820fa9b968..f3a91ef9ad0 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import com.carrotsearch.randomizedtesting.generators.RandomStrings; import org.apache.lucene.analysis.MockAnalyzer; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; @@ -32,6 +33,7 @@ import org.apache.lucene.index.IndexWriterConfig.OpenMode; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; import org.apache.lucene.store.MockDirectoryWrapper; +import org.apache.lucene.util.InfoStream; import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.TestUtil; @@ -657,4 +659,88 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase { assertFalse(failed.get()); } + + /* + * This test tries to produce 2 merges running concurrently with 2 segments per merge. While these + * merges run we kick off a forceMerge that puts a pending merge in the queue but waits for things to happen. + * While we do this we reduce maxMergeCount to 1. If concurrency in CMS is not right the forceMerge will wait forever + * since none of the currently running merges picks up the pending merge. This test fails every time. + */ + public void testChangeMaxMergeCountyWhileForceMerge() throws IOException, InterruptedException { + int numIters = TEST_NIGHTLY ? 100 : 10; + for (int iters = 0; iters < numIters; iters++) { + LogDocMergePolicy mp = new LogDocMergePolicy(); + mp.setMergeFactor(2); + CountDownLatch forceMergeWaits = new CountDownLatch(1); + CountDownLatch mergeThreadsStartAfterWait = new CountDownLatch(1); + CountDownLatch mergeThreadsArrived = new CountDownLatch(2); + InfoStream stream = new InfoStream() { + @Override + public void message(String component, String message) { + if ("TP".equals(component) && "mergeMiddleStart".equals(message)) { + mergeThreadsArrived.countDown(); + try { + mergeThreadsStartAfterWait.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + } else if ("TP".equals(component) && "forceMergeBeforeWait".equals(message)) { + forceMergeWaits.countDown(); + } + } + + @Override + public boolean isEnabled(String component) { + return "TP".equals(component); + } + + @Override + public void close() { + } + }; + try (Directory dir = newDirectory(); + IndexWriter writer = new IndexWriter(dir, + new IndexWriterConfig().setMergeScheduler(new ConcurrentMergeScheduler()) + .setMergePolicy(mp).setInfoStream(stream)) { + @Override + protected boolean isEnableTestPoints() { + return true; + } + }) { + Thread t = new Thread(() -> { + try { + writer.forceMerge(1); + } catch (IOException e) { + throw new AssertionError(e); + } + }); + ConcurrentMergeScheduler cms = (ConcurrentMergeScheduler) writer.getConfig().getMergeScheduler(); + cms.setMaxMergesAndThreads(2, 2); + try { + for (int i = 0; i < 4; i++) { + Document document = new Document(); + document.add(new TextField("foo", "the quick brown fox jumps over the lazy dog", Field.Store.YES)); + document.add(new TextField("bar", RandomStrings.randomRealisticUnicodeOfLength(random(), 20), Field.Store.YES)); + writer.addDocument(document); + writer.flush(); + } + assertEquals(writer.cloneSegmentInfos().toString(), 4, writer.getSegmentCount()); + mergeThreadsArrived.await(); + t.start(); + forceMergeWaits.await(); + cms.setMaxMergesAndThreads(1, 1); + } finally { + mergeThreadsStartAfterWait.countDown(); + } + + while (t.isAlive()) { + t.join(10); + if (cms.mergeThreadCount() == 0 && writer.hasPendingMerges()) { + fail("writer has pending merges but no CMS threads are running"); + } + } + assertEquals(1, writer.getSegmentCount()); + } + } + } }