From 8294e1ae2068aa39e91c25cbaabf62afae40a02e Mon Sep 17 00:00:00 2001
From: Simon Willnauer
Date: Mon, 24 Aug 2020 20:19:08 +0200
Subject: [PATCH] LUCENE-8962: Merge segments on getReader (#1623)
Add IndexWriter merge-on-refresh feature to selectively merge
small segments on getReader, subject to a configurable timeout,
to improve search performance by reducing the number of small
segments for searching.
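
For illustration only (not part of this patch): a minimal sketch of enabling the
feature from application code. "dir" and "analyzer" are assumed to exist, and
SmallFlushMergePolicy is a hypothetical policy that overrides
MergePolicy#findFullFlushMerges (a sketch of one follows the MergePolicy javadoc
changes further below).

    IndexWriterConfig iwc = new IndexWriterConfig(analyzer)
        // SmallFlushMergePolicy is hypothetical -- any MergePolicy overriding
        // findFullFlushMerges works (see the sketch after the MergePolicy javadoc below)
        .setMergePolicy(new SmallFlushMergePolicy(new TieredMergePolicy()))
        // wait up to 500 ms during getReader()/commit() for the returned merges;
        // 0 (the default) disables merge-on-refresh/commit
        .setMaxFullFlushMergeWaitMillis(500);
    IndexWriter writer = new IndexWriter(dir, iwc);
    DirectoryReader reader = DirectoryReader.open(writer); // NRT reader with small segments merged
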
Co-authored-by: Mike McCandless
---
lucene/CHANGES.txt | 4 +
.../org/apache/lucene/index/IndexWriter.java | 261 ++++++++++++++----
.../lucene/index/IndexWriterConfig.java | 15 +-
.../lucene/index/LiveIndexWriterConfig.java | 10 +-
.../org/apache/lucene/index/MergePolicy.java | 17 +-
.../org/apache/lucene/index/MergeTrigger.java | 4 +
.../org/apache/lucene/index/ReaderPool.java | 2 +-
.../lucene/index/StandardDirectoryReader.java | 24 +-
.../apache/lucene/index/TestIndexWriter.java | 2 +-
.../index/TestIndexWriterMergePolicy.java | 232 +++++++++++++---
.../lucene/index/TestIndexWriterReader.java | 33 ++-
.../TestSoftDeletesRetentionMergePolicy.java | 2 +-
.../apache/lucene/util/LuceneTestCase.java | 2 +-
.../apache/solr/update/SolrIndexConfig.java | 4 +-
.../solr/update/SolrIndexConfigTest.java | 4 +-
15 files changed, 478 insertions(+), 138 deletions(-)
diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index c2a960b9052..7280810d2a2 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -172,6 +172,10 @@ New Features
* LUCENE-9386: RegExpQuery added case insensitive matching option. (Mark Harwood)
+* LUCENE-8962: Add IndexWriter merge-on-refresh feature to selectively merge
+ small segments on getReader, subject to a configurable timeout, to improve
+ search performance by reducing the number of small segments for searching. (Simon Willnauer)
+
Improvements
---------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 6f189012501..424eb8747ce 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BooleanSupplier;
import java.util.function.IntPredicate;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -545,9 +546,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
// obtained during this flush are pooled, the first time
// this method is called:
readerPool.enableReaderPooling();
- DirectoryReader r = null;
+ StandardDirectoryReader r = null;
doBeforeFlush();
- boolean anyChanges = false;
+ boolean anyChanges;
+ final long maxFullFlushMergeWaitMillis = config.getMaxFullFlushMergeWaitMillis();
/*
* for releasing a NRT reader we must ensure that
* DW doesn't add any segments or deletes until we are
@@ -555,8 +557,46 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
* We release the two stage full flush after we are done opening the
* directory reader!
*/
+ MergePolicy.MergeSpecification onGetReaderMerges = null;
+ final AtomicBoolean stopCollectingMergedReaders = new AtomicBoolean(false);
+ final Map<String, SegmentReader> mergedReaders = new HashMap<>();
+ final Map<String, SegmentReader> openedReadOnlyClones = new HashMap<>();
+ // this function is used to control which SR are opened in order to keep track of them
+ // and to reuse them in the case we wait for merges in this getReader call.
+ IOUtils.IOFunction<SegmentCommitInfo, SegmentReader> readerFactory = sci -> {
+ final ReadersAndUpdates rld = getPooledInstance(sci, true);
+ try {
+ assert Thread.holdsLock(IndexWriter.this);
+ SegmentReader segmentReader = rld.getReadOnlyClone(IOContext.READ);
+ if (maxFullFlushMergeWaitMillis > 0) { // only track this if we actually do fullFlush merges
+ openedReadOnlyClones.put(sci.info.name, segmentReader);
+ }
+ return segmentReader;
+ } finally {
+ release(rld);
+ }
+ };
+ Closeable onGetReaderMergeResources = null;
+ SegmentInfos openingSegmentInfos = null;
boolean success2 = false;
try {
+ /* this is the essential part of the getReader method. We need to take care of the following things:
+ * - flush all currently in-memory DWPTs to disk
+ * - apply all deletes & updates to new and to the existing DWPTs
+ * - prevent concurrently indexing DWPTs from being flushed and from having their deletes applied
+ * - open a SDR on the updated SIS
+ *
+ * in order to prevent concurrent flushes we call DocumentsWriter#flushAllThreads that swaps out the deleteQueue
+ * (this enforces a happens before relationship between this and the subsequent full flush) and informs the
+ * FlushControl (#markForFullFlush()) that it should prevent any new DWPTs from flushing until we are
+ * done (DocumentsWriter#finishFullFlush(boolean)). All this is guarded by the fullFlushLock to prevent multiple
+ * full flushes from happening concurrently. Once the DocWriter has initiated a full flush we can sequentially flush
+ * and apply deletes & updates to the written segments without worrying about concurrently indexing DWPTs. The important
+ * aspect is that it all happens between DocumentsWriter#flushAllThreads() and DocumentsWriter#finishFullFlush(boolean)
+ * since once the flush is marked as done deletes start to be applied to the segments on disk without guarantees that
+ * the corresponding added documents (in the update case) are flushed and visible when opening a SDR.
+ *
+ */
boolean success = false;
synchronized (fullFlushLock) {
try {
@@ -573,7 +613,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
if (applyAllDeletes) {
applyAllDeletesAndUpdates();
}
-
synchronized(this) {
// NOTE: we cannot carry doc values updates in memory yet, so we always must write them through to disk and re-open each
@@ -581,16 +620,50 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
// TODO: we could instead just clone SIS and pull/incref readers in sync'd block, and then do this w/o IW's lock?
// Must do this sync'd on IW to prevent a merge from completing at the last second and failing to write its DV updates:
- writeReaderPool(writeAllDeletes);
+ writeReaderPool(writeAllDeletes);
// Prevent segmentInfos from changing while opening the
// reader; in theory we could instead do similar retry logic,
// just like we do when loading segments_N
-
- r = StandardDirectoryReader.open(this, segmentInfos, applyAllDeletes, writeAllDeletes);
+ r = StandardDirectoryReader.open(this, readerFactory, segmentInfos, applyAllDeletes, writeAllDeletes);
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "return reader version=" + r.getVersion() + " reader=" + r);
}
+ if (maxFullFlushMergeWaitMillis > 0) {
+ // we take the SIS from the reader which has already pruned away fully deleted readers
+ // this makes pulling the readers below after the merge simpler since we can be safe that
+ // they are not closed. Every segment has a corresponding SR in the SDR we opened if we use
+ // this SIS
+ // we need to do this rather complicated management of SRs and infos since we can't wait for merges
+ // while we hold the fullFlushLock since the merge might hit a tragic event and that must not be reported
+ // while holding that lock. Merging outside of the lock ie. after calling docWriter.finishFullFlush(boolean) would
+ // yield wrong results because deletes might sneak in during the merge
+ openingSegmentInfos = r.getSegmentInfos().clone();
+ onGetReaderMerges = preparePointInTimeMerge(openingSegmentInfos, stopCollectingMergedReaders::get, MergeTrigger.GET_READER,
+ sci -> {
+ assert stopCollectingMergedReaders.get() == false : "illegal state: merge reader must not be pulled since we already stopped waiting for merges";
+ SegmentReader apply = readerFactory.apply(sci);
+ mergedReaders.put(sci.info.name, apply);
+ // we need to incRef the files of the opened SR otherwise it's possible that another merge
+ // removes the segment before we pass it on to the SDR
+ deleter.incRef(sci.files());
+ });
+ onGetReaderMergeResources = () -> {
+ // this needs to be closed once after we are done. In the case of an exception it releases
+ // all resources, closes the merged readers and decrements the files references.
+ // this only happens for readers that haven't been removed from the mergedReaders map and released elsewhere
+ synchronized (this) {
+ stopCollectingMergedReaders.set(true);
+ IOUtils.close(mergedReaders.values().stream().map(sr -> (Closeable) () -> {
+ try {
+ deleter.decRef(sr.getSegmentInfo().files());
+ } finally {
+ sr.close();
+ }
+ }).collect(Collectors.toList()));
+ }
+ };
+ }
}
success = true;
} finally {
@@ -607,6 +680,19 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
}
}
}
+ if (onGetReaderMerges != null) { // only relevant if we do merge on getReader
+ StandardDirectoryReader mergedReader = finishGetReaderMerge(stopCollectingMergedReaders, mergedReaders,
+ openedReadOnlyClones, openingSegmentInfos, applyAllDeletes,
+ writeAllDeletes, onGetReaderMerges, maxFullFlushMergeWaitMillis);
+ if (mergedReader != null) {
+ try {
+ r.close();
+ } finally {
+ r = mergedReader;
+ }
+ }
+ }
+
anyChanges |= maybeMerge.getAndSet(false);
if (anyChanges) {
maybeMerge(config.getMergePolicy(), MergeTrigger.FULL_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
@@ -621,15 +707,66 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
} finally {
if (!success2) {
try {
- IOUtils.closeWhileHandlingException(r);
+ IOUtils.closeWhileHandlingException(r, onGetReaderMergeResources);
} finally {
maybeCloseOnTragicEvent();
}
+ } else {
+ IOUtils.close(onGetReaderMergeResources);
}
}
return r;
}
+ private StandardDirectoryReader finishGetReaderMerge(AtomicBoolean stopCollectingMergedReaders, Map<String, SegmentReader> mergedReaders,
+ Map<String, SegmentReader> openedReadOnlyClones, SegmentInfos openingSegmentInfos,
+ boolean applyAllDeletes, boolean writeAllDeletes,
+ MergePolicy.MergeSpecification pointInTimeMerges, long maxCommitMergeWaitMillis) throws IOException {
+ assert openingSegmentInfos != null;
+ mergeScheduler.merge(mergeSource, MergeTrigger.GET_READER);
+ pointInTimeMerges.await(maxCommitMergeWaitMillis, TimeUnit.MILLISECONDS);
+ synchronized (this) {
+ stopCollectingMergedReaders.set(true);
+ StandardDirectoryReader reader = maybeReopenMergedNRTReader(mergedReaders, openedReadOnlyClones, openingSegmentInfos,
+ applyAllDeletes, writeAllDeletes);
+ IOUtils.close(mergedReaders.values());
+ mergedReaders.clear();
+ return reader;
+ }
+ }
+
+ private StandardDirectoryReader maybeReopenMergedNRTReader(Map<String, SegmentReader> mergedReaders,
+ Map<String, SegmentReader> openedReadOnlyClones, SegmentInfos openingSegmentInfos,
+ boolean applyAllDeletes, boolean writeAllDeletes) throws IOException {
+ assert Thread.holdsLock(this);
+ if (mergedReaders.isEmpty() == false) {
+ Collection<String> files = new ArrayList<>();
+ try {
+ return StandardDirectoryReader.open(this,
+ sci -> {
+ // as soon as we remove the reader and return it the StandardDirectoryReader#open
+ // will take care of closing it. We only need to handle the readers that remain in the
+ // mergedReaders map and close them.
+ SegmentReader remove = mergedReaders.remove(sci.info.name);
+ if (remove == null) {
+ remove = openedReadOnlyClones.remove(sci.info.name);
+ assert remove != null;
+ // each of the readers we reuse from the previous reader needs to be incRef'd
+ // since we reuse them but don't have an implicit incRef in the SDR:open call
+ remove.incRef();
+ } else {
+ files.addAll(remove.getSegmentInfo().files());
+ }
+ return remove;
+ }, openingSegmentInfos, applyAllDeletes, writeAllDeletes);
+ } finally {
+ // now the SDR#open call has incRef'd the files so we can let them go
+ deleter.decRef(files);
+ }
+ }
+ return null;
+ }
+
@Override
public final long ramBytesUsed() {
ensureOpen();
@@ -2131,10 +2268,14 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
private final void maybeMerge(MergePolicy mergePolicy, MergeTrigger trigger, int maxNumSegments) throws IOException {
ensureOpen(false);
if (updatePendingMerges(mergePolicy, trigger, maxNumSegments) != null) {
- mergeScheduler.merge(mergeSource, trigger);
+ executeMerge(trigger);
}
}
+ final void executeMerge(MergeTrigger trigger) throws IOException {
+ mergeScheduler.merge(mergeSource, trigger);
+ }
+
private synchronized MergePolicy.MergeSpecification updatePendingMerges(MergePolicy mergePolicy, MergeTrigger trigger, int maxNumSegments)
throws IOException {
@@ -2168,6 +2309,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
}
} else {
switch (trigger) {
+ case GET_READER:
case COMMIT:
spec = mergePolicy.findFullFlushMerges(trigger, segmentInfos, this);
break;
@@ -3189,9 +3331,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
SegmentInfos toCommit = null;
boolean anyChanges = false;
long seqNo;
- MergePolicy.MergeSpecification onCommitMerges = null;
- AtomicBoolean includeInCommit = new AtomicBoolean(true);
- final long maxCommitMergeWaitMillis = config.getMaxCommitMergeWaitMillis();
+ MergePolicy.MergeSpecification pointInTimeMerges = null;
+ AtomicBoolean stopAddingMergedSegments = new AtomicBoolean(false);
+ final long maxCommitMergeWaitMillis = config.getMaxFullFlushMergeWaitMillis();
// This is copied from doFlush, except it's modified to
// clone & incRef the flushed SegmentInfos inside the
// sync block:
@@ -3252,9 +3394,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
// removed the files we are now syncing.
deleter.incRef(toCommit.files(false));
if (anyChanges && maxCommitMergeWaitMillis > 0) {
- // we can safely call prepareOnCommitMerge since writeReaderPool(true) above wrote all
+ // we can safely call preparePointInTimeMerge since writeReaderPool(true) above wrote all
// necessary files to disk and checkpointed them.
- onCommitMerges = prepareOnCommitMerge(toCommit, includeInCommit);
+ pointInTimeMerges = preparePointInTimeMerge(toCommit, stopAddingMergedSegments::get, MergeTrigger.COMMIT, sci->{});
}
}
success = true;
@@ -3277,21 +3419,21 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
maybeCloseOnTragicEvent();
}
- if (onCommitMerges != null) {
+ if (pointInTimeMerges != null) {
if (infoStream.isEnabled("IW")) {
- infoStream.message("IW", "now run merges during commit: " + onCommitMerges.segString(directory));
+ infoStream.message("IW", "now run merges during commit: " + pointInTimeMerges.segString(directory));
}
mergeScheduler.merge(mergeSource, MergeTrigger.COMMIT);
- onCommitMerges.await(maxCommitMergeWaitMillis, TimeUnit.MILLISECONDS);
+ pointInTimeMerges.await(maxCommitMergeWaitMillis, TimeUnit.MILLISECONDS);
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "done waiting for merges during commit");
}
synchronized (this) {
// we need to call this under lock since mergeFinished above is also called under the IW lock
- includeInCommit.set(false);
+ stopAddingMergedSegments.set(true);
}
}
- // do this after handling any onCommitMerges since the files will have changed if any merges
+ // do this after handling any pointInTimeMerges since the files will have changed if any merges
// did complete
filesToCommit = toCommit.files(false);
try {
@@ -3322,21 +3464,24 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
}
/**
- * This optimization allows a commit to wait for merges on smallish segments to
- * reduce the eventual number of tiny segments in the commit point. We wrap a {@code OneMerge} to
- * update the {@code committingSegmentInfos} once the merge has finished. We replace the source segments
- * in the SIS that we are going to commit with the freshly merged segment, but ignore all deletions and updates
- * that are made to documents in the merged segment while it was merging. The updates that are made do not belong to
- * the point-in-time commit point and should therefore not be included. See the clone call in {@code onMergeComplete}
+ * This optimization allows a commit/getReader to wait for merges on smallish segments to
+ * reduce the eventual number of tiny segments in the commit point / NRT Reader. We wrap a {@code OneMerge} to
+ * update the {@code mergingSegmentInfos} once the merge has finished. We replace the source segments
+ * in the SIS that we are going to commit / open the reader on with the freshly merged segment, but ignore all deletions and updates
+ * that are made to documents in the merged segment while it was merging. The updates that are made do not belong to
+ * the point-in-time commit point / NRT READER and should therefore not be included. See the clone call in {@code onMergeComplete}
* below. We also ensure that we pull the merge readers while holding {@code IndexWriter}'s lock. Otherwise
* we could see concurrent deletions/updates applied that do not belong to the segment.
*/
- private MergePolicy.MergeSpecification prepareOnCommitMerge(SegmentInfos committingSegmentInfos, AtomicBoolean includeInCommit) throws IOException {
+ private MergePolicy.MergeSpecification preparePointInTimeMerge(SegmentInfos mergingSegmentInfos, BooleanSupplier stopCollectingMergeResults,
+ MergeTrigger trigger,
+ IOUtils.IOConsumer<SegmentCommitInfo> mergeFinished) throws IOException {
assert Thread.holdsLock(this);
- MergePolicy.MergeSpecification onCommitMerges = updatePendingMerges(new OneMergeWrappingMergePolicy(config.getMergePolicy(), toWrap ->
+ assert trigger == MergeTrigger.GET_READER || trigger == MergeTrigger.COMMIT : "illegal trigger: " + trigger;
+ MergePolicy.MergeSpecification pointInTimeMerges = updatePendingMerges(new OneMergeWrappingMergePolicy(config.getMergePolicy(), toWrap ->
new MergePolicy.OneMerge(toWrap.segments) {
SegmentCommitInfo origInfo;
- AtomicBoolean onlyOnce = new AtomicBoolean(false);
+ final AtomicBoolean onlyOnce = new AtomicBoolean(false);
@Override
public void mergeFinished(boolean committed, boolean segmentDropped) throws IOException {
@@ -3344,50 +3489,65 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
// includedInCommit will be set (above, by our caller) to false if the allowed max wall clock
// time (IWC.getMaxCommitMergeWaitMillis()) has elapsed, which means we did not make the timeout
- // and will not commit our merge to the to-be-commited SegmentInfos
-
+ // and will not commit our merge to the to-be-committed SegmentInfos
if (segmentDropped == false
&& committed
- && includeInCommit.get()) {
+ && stopCollectingMergeResults.getAsBoolean() == false) {
+
+ // make sure onMergeComplete really was called:
+ assert origInfo != null;
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "now apply merge during commit: " + toWrap.segString());
}
- // make sure onMergeComplete really was called:
- assert origInfo != null;
-
- deleter.incRef(origInfo.files());
+ if (trigger == MergeTrigger.COMMIT) {
+ // if we do this in a getReader call here this is obsolete since we already hold a reader that has
+ // incRef'd these files
+ deleter.incRef(origInfo.files());
+ }
Set<String> mergedSegmentNames = new HashSet<>();
for (SegmentCommitInfo sci : segments) {
mergedSegmentNames.add(sci.info.name);
}
List<SegmentCommitInfo> toCommitMergedAwaySegments = new ArrayList<>();
- for (SegmentCommitInfo sci : committingSegmentInfos) {
+ for (SegmentCommitInfo sci : mergingSegmentInfos) {
if (mergedSegmentNames.contains(sci.info.name)) {
toCommitMergedAwaySegments.add(sci);
- deleter.decRef(sci.files());
+ if (trigger == MergeTrigger.COMMIT) {
+ // if we do this in a getReader call here this is obsolete since we already hold a reader that has
+ // incRef'd these files and will decRef them when it's closed
+ deleter.decRef(sci.files());
+ }
}
}
// Construct a OneMerge that applies to toCommit
MergePolicy.OneMerge applicableMerge = new MergePolicy.OneMerge(toCommitMergedAwaySegments);
applicableMerge.info = origInfo;
long segmentCounter = Long.parseLong(origInfo.info.name.substring(1), Character.MAX_RADIX);
- committingSegmentInfos.counter = Math.max(committingSegmentInfos.counter, segmentCounter + 1);
- committingSegmentInfos.applyMergeChanges(applicableMerge, false);
+ mergingSegmentInfos.counter = Math.max(mergingSegmentInfos.counter, segmentCounter + 1);
+ mergingSegmentInfos.applyMergeChanges(applicableMerge, false);
} else {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "skip apply merge during commit: " + toWrap.segString());
}
}
- toWrap.mergeFinished(committed, false);
+ toWrap.mergeFinished(committed, segmentDropped);
super.mergeFinished(committed, segmentDropped);
}
@Override
- void onMergeComplete() {
- // clone the target info to make sure we have the original info without the updated del and update gens
- origInfo = info.clone();
+ void onMergeComplete() throws IOException {
+ assert Thread.holdsLock(IndexWriter.this);
+ if (stopCollectingMergeResults.getAsBoolean() == false
+ && isAborted() == false
+ && info.info.maxDoc() > 0 /* never do this if the segment is dropped / empty */) {
+ mergeFinished.accept(info);
+ // clone the target info to make sure we have the original info without the updated del and update gens
+ origInfo = info.clone();
+ }
+ toWrap.onMergeComplete();
+ super.onMergeComplete();
}
@Override
@@ -3404,11 +3564,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
return toWrap.wrapForMerge(reader); // must delegate
}
}
- ), MergeTrigger.COMMIT, UNBOUNDED_MAX_MERGE_SEGMENTS);
- if (onCommitMerges != null) {
+ ), trigger, UNBOUNDED_MAX_MERGE_SEGMENTS);
+ if (pointInTimeMerges != null) {
boolean closeReaders = true;
try {
- for (MergePolicy.OneMerge merge : onCommitMerges.merges) {
+ for (MergePolicy.OneMerge merge : pointInTimeMerges.merges) {
IOContext context = new IOContext(merge.getStoreMergeInfo());
merge.initMergeReaders(
sci -> {
@@ -3422,17 +3582,20 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
closeReaders = false;
} finally {
if (closeReaders) {
- IOUtils.applyToAll(onCommitMerges.merges, merge -> {
+ IOUtils.applyToAll(pointInTimeMerges.merges, merge -> {
// that merge is broken we need to clean up after it - it's fine we still have the IW lock to do this
boolean removed = pendingMerges.remove(merge);
assert removed: "merge should be pending but isn't: " + merge.segString();
- abortOneMerge(merge);
- mergeFinish(merge);
+ try {
+ abortOneMerge(merge);
+ } finally {
+ mergeFinish(merge);
+ }
});
}
}
}
- return onCommitMerges;
+ return pointInTimeMerges;
}
/**
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
index 6dcdf839b19..25f78c7fd57 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
@@ -110,8 +110,8 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
/** Default value for whether calls to {@link IndexWriter#close()} include a commit. */
public final static boolean DEFAULT_COMMIT_ON_CLOSE = true;
- /** Default value for time to wait for merges on commit (when using a {@link MergePolicy} that implements {@link MergePolicy#findFullFlushMerges}). */
- public static final long DEFAULT_MAX_COMMIT_MERGE_WAIT_MILLIS = 0;
+ /** Default value for time to wait for merges on commit or getReader (when using a {@link MergePolicy} that implements {@link MergePolicy#findFullFlushMerges}). */
+ public static final long DEFAULT_MAX_FULL_FLUSH_MERGE_WAIT_MILLIS = 0;
// indicates whether this config instance is already attached to a writer.
// not final so that it can be cloned properly.
@@ -463,17 +463,18 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
}
/**
- * Expert: sets the amount of time to wait for merges (during {@link IndexWriter#commit}) returned by
+ * Expert: sets the amount of time to wait for merges (during {@link IndexWriter#commit}
+ * or {@link IndexWriter#getReader(boolean, boolean)}) returned by
* MergePolicy.findFullFlushMerges(...).
* If this time is reached, we proceed with the commit based on segments merged up to that point.
- * The merges are not cancelled, and will still run to completion independent of the commit,
- * like natural segment merges. The default is {@value IndexWriterConfig#DEFAULT_MAX_COMMIT_MERGE_WAIT_MILLIS}.
+ * The merges are not aborted, and will still run to completion independent of the commit or getReader call,
+ * like natural segment merges. The default is {@value IndexWriterConfig#DEFAULT_MAX_FULL_FLUSH_MERGE_WAIT_MILLIS}.
*
* Note: This setting has no effect unless {@link MergePolicy#findFullFlushMerges(MergeTrigger, SegmentInfos, MergePolicy.MergeContext)}
* has an implementation that actually returns merges which by default doesn't return any merges.
*/
- public IndexWriterConfig setMaxCommitMergeWaitMillis(long maxCommitMergeWaitMillis) {
- this.maxCommitMergeWaitMillis = maxCommitMergeWaitMillis;
+ public IndexWriterConfig setMaxFullFlushMergeWaitMillis(long maxFullFlushMergeWaitMillis) {
+ this.maxFullFlushMergeWaitMillis = maxFullFlushMergeWaitMillis;
return this;
}
diff --git a/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java b/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
index 1450331829c..f979984f3bb 100644
--- a/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
+++ b/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
@@ -110,7 +110,7 @@ public class LiveIndexWriterConfig {
protected String softDeletesField = null;
/** Amount of time to wait for merges returned by MergePolicy.findFullFlushMerges(...) */
- protected volatile long maxCommitMergeWaitMillis;
+ protected volatile long maxFullFlushMergeWaitMillis;
// used by IndexWriterConfig
LiveIndexWriterConfig(Analyzer analyzer) {
@@ -134,7 +134,7 @@ public class LiveIndexWriterConfig {
flushPolicy = new FlushByRamOrCountsPolicy();
readerPooling = IndexWriterConfig.DEFAULT_READER_POOLING;
perThreadHardLimitMB = IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB;
- maxCommitMergeWaitMillis = IndexWriterConfig.DEFAULT_MAX_COMMIT_MERGE_WAIT_MILLIS;
+ maxFullFlushMergeWaitMillis = IndexWriterConfig.DEFAULT_MAX_FULL_FLUSH_MERGE_WAIT_MILLIS;
}
/** Returns the default analyzer to use for indexing documents. */
@@ -469,8 +469,8 @@ public class LiveIndexWriterConfig {
* If this time is reached, we proceed with the commit based on segments merged up to that point.
* The merges are not cancelled, and may still run to completion independent of the commit.
*/
- public long getMaxCommitMergeWaitMillis() {
- return maxCommitMergeWaitMillis;
+ public long getMaxFullFlushMergeWaitMillis() {
+ return maxFullFlushMergeWaitMillis;
}
@Override
@@ -496,7 +496,7 @@ public class LiveIndexWriterConfig {
sb.append("indexSort=").append(getIndexSort()).append("\n");
sb.append("checkPendingFlushOnUpdate=").append(isCheckPendingFlushOnUpdate()).append("\n");
sb.append("softDeletesField=").append(getSoftDeletesField()).append("\n");
- sb.append("maxCommitMergeWaitMillis=").append(getMaxCommitMergeWaitMillis()).append("\n");
+ sb.append("maxFullFlushMergeWaitMillis=").append(getMaxFullFlushMergeWaitMillis()).append("\n");
return sb.toString();
}
}
diff --git a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
index 796d3b8b480..641facf5619 100644
--- a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
@@ -421,7 +421,7 @@ public abstract class MergePolicy {
/**
* Called just before the merge is applied to IndexWriter's SegmentInfos
*/
- void onMergeComplete() {
+ void onMergeComplete() throws IOException {
}
/**
@@ -623,19 +623,20 @@ public abstract class MergePolicy {
/**
* Identifies merges that we want to execute (synchronously) on commit. By default, this will do no merging on commit.
* If you implement this method in your {@code MergePolicy} you must also set a non-zero timeout using
- * {@link IndexWriterConfig#setMaxCommitMergeWaitMillis}.
+ * {@link IndexWriterConfig#setMaxFullFlushMergeWaitMillis}.
*
- * Any merges returned here will make {@link IndexWriter#commit()} or {@link IndexWriter#prepareCommit()} block until
- * the merges complete or until {@link IndexWriterConfig#getMaxCommitMergeWaitMillis()} has elapsed. This may be
- * used to merge small segments that have just been flushed as part of the commit, reducing the number of segments in
- * the commit. If a merge does not complete in the allotted time, it will continue to execute, and eventually finish and
- * apply to future commits, but will not be reflected in the current commit.
+ * Any merges returned here will make {@link IndexWriter#commit()}, {@link IndexWriter#prepareCommit()}
+ * or {@link IndexWriter#getReader(boolean, boolean)} block until
+ * the merges complete or until {@link IndexWriterConfig#getMaxFullFlushMergeWaitMillis()} has elapsed. This may be
+ * used to merge small segments that have just been flushed, reducing the number of segments in
+ * the point-in-time snapshot. If a merge does not complete in the allotted time, it will continue to execute, and eventually finish and
+ * apply to future point-in-time snapshots, but will not be reflected in the current one.
*
* If a {@link OneMerge} in the returned {@link MergeSpecification} includes a segment already included in a registered
* merge, then {@link IndexWriter#commit()} or {@link IndexWriter#prepareCommit()} will throw a {@link IllegalStateException}.
* Use {@link MergeContext#getMergingSegments()} to determine which segments are currently registered to merge.
*
- * @param mergeTrigger the event that triggered the merge (COMMIT or FULL_FLUSH).
+ * @param mergeTrigger the event that triggered the merge (COMMIT or GET_READER).
* @param segmentInfos the total set of segments in the index (while preparing the commit)
* @param mergeContext the MergeContext to find the merges on, which should be used to determine which segments are
* already in a registered merge (see {@link MergeContext#getMergingSegments()}).
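
For reference, a minimal sketch (not part of this patch) of a policy whose findFullFlushMerges
returns merges of small, freshly flushed segments that are not already merging; the name
SmallFlushMergePolicy and the 1 MB size cutoff are arbitrary assumptions:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.lucene.index.*;

    /** Sketch: merge flushed segments smaller than ~1 MB on commit/getReader. */
    class SmallFlushMergePolicy extends FilterMergePolicy {
      SmallFlushMergePolicy(MergePolicy in) {
        super(in);
      }

      @Override
      public MergeSpecification findFullFlushMerges(MergeTrigger trigger, SegmentInfos infos,
                                                    MergeContext context) throws IOException {
        List<SegmentCommitInfo> small = new ArrayList<>();
        for (SegmentCommitInfo sci : infos) {
          // skip segments already registered in a running merge, and anything large
          // (the 1 MB cutoff is an arbitrary choice for this sketch)
          if (context.getMergingSegments().contains(sci) == false && sci.sizeInBytes() < 1024 * 1024) {
            small.add(sci);
          }
        }
        if (small.size() <= 1) {
          return null; // nothing worth merging for this point-in-time snapshot
        }
        MergeSpecification spec = new MergeSpecification();
        spec.add(new OneMerge(small));
        return spec;
      }
    }
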
diff --git a/lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java b/lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java
index 01a6b15a035..f11493f0250 100644
--- a/lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java
+++ b/lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java
@@ -53,4 +53,8 @@ public enum MergeTrigger {
* Merge was triggered on commit.
*/
COMMIT,
+ /**
+ * Merge was triggered on opening NRT readers.
+ */
+ GET_READER,
}
diff --git a/lucene/core/src/java/org/apache/lucene/index/ReaderPool.java b/lucene/core/src/java/org/apache/lucene/index/ReaderPool.java
index b792be26873..8a269c89931 100644
--- a/lucene/core/src/java/org/apache/lucene/index/ReaderPool.java
+++ b/lucene/core/src/java/org/apache/lucene/index/ReaderPool.java
@@ -404,7 +404,7 @@ final class ReaderPool implements Closeable {
private boolean noDups() {
Set<String> seen = new HashSet<>();
for(SegmentCommitInfo info : readerMap.keySet()) {
- assert !seen.contains(info.info.name);
+ assert !seen.contains(info.info.name) : "seen twice: " + info.info.name ;
seen.add(info.info.name);
}
return true;
diff --git a/lucene/core/src/java/org/apache/lucene/index/StandardDirectoryReader.java b/lucene/core/src/java/org/apache/lucene/index/StandardDirectoryReader.java
index 8904eef8d8a..1003acfddf4 100644
--- a/lucene/core/src/java/org/apache/lucene/index/StandardDirectoryReader.java
+++ b/lucene/core/src/java/org/apache/lucene/index/StandardDirectoryReader.java
@@ -82,7 +82,8 @@ public final class StandardDirectoryReader extends DirectoryReader {
}
/** Used by near real-time search */
- static DirectoryReader open(IndexWriter writer, SegmentInfos infos, boolean applyAllDeletes, boolean writeAllDeletes) throws IOException {
+ static StandardDirectoryReader open(IndexWriter writer, IOUtils.IOFunction<SegmentCommitInfo, SegmentReader> readerFunction,
+ SegmentInfos infos, boolean applyAllDeletes, boolean writeAllDeletes) throws IOException {
// IndexWriter synchronizes externally before calling
// us, which ensures infos will not change; so there's
// no need to process segments in reverse order
@@ -101,19 +102,14 @@ public final class StandardDirectoryReader extends DirectoryReader {
// IndexWriter's segmentInfos:
final SegmentCommitInfo info = infos.info(i);
assert info.info.dir == dir;
- final ReadersAndUpdates rld = writer.getPooledInstance(info, true);
- try {
- final SegmentReader reader = rld.getReadOnlyClone(IOContext.READ);
- if (reader.numDocs() > 0 || writer.getConfig().mergePolicy.keepFullyDeletedSegment(() -> reader)) {
- // Steal the ref:
- readers.add(reader);
- infosUpto++;
- } else {
- reader.decRef();
- segmentInfos.remove(infosUpto);
- }
- } finally {
- writer.release(rld);
+ final SegmentReader reader = readerFunction.apply(info);
+ if (reader.numDocs() > 0 || writer.getConfig().mergePolicy.keepFullyDeletedSegment(() -> reader)) {
+ // Steal the ref:
+ readers.add(reader);
+ infosUpto++;
+ } else {
+ reader.decRef();
+ segmentInfos.remove(infosUpto);
}
}
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
index 2c9bc23dca3..79836a71e94 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
@@ -4206,7 +4206,7 @@ public class TestIndexWriter extends LuceneTestCase {
public void testMergeOnCommitKeepFullyDeletedSegments() throws Exception {
Directory dir = newDirectory();
IndexWriterConfig iwc = newIndexWriterConfig();
- iwc.setMaxCommitMergeWaitMillis(30 * 1000);
+ iwc.setMaxFullFlushMergeWaitMillis(30 * 1000);
iwc.mergePolicy = new FilterMergePolicy(newMergePolicy()) {
@Override
public boolean keepFullyDeletedSegment(IOSupplier<CodecReader> readerIOSupplier) {
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java
index f7f37d05310..241f21fb254 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java
@@ -32,32 +32,13 @@ import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexWriterConfig.OpenMode;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.LuceneTestCase;
public class TestIndexWriterMergePolicy extends LuceneTestCase {
- private static final MergePolicy MERGE_ON_COMMIT_POLICY = new LogDocMergePolicy() {
- @Override
- public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) {
- // Optimize down to a single segment on commit
- if (mergeTrigger == MergeTrigger.COMMIT && segmentInfos.size() > 1) {
- List<SegmentCommitInfo> nonMergingSegments = new ArrayList<>();
- for (SegmentCommitInfo sci : segmentInfos) {
- if (mergeContext.getMergingSegments().contains(sci) == false) {
- nonMergingSegments.add(sci);
- }
- }
- if (nonMergingSegments.size() > 1) {
- MergeSpecification mergeSpecification = new MergeSpecification();
- mergeSpecification.add(new OneMerge(nonMergingSegments));
- return mergeSpecification;
- }
- }
- return null;
- }
- };
// Test the normal case
public void testNormalCase() throws IOException {
@@ -324,7 +305,7 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
firstWriter.close(); // When this writer closes, it does not merge on commit.
IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random()))
- .setMergePolicy(MERGE_ON_COMMIT_POLICY).setMaxCommitMergeWaitMillis(Integer.MAX_VALUE);
+ .setMergePolicy(new MergeOnXMergePolicy(newMergePolicy(), MergeTrigger.COMMIT)).setMaxFullFlushMergeWaitMillis(Integer.MAX_VALUE);
IndexWriter writerWithMergePolicy = new IndexWriter(dir, iwc);
@@ -369,13 +350,14 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
// TODO: Add more checks for other non-double setters!
}
- public void testCarryOverNewDeletes() throws IOException, InterruptedException {
+
+ public void testCarryOverNewDeletesOnCommit() throws IOException, InterruptedException {
try (Directory directory = newDirectory()) {
boolean useSoftDeletes = random().nextBoolean();
CountDownLatch waitForMerge = new CountDownLatch(1);
CountDownLatch waitForUpdate = new CountDownLatch(1);
try (IndexWriter writer = new IndexWriter(directory, newIndexWriterConfig()
- .setMergePolicy(MERGE_ON_COMMIT_POLICY).setMaxCommitMergeWaitMillis(30 * 1000)
+ .setMergePolicy(new MergeOnXMergePolicy(NoMergePolicy.INSTANCE, MergeTrigger.COMMIT)).setMaxFullFlushMergeWaitMillis(30 * 1000)
.setSoftDeletesField("soft_delete")
.setMaxBufferedDocs(Integer.MAX_VALUE)
.setRAMBufferSizeMB(100)
@@ -443,12 +425,22 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
* This test makes sure we release the merge readers on abort. MDW will fail if it
* can't close all files
*/
- public void testAbortCommitMerge() throws IOException, InterruptedException {
+
+ public void testAbortMergeOnCommit() throws IOException, InterruptedException {
+ abortMergeOnX(false);
+ }
+
+ public void testAbortMergeOnGetReader() throws IOException, InterruptedException {
+ abortMergeOnX(true);
+ }
+
+ void abortMergeOnX(boolean useGetReader) throws IOException, InterruptedException {
try (Directory directory = newDirectory()) {
CountDownLatch waitForMerge = new CountDownLatch(1);
CountDownLatch waitForDeleteAll = new CountDownLatch(1);
try (IndexWriter writer = new IndexWriter(directory, newIndexWriterConfig()
- .setMergePolicy(MERGE_ON_COMMIT_POLICY).setMaxCommitMergeWaitMillis(30 * 1000)
+ .setMergePolicy(new MergeOnXMergePolicy(newMergePolicy(), useGetReader ? MergeTrigger.GET_READER : MergeTrigger.COMMIT))
+ .setMaxFullFlushMergeWaitMillis(30 * 1000)
.setMergeScheduler(new SerialMergeScheduler() {
@Override
public synchronized void merge(MergeSource mergeSource, MergeTrigger trigger) throws IOException {
@@ -472,10 +464,20 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
writer.flush();
writer.addDocument(d2);
Thread t = new Thread(() -> {
+ boolean success = false;
try {
- writer.commit();
+ if (useGetReader) {
+ writer.getReader().close();
+ } else {
+ writer.commit();
+ }
+ success = true;
} catch (IOException e) {
throw new AssertionError(e);
+ } finally {
+ if (success == false) {
+ waitForMerge.countDown();
+ }
}
});
t.start();
@@ -487,10 +489,100 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
}
}
+ public void testForceMergeWhileGetReader() throws IOException, InterruptedException {
+ try (Directory directory = newDirectory()) {
+ CountDownLatch waitForMerge = new CountDownLatch(1);
+ CountDownLatch waitForForceMergeCalled = new CountDownLatch(1);
+ try (IndexWriter writer = new IndexWriter(directory, newIndexWriterConfig()
+ .setMergePolicy(new MergeOnXMergePolicy(newMergePolicy(), MergeTrigger.GET_READER))
+ .setMaxFullFlushMergeWaitMillis(30 * 1000)
+ .setMergeScheduler(new SerialMergeScheduler() {
+ @Override
+ public void merge(MergeSource mergeSource, MergeTrigger trigger) throws IOException {
+ waitForMerge.countDown();
+ try {
+ waitForForceMergeCalled.await();
+ } catch (InterruptedException e) {
+ throw new AssertionError(e);
+ }
+ super.merge(mergeSource, trigger);
+ }
+ }))) {
+ Document d1 = new Document();
+ d1.add(new StringField("id", "1", Field.Store.NO));
+ writer.addDocument(d1);
+ writer.flush();
+ Document d2 = new Document();
+ d2.add(new StringField("id", "2", Field.Store.NO));
+ writer.addDocument(d2);
+ Thread t = new Thread(() -> {
+ try (DirectoryReader reader = writer.getReader()){
+ assertEquals(2, reader.maxDoc());
+ } catch (IOException e) {
+ throw new AssertionError(e);
+ }
+ });
+ t.start();
+ waitForMerge.await();
+ Document d3 = new Document();
+ d3.add(new StringField("id", "3", Field.Store.NO));
+ writer.addDocument(d3);
+ waitForForceMergeCalled.countDown();
+ writer.forceMerge(1);
+ t.join();
+ }
+ }
+ }
+
+ public void testFailAfterMergeCommitted() throws IOException {
+ try (Directory directory = newDirectory()) {
+ AtomicBoolean mergeAndFail = new AtomicBoolean(false);
+ try (IndexWriter writer = new IndexWriter(directory, newIndexWriterConfig()
+ .setMergePolicy(new MergeOnXMergePolicy(NoMergePolicy.INSTANCE, MergeTrigger.GET_READER))
+ .setMaxFullFlushMergeWaitMillis(30 * 1000)
+ .setMergeScheduler(new SerialMergeScheduler())) {
+ @Override
+ protected void doAfterFlush() throws IOException {
+ if (mergeAndFail.get() && hasPendingMerges()) {
+ executeMerge(MergeTrigger.GET_READER);
+ throw new RuntimeException("boom");
+ }
+ }
+ }) {
+ Document d1 = new Document();
+ d1.add(new StringField("id", "1", Field.Store.NO));
+ writer.addDocument(d1);
+ writer.flush();
+ Document d2 = new Document();
+ d2.add(new StringField("id", "2", Field.Store.NO));
+ writer.addDocument(d2);
+ writer.flush();
+ mergeAndFail.set(true);
+ try (DirectoryReader reader = writer.getReader()){
+ assertNotNull(reader); // make compiler happy and use the reader
+ fail();
+ } catch (RuntimeException e) {
+ assertEquals("boom", e.getMessage());
+ } finally {
+ mergeAndFail.set(false);
+ }
+ }
+ }
+ }
+
+ public void testStressUpdateSameDocumentWithMergeOnGetReader() throws IOException, InterruptedException {
+ stressUpdateSameDocumentWithMergeOnX(true);
+ }
+
public void testStressUpdateSameDocumentWithMergeOnCommit() throws IOException, InterruptedException {
+ stressUpdateSameDocumentWithMergeOnX(false);
+ }
+
+ void stressUpdateSameDocumentWithMergeOnX(boolean useGetReader) throws IOException, InterruptedException {
try (Directory directory = newDirectory()) {
try (RandomIndexWriter writer = new RandomIndexWriter(random(), directory, newIndexWriterConfig()
- .setMergePolicy(MERGE_ON_COMMIT_POLICY).setMaxCommitMergeWaitMillis(10 + random().nextInt(2000))
+ .setMergePolicy(new MergeOnXMergePolicy(newMergePolicy(), useGetReader ? MergeTrigger.GET_READER : MergeTrigger.COMMIT))
+ .setMaxFullFlushMergeWaitMillis(10 + random().nextInt(2000))
.setSoftDeletesField("soft_delete")
.setMergeScheduler(new ConcurrentMergeScheduler()))) {
Document d1 = new Document();
@@ -499,13 +591,17 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
writer.commit();
AtomicInteger iters = new AtomicInteger(100 + random().nextInt(TEST_NIGHTLY ? 5000 : 1000));
+ AtomicInteger numFullFlushes = new AtomicInteger(10 + random().nextInt(TEST_NIGHTLY ? 500 : 100));
AtomicBoolean done = new AtomicBoolean(false);
Thread[] threads = new Thread[1 + random().nextInt(4)];
for (int i = 0; i < threads.length; i++) {
Thread t = new Thread(() -> {
try {
- while (iters.decrementAndGet() > 0) {
+ while (iters.decrementAndGet() > 0 || numFullFlushes.get() > 0) {
writer.updateDocument(new Term("id", "1"), d1);
+ if (random().nextBoolean()) {
+ writer.addDocument(new Document());
+ }
}
} catch (Exception e) {
throw new AssertionError(e);
@@ -519,14 +615,22 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
}
try {
while (done.get() == false) {
- if (random().nextBoolean()) {
- writer.commit();
- }
- try (DirectoryReader open = new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(directory), "___soft_deletes")) {
- assertEquals(1, open.numDocs());
+ if (useGetReader) {
+ try (DirectoryReader reader = writer.getReader()) {
+ assertEquals(1, new IndexSearcher(reader).search(new TermQuery(new Term("id", "1")), 10).totalHits.value);
+ }
+ } else {
+ if (random().nextBoolean()) {
+ writer.commit();
+ }
+ try (DirectoryReader open = new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(directory), "___soft_deletes")) {
+ assertEquals(1, new IndexSearcher(open).search(new TermQuery(new Term("id", "1")), 10).totalHits.value);
+ }
}
+ numFullFlushes.decrementAndGet();
}
} finally {
+ numFullFlushes.set(0);
for (Thread t : threads) {
t.join();
}
@@ -534,4 +638,66 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
}
}
}
+
+ // Test basic semantics of merge on getReader
+ public void testMergeOnGetReader() throws IOException {
+ Directory dir = newDirectory();
+
+ IndexWriter firstWriter = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random()))
+ .setMergePolicy(NoMergePolicy.INSTANCE));
+ for (int i = 0; i < 5; i++) {
+ TestIndexWriter.addDoc(firstWriter);
+ firstWriter.flush();
+ }
+ DirectoryReader firstReader = DirectoryReader.open(firstWriter);
+ assertEquals(5, firstReader.leaves().size());
+ firstReader.close();
+ firstWriter.close(); // When this writer closes, it does not merge on commit.
+
+ IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random()))
+ .setMergePolicy(new MergeOnXMergePolicy(newMergePolicy(), MergeTrigger.GET_READER)).setMaxFullFlushMergeWaitMillis(Integer.MAX_VALUE);
+
+ IndexWriter writerWithMergePolicy = new IndexWriter(dir, iwc);
+
+ try (DirectoryReader unmergedReader = DirectoryReader.open(dir)) { // No changes. GetReader doesn't trigger a merge.
+ assertEquals(5, unmergedReader.leaves().size());
+ }
+
+ TestIndexWriter.addDoc(writerWithMergePolicy);
+ try (DirectoryReader mergedReader = writerWithMergePolicy.getReader()) {
+ // Doc added, do merge on getReader.
+ assertEquals(1, mergedReader.leaves().size());
+ }
+
+ writerWithMergePolicy.close();
+ dir.close();
+ }
+
+ private static class MergeOnXMergePolicy extends FilterMergePolicy {
+ private final MergeTrigger trigger;
+
+ private MergeOnXMergePolicy(MergePolicy in, MergeTrigger trigger) {
+ super(in);
+ this.trigger = trigger;
+ }
+
+ @Override
+ public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) {
+ // Optimize down to a single segment on commit
+ if (mergeTrigger == trigger && segmentInfos.size() > 1) {
+ List<SegmentCommitInfo> nonMergingSegments = new ArrayList<>();
+ for (SegmentCommitInfo sci : segmentInfos) {
+ if (mergeContext.getMergingSegments().contains(sci) == false) {
+ nonMergingSegments.add(sci);
+ }
+ }
+ if (nonMergingSegments.size() > 1) {
+ MergeSpecification mergeSpecification = new MergeSpecification();
+ mergeSpecification.add(new OneMerge(nonMergingSegments));
+ return mergeSpecification;
+ }
+ }
+ return null;
+ }
+ }
}
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterReader.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterReader.java
index 228c34366db..ab1b7e9d6a0 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterReader.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterReader.java
@@ -241,7 +241,7 @@ public class TestIndexWriterReader extends LuceneTestCase {
boolean doFullMerge = false;
Directory dir1 = getAssertNoDeletesDirectory(newDirectory());
- IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random()));
+ IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random())).setMaxFullFlushMergeWaitMillis(0);
if (iwc.getMaxBufferedDocs() < 20) {
iwc.setMaxBufferedDocs(20);
}
@@ -294,11 +294,11 @@ public class TestIndexWriterReader extends LuceneTestCase {
boolean doFullMerge = false;
Directory dir1 = getAssertNoDeletesDirectory(newDirectory());
- IndexWriter writer = new IndexWriter(dir1, newIndexWriterConfig(new MockAnalyzer(random())));
+ IndexWriter writer = new IndexWriter(dir1, newIndexWriterConfig(new MockAnalyzer(random())).setMaxFullFlushMergeWaitMillis(0));
// create a 2nd index
Directory dir2 = newDirectory();
- IndexWriter writer2 = new IndexWriter(dir2, newIndexWriterConfig(new MockAnalyzer(random())));
+ IndexWriter writer2 = new IndexWriter(dir2, newIndexWriterConfig(new MockAnalyzer(random())).setMaxFullFlushMergeWaitMillis(0));
createIndexNoClose(!doFullMerge, "index2", writer2);
writer2.close();
@@ -324,7 +324,7 @@ public class TestIndexWriterReader extends LuceneTestCase {
boolean doFullMerge = true;
Directory dir1 = getAssertNoDeletesDirectory(newDirectory());
- IndexWriter writer = new IndexWriter(dir1, newIndexWriterConfig(new MockAnalyzer(random())));
+ IndexWriter writer = new IndexWriter(dir1, newIndexWriterConfig(new MockAnalyzer(random())).setMaxFullFlushMergeWaitMillis(0));
// create the index
createIndexNoClose(!doFullMerge, "index1", writer);
writer.flush(false, true);
@@ -361,7 +361,7 @@ public class TestIndexWriterReader extends LuceneTestCase {
writer.close();
// reopen the writer to verify the delete made it to the directory
- writer = new IndexWriter(dir1, newIndexWriterConfig(new MockAnalyzer(random())));
+ writer = new IndexWriter(dir1, newIndexWriterConfig(new MockAnalyzer(random())).setMaxFullFlushMergeWaitMillis(0));
IndexReader w2r1 = writer.getReader();
assertEquals(0, count(new Term("id", id10), w2r1));
w2r1.close();
@@ -377,7 +377,8 @@ public class TestIndexWriterReader extends LuceneTestCase {
Directory mainDir = getAssertNoDeletesDirectory(newDirectory());
IndexWriter mainWriter = new IndexWriter(mainDir, newIndexWriterConfig(new MockAnalyzer(random()))
- .setMergePolicy(newLogMergePolicy()));
+ .setMergePolicy(newLogMergePolicy())
+ .setMaxFullFlushMergeWaitMillis(0));
TestUtil.reduceOpenFiles(mainWriter);
AddDirectoriesThreads addDirThreads = new AddDirectoriesThreads(numIter, mainWriter);
@@ -421,6 +422,7 @@ public class TestIndexWriterReader extends LuceneTestCase {
this.mainWriter = mainWriter;
addDir = newDirectory();
IndexWriter writer = new IndexWriter(addDir, newIndexWriterConfig(new MockAnalyzer(random()))
+ .setMaxFullFlushMergeWaitMillis(0)
.setMaxBufferedDocs(2));
TestUtil.reduceOpenFiles(writer);
for (int i = 0; i < NUM_INIT_DOCS; i++) {
@@ -533,7 +535,8 @@ public class TestIndexWriterReader extends LuceneTestCase {
*/
public void doTestIndexWriterReopenSegment(boolean doFullMerge) throws Exception {
Directory dir1 = getAssertNoDeletesDirectory(newDirectory());
- IndexWriter writer = new IndexWriter(dir1, newIndexWriterConfig(new MockAnalyzer(random())));
+ IndexWriter writer = new IndexWriter(dir1, newIndexWriterConfig(new MockAnalyzer(random()))
+ .setMaxFullFlushMergeWaitMillis(0));
IndexReader r1 = writer.getReader();
assertEquals(0, r1.maxDoc());
createIndexNoClose(false, "index1", writer);
@@ -569,7 +572,7 @@ public class TestIndexWriterReader extends LuceneTestCase {
writer.close();
// test whether the changes made it to the directory
- writer = new IndexWriter(dir1, newIndexWriterConfig(new MockAnalyzer(random())));
+ writer = new IndexWriter(dir1, newIndexWriterConfig(new MockAnalyzer(random())).setMaxFullFlushMergeWaitMillis(0));
IndexReader w2r1 = writer.getReader();
// insure the deletes were actually flushed to the directory
assertEquals(200, w2r1.maxDoc());
@@ -619,6 +622,7 @@ public class TestIndexWriterReader extends LuceneTestCase {
dir1,
newIndexWriterConfig(new MockAnalyzer(random()))
.setMaxBufferedDocs(2)
+ .setMaxFullFlushMergeWaitMillis(0)
.setMergedSegmentWarmer((leafReader) -> warmCount.incrementAndGet())
.setMergeScheduler(new ConcurrentMergeScheduler())
.setMergePolicy(newLogMergePolicy())
@@ -653,7 +657,8 @@ public class TestIndexWriterReader extends LuceneTestCase {
public void testAfterCommit() throws Exception {
Directory dir1 = getAssertNoDeletesDirectory(newDirectory());
IndexWriter writer = new IndexWriter(dir1, newIndexWriterConfig(new MockAnalyzer(random()))
- .setMergeScheduler(new ConcurrentMergeScheduler()));
+ .setMergeScheduler(new ConcurrentMergeScheduler())
+ .setMaxFullFlushMergeWaitMillis(0));
writer.commit();
// create the index
@@ -685,7 +690,7 @@ public class TestIndexWriterReader extends LuceneTestCase {
// Make sure reader remains usable even if IndexWriter closes
public void testAfterClose() throws Exception {
Directory dir1 = getAssertNoDeletesDirectory(newDirectory());
- IndexWriter writer = new IndexWriter(dir1, newIndexWriterConfig(new MockAnalyzer(random())));
+ IndexWriter writer = new IndexWriter(dir1, newIndexWriterConfig(new MockAnalyzer(random())).setMaxFullFlushMergeWaitMillis(0));
// create the index
createIndexNoClose(false, "test", writer);
@@ -715,7 +720,7 @@ public class TestIndexWriterReader extends LuceneTestCase {
Directory dir1 = getAssertNoDeletesDirectory(newDirectory());
final IndexWriter writer = new IndexWriter(
dir1,
- newIndexWriterConfig(new MockAnalyzer(random()))
+ newIndexWriterConfig(new MockAnalyzer(random())).setMaxFullFlushMergeWaitMillis(0)
.setMergePolicy(newLogMergePolicy(2))
);
@@ -1031,7 +1036,7 @@ public class TestIndexWriterReader extends LuceneTestCase {
Directory d = getAssertNoDeletesDirectory(newDirectory());
IndexWriter w = new IndexWriter(
d,
- newIndexWriterConfig(new MockAnalyzer(random())));
+ newIndexWriterConfig(new MockAnalyzer(random())).setMaxFullFlushMergeWaitMillis(0));
DirectoryReader r = w.getReader(); // start pooling readers
@@ -1085,7 +1090,7 @@ public class TestIndexWriterReader extends LuceneTestCase {
}
});
- IndexWriterConfig conf = newIndexWriterConfig(new MockAnalyzer(random()));
+ IndexWriterConfig conf = newIndexWriterConfig(new MockAnalyzer(random())).setMaxFullFlushMergeWaitMillis(0);
conf.setMergePolicy(NoMergePolicy.INSTANCE); // prevent merges from getting in the way
IndexWriter writer = new IndexWriter(dir, conf);
@@ -1116,7 +1121,7 @@ public class TestIndexWriterReader extends LuceneTestCase {
Directory dir = getAssertNoDeletesDirectory(new ByteBuffersDirectory());
// Don't use newIndexWriterConfig, because we need a
// "sane" mergePolicy:
- IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random()));
+ IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random())).setMaxFullFlushMergeWaitMillis(0);
IndexWriter w = new IndexWriter(dir, iwc);
// Create 500 segments:
for(int i=0;i<500;i++) {
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestSoftDeletesRetentionMergePolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestSoftDeletesRetentionMergePolicy.java
index 9c845f65708..1a10610d85b 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestSoftDeletesRetentionMergePolicy.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestSoftDeletesRetentionMergePolicy.java
@@ -109,7 +109,7 @@ public class TestSoftDeletesRetentionMergePolicy extends LuceneTestCase {
public void testKeepFullyDeletedSegments() throws IOException {
Directory dir = newDirectory();
- IndexWriterConfig indexWriterConfig = newIndexWriterConfig();
+ IndexWriterConfig indexWriterConfig = newIndexWriterConfig().setMergePolicy(NoMergePolicy.INSTANCE);
IndexWriter writer = new IndexWriter(dir, indexWriterConfig);
Document doc = new Document();
diff --git a/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java b/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
index 2adf31802a6..5343eaefb3c 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
@@ -1003,7 +1003,7 @@ public abstract class LuceneTestCase extends Assert {
if (rarely(r)) {
c.setCheckPendingFlushUpdate(false);
}
- c.setMaxCommitMergeWaitMillis(rarely() ? atLeast(r, 1000) : atLeast(r, 200));
+ c.setMaxFullFlushMergeWaitMillis(rarely() ? atLeast(r, 1000) : atLeast(r, 200));
return c;
}
diff --git a/solr/core/src/java/org/apache/solr/update/SolrIndexConfig.java b/solr/core/src/java/org/apache/solr/update/SolrIndexConfig.java
index 50bda0ef67f..a364757b8af 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrIndexConfig.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrIndexConfig.java
@@ -73,7 +73,7 @@ public class SolrIndexConfig implements MapSerializable {
* When using a custom merge policy that allows triggering synchronous merges on commit
* (see {@link MergePolicy#findFullFlushMerges(org.apache.lucene.index.MergeTrigger, org.apache.lucene.index.SegmentInfos, org.apache.lucene.index.MergePolicy.MergeContext)}),
* a timeout (in milliseconds) can be set for those merges to finish. Use {@code <maxCommitMergeWaitTime>1000</maxCommitMergeWaitTime>} in the {@code <indexConfig>} section.
- * See {@link IndexWriterConfig#setMaxCommitMergeWaitMillis(long)}.
+ * See {@link IndexWriterConfig#setMaxFullFlushMergeWaitMillis(long)}.
*
*
* Note that as of Solr 8.6, no {@code MergePolicy} shipped with Lucene/Solr makes use of
@@ -248,7 +248,7 @@ public class SolrIndexConfig implements MapSerializable {
}
if (maxCommitMergeWaitMillis > 0) {
- iwc.setMaxCommitMergeWaitMillis(maxCommitMergeWaitMillis);
+ iwc.setMaxFullFlushMergeWaitMillis(maxCommitMergeWaitMillis);
}
iwc.setSimilarity(schema.getSimilarity());
diff --git a/solr/core/src/test/org/apache/solr/update/SolrIndexConfigTest.java b/solr/core/src/test/org/apache/solr/update/SolrIndexConfigTest.java
index 2e4e59759c1..4f43b10a260 100644
--- a/solr/core/src/test/org/apache/solr/update/SolrIndexConfigTest.java
+++ b/solr/core/src/test/org/apache/solr/update/SolrIndexConfigTest.java
@@ -221,10 +221,10 @@ public class SolrIndexConfigTest extends SolrTestCaseJ4 {
public void testMaxCommitMergeWaitTime() throws Exception {
SolrConfig sc = new SolrConfig(TEST_PATH().resolve("collection1"), "solrconfig-test-misc.xml");
assertEquals(-1, sc.indexConfig.maxCommitMergeWaitMillis);
- assertEquals(IndexWriterConfig.DEFAULT_MAX_COMMIT_MERGE_WAIT_MILLIS, sc.indexConfig.toIndexWriterConfig(h.getCore()).getMaxCommitMergeWaitMillis());
+ assertEquals(IndexWriterConfig.DEFAULT_MAX_FULL_FLUSH_MERGE_WAIT_MILLIS, sc.indexConfig.toIndexWriterConfig(h.getCore()).getMaxFullFlushMergeWaitMillis());
System.setProperty("solr.tests.maxCommitMergeWaitTime", "10");
sc = new SolrConfig(TEST_PATH().resolve("collection1"), "solrconfig-test-misc.xml");
assertEquals(10, sc.indexConfig.maxCommitMergeWaitMillis);
- assertEquals(10, sc.indexConfig.toIndexWriterConfig(h.getCore()).getMaxCommitMergeWaitMillis());
+ assertEquals(10, sc.indexConfig.toIndexWriterConfig(h.getCore()).getMaxFullFlushMergeWaitMillis());
}
}