mirror of https://github.com/apache/lucene.git
LUCENE-9339: Only call MergeScheduler when we actually found new merges (#1445)
IW#maybeMerge calls the MergeScheduler even if it didn't find any merges. Instead we should only do this if there is in fact anything to merge, which also saves the call into a sync'd method when there is nothing to do.
parent 950a34ce13
commit 4a98918bfa
@@ -126,6 +126,9 @@ API Changes
   The DocumentsWriterPerThreadPool is a packaged protected final class which made it impossible
   to customize. (Simon Willnauer)
 
+* LUCENE-9339: MergeScheduler#merge doesn't accept a parameter if a new merge was found anymore.
+  (Simon Willnauer)
+
 New Features
 ---------------------
 (No changes)
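For external MergeScheduler subclasses, the API change above is a signature update only: the newMergesFound hint is gone, so an implementation simply drains IndexWriter#getNextMerge() until it returns null, as the SerialMergeScheduler hunk further down does. A minimal sketch under those assumptions (the class name MyInlineMergeScheduler is hypothetical and not part of this commit; it relies on the public expert methods IndexWriter#getNextMerge() and IndexWriter#merge(OneMerge) already used throughout this diff):

    import java.io.IOException;

    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.MergePolicy;
    import org.apache.lucene.index.MergeScheduler;
    import org.apache.lucene.index.MergeTrigger;

    // Hypothetical example (not part of this commit): a custom scheduler written
    // against the new two-argument signature. It no longer receives a
    // newMergesFound flag; it just pulls pending merges from the writer.
    public class MyInlineMergeScheduler extends MergeScheduler {

      @Override
      public synchronized void merge(IndexWriter writer, MergeTrigger trigger) throws IOException {
        while (true) {
          MergePolicy.OneMerge merge = writer.getNextMerge();
          if (merge == null) {
            break;               // nothing (left) to merge
          }
          writer.merge(merge);   // run the merge on the calling thread, like SerialMergeScheduler
        }
      }

      @Override
      public void close() {}
    }

With the writer now skipping the scheduler when no merges were found, such an implementation no longer needs to special-case an empty call; the drain loop simply terminates immediately if it is ever invoked with nothing pending (for example via the unconditional CLOSING call shown in the @@ -2534 hunk below).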
@@ -495,7 +495,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
   }
 
   @Override
-  public synchronized void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException {
+  public synchronized void merge(IndexWriter writer, MergeTrigger trigger) throws IOException {
 
     assert !Thread.holdsLock(writer);
 
@@ -641,7 +641,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
     assert mergeThreads.contains(Thread.currentThread()) : "caller is not a merge thread";
     // Let CMS run new merges if necessary:
     try {
-      merge(writer, MergeTrigger.MERGE_FINISHED, true);
+      merge(writer, MergeTrigger.MERGE_FINISHED);
     } catch (AlreadyClosedException ace) {
       // OK
     } catch (IOException ioe) {
@@ -2070,7 +2070,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
       }
     }
 
-    mergeScheduler.merge(this, MergeTrigger.EXPLICIT, newMergesFound);
+    mergeScheduler.merge(this, MergeTrigger.EXPLICIT);
 
     if (spec != null && doWait) {
       final int numMerges = spec.merges.size();
@@ -2152,8 +2152,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
 
   final void maybeMerge(MergePolicy mergePolicy, MergeTrigger trigger, int maxNumSegments) throws IOException {
     ensureOpen(false);
-    boolean newMergesFound = updatePendingMerges(mergePolicy, trigger, maxNumSegments);
-    mergeScheduler.merge(this, trigger, newMergesFound);
+    if (updatePendingMerges(mergePolicy, trigger, maxNumSegments)) {
+      mergeScheduler.merge(this, trigger);
+    }
   }
 
   private synchronized boolean updatePendingMerges(MergePolicy mergePolicy, MergeTrigger trigger, int maxNumSegments)
@@ -2534,7 +2535,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
     // Give merge scheduler last chance to run, in case
     // any pending merges are waiting. We can't hold IW's lock
     // when going into merge because it can lead to deadlock.
-    mergeScheduler.merge(this, MergeTrigger.CLOSING, false);
+    mergeScheduler.merge(this, MergeTrigger.CLOSING);
 
     synchronized (this) {
       ensureOpen(false);
@@ -3473,7 +3474,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
   }
 
   @SuppressWarnings("try")
-  private final void finishCommit() throws IOException {
+  private void finishCommit() throws IOException {
 
     boolean commitCompleted = false;
     String committedSegmentsFileName = null;
@@ -4381,7 +4382,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
     testPoint("mergeMiddleStart");
     merge.checkAborted();
 
-    Directory mergeDirectory = config.getMergeScheduler().wrapForMerge(merge, directory);
+    Directory mergeDirectory = mergeScheduler.wrapForMerge(merge, directory);
     List<SegmentCommitInfo> sourceSegments = merge.segments;
 
     IOContext context = new IOContext(merge.getStoreMergeInfo());
@@ -40,10 +40,8 @@ public abstract class MergeScheduler implements Closeable {
 
   /** Run the merges provided by {@link IndexWriter#getNextMerge()}.
    * @param writer the {@link IndexWriter} to obtain the merges from.
-   * @param trigger the {@link MergeTrigger} that caused this merge to happen
-   * @param newMergesFound <code>true</code> iff any new merges were found by the caller otherwise <code>false</code>
-   * */
-  public abstract void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException;
+   * @param trigger the {@link MergeTrigger} that caused this merge to happen */
+  public abstract void merge(IndexWriter writer, MergeTrigger trigger) throws IOException;
 
   /**
    * Wraps the incoming {@link Directory} so that we can merge-throttle it
@@ -42,7 +42,7 @@ public final class NoMergeScheduler extends MergeScheduler {
   public void close() {}
 
   @Override
-  public void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) {}
+  public void merge(IndexWriter writer, MergeTrigger trigger) {}
 
   @Override
   public Directory wrapForMerge(OneMerge merge, Directory in) {
@@ -31,7 +31,7 @@ public class SerialMergeScheduler extends MergeScheduler {
   * "synchronized" so that even if the application is using
   * multiple threads, only one merge may run at a time. */
  @Override
-  synchronized public void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException {
+  synchronized public void merge(IndexWriter writer, MergeTrigger trigger) throws IOException {
    while(true) {
      MergePolicy.OneMerge merge = writer.getNextMerge();
      if (merge == null) {
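For context, a usage sketch (not part of this commit) of how an application wires a scheduler into the writer via IndexWriterConfig#setMergeScheduler, the same wiring the TestConcurrentMergeScheduler hunk below uses with ConcurrentMergeScheduler. The no-arg IndexWriterConfig constructor, ByteBuffersDirectory, and the two-document setup are illustrative assumptions, not taken from this diff:

    import java.io.IOException;

    import org.apache.lucene.document.Document;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.index.SerialMergeScheduler;
    import org.apache.lucene.store.ByteBuffersDirectory;
    import org.apache.lucene.store.Directory;

    // Hypothetical usage sketch: install SerialMergeScheduler so merges run on
    // the indexing thread; the writer drives MergeScheduler#merge with the new
    // two-argument signature whenever updatePendingMerges finds work.
    public class MergeSchedulerUsage {
      public static void main(String[] args) throws IOException {
        IndexWriterConfig iwc = new IndexWriterConfig()          // defaults to StandardAnalyzer
            .setMergeScheduler(new SerialMergeScheduler());
        try (Directory dir = new ByteBuffersDirectory();
             IndexWriter writer = new IndexWriter(dir, iwc)) {
          writer.addDocument(new Document());
          writer.commit();                                       // first segment
          writer.addDocument(new Document());                    // second segment
          writer.forceMerge(1);                                  // EXPLICIT trigger; after this commit the
                                                                 // writer only calls the scheduler from
                                                                 // maybeMerge when a merge was actually found
        }
      }
    }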
@@ -153,7 +153,7 @@ public class TestMergeSchedulerExternal extends LuceneTestCase {
   private static class ReportingMergeScheduler extends MergeScheduler {
 
     @Override
-    public void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException {
+    public void merge(IndexWriter writer, MergeTrigger trigger) throws IOException {
       OneMerge merge = null;
       while ((merge = writer.getNextMerge()) != null) {
         if (VERBOSE) {
@@ -463,7 +463,8 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
   public void testMaybeStallCalled() throws Exception {
     final AtomicBoolean wasCalled = new AtomicBoolean();
     Directory dir = newDirectory();
-    IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random()));
+    IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random()))
+        .setMergePolicy(new LogByteSizeMergePolicy());
     iwc.setMergeScheduler(new ConcurrentMergeScheduler() {
         @Override
         protected boolean maybeStall(IndexWriter writer) {
@@ -473,9 +474,10 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
       });
     IndexWriter w = new IndexWriter(dir, iwc);
     w.addDocument(new Document());
+    w.flush();
+    w.addDocument(new Document());
     w.forceMerge(1);
     assertTrue(wasCalled.get());
-
     w.close();
     dir.close();
   }
@@ -310,7 +310,7 @@ public class TestIndexWriterMerging extends LuceneTestCase {
   // merging a segment with >= 20 (maxMergeDocs) docs
   private static class MyMergeScheduler extends MergeScheduler {
     @Override
-    synchronized public void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException {
+    synchronized public void merge(IndexWriter writer, MergeTrigger trigger) throws IOException {
 
       while(true) {
         MergePolicy.OneMerge merge = writer.getNextMerge();
@@ -32,7 +32,7 @@ public class TestNoMergeScheduler extends LuceneTestCase {
   public void testNoMergeScheduler() throws Exception {
     MergeScheduler ms = NoMergeScheduler.INSTANCE;
     ms.close();
-    ms.merge(null, RandomPicks.randomFrom(random(), MergeTrigger.values()), random().nextBoolean());
+    ms.merge(null, RandomPicks.randomFrom(random(), MergeTrigger.values()));
   }
 
   @Test
@@ -70,7 +70,7 @@ public abstract class BaseMergePolicyTestCase extends LuceneTestCase {
     final AtomicBoolean mayMerge = new AtomicBoolean(true);
     final MergeScheduler mergeScheduler = new SerialMergeScheduler() {
       @Override
-      synchronized public void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException {
+      synchronized public void merge(IndexWriter writer, MergeTrigger trigger) throws IOException {
        if (mayMerge.get() == false) {
          MergePolicy.OneMerge merge = writer.getNextMerge();
          if (merge != null) {
@@ -79,7 +79,7 @@ public abstract class BaseMergePolicyTestCase extends LuceneTestCase {
           }
         }
 
-        super.merge(writer, trigger, newMergesFound);
+        super.merge(writer, trigger);
       }
     };
 