mirror of https://github.com/apache/lucene.git
LUCENE-8962: Merge segments on getReader (#1623)
Add IndexWriter merge-on-refresh feature to selectively merge small segments on getReader, subject to a configurable timeout, to improve search performance by reducing the number of small segments for searching. Co-authored-by: Mike McCandless <mikemccand@apache.org>
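Editor's sketch (illustration only, not part of this commit): the snippet below shows how an application could opt in to merge-on-refresh once this change is in. It assumes an inline FilterMergePolicy wrapper invented for this example — no merge policy shipped with Lucene returns merges from findFullFlushMerges — and the feature stays off unless IndexWriterConfig#setMaxFullFlushMergeWaitMillis is raised above its default of 0. The approach mirrors the MergeOnXMergePolicy helper added to TestIndexWriterMergePolicy further down in this diff.

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field;
    import org.apache.lucene.document.StringField;
    import org.apache.lucene.index.*;
    import org.apache.lucene.store.ByteBuffersDirectory;
    import org.apache.lucene.store.Directory;

    public class MergeOnRefreshExample {
      public static void main(String[] args) throws IOException {
        Directory dir = new ByteBuffersDirectory();
        IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer())
            // illustrative policy: merge every flushed segment that is not already merging
            // whenever an NRT reader is opened
            .setMergePolicy(new FilterMergePolicy(new TieredMergePolicy()) {
              @Override
              public MergeSpecification findFullFlushMerges(MergeTrigger trigger, SegmentInfos infos, MergeContext ctx) {
                if (trigger != MergeTrigger.GET_READER || infos.size() < 2) {
                  return null;
                }
                List<SegmentCommitInfo> candidates = new ArrayList<>();
                for (SegmentCommitInfo sci : infos) {
                  if (ctx.getMergingSegments().contains(sci) == false) {
                    candidates.add(sci);
                  }
                }
                if (candidates.size() < 2) {
                  return null;
                }
                MergeSpecification spec = new MergeSpecification();
                spec.add(new OneMerge(candidates));
                return spec;
              }
            })
            // wait up to 500 ms for those merges before returning the reader; 0 (the default) disables the feature
            .setMaxFullFlushMergeWaitMillis(500);
        try (IndexWriter writer = new IndexWriter(dir, iwc)) {
          for (int i = 0; i < 10; i++) {
            Document doc = new Document();
            doc.add(new StringField("id", Integer.toString(i), Field.Store.NO));
            writer.addDocument(doc);
            writer.flush(); // deliberately create many tiny segments
          }
          try (DirectoryReader reader = DirectoryReader.open(writer)) {
            // with the wait honored, the tiny segments are already merged away here
            System.out.println("leaves: " + reader.leaves().size());
          }
        }
        dir.close();
      }
    }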
This commit is contained in:
parent 96a853b200
commit 8294e1ae20
@@ -172,6 +172,10 @@ New Features

* LUCENE-9386: RegExpQuery added case insensitive matching option. (Mark Harwood)

* LUCENE-8962: Add IndexWriter merge-on-refresh feature to selectively merge
  small segments on getReader, subject to a configurable timeout, to improve
  search performance by reducing the number of small segments for searching. (Simon Willnauer)

Improvements
---------------------
@@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;
import java.util.function.IntPredicate;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -545,9 +546,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
// obtained during this flush are pooled, the first time
// this method is called:
readerPool.enableReaderPooling();
DirectoryReader r = null;
StandardDirectoryReader r = null;
doBeforeFlush();
boolean anyChanges = false;
boolean anyChanges;
final long maxFullFlushMergeWaitMillis = config.getMaxFullFlushMergeWaitMillis();
/*
* for releasing a NRT reader we must ensure that
* DW doesn't add any segments or deletes until we are
@@ -555,8 +557,46 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
* We release the two stage full flush after we are done opening the
* directory reader!
*/
MergePolicy.MergeSpecification onGetReaderMerges = null;
final AtomicBoolean stopCollectingMergedReaders = new AtomicBoolean(false);
final Map<String, SegmentReader> mergedReaders = new HashMap<>();
final Map<String, SegmentReader> openedReadOnlyClones = new HashMap<>();
// this function is used to control which SR are opened in order to keep track of them
// and to reuse them in the case we wait for merges in this getReader call.
IOUtils.IOFunction<SegmentCommitInfo, SegmentReader> readerFactory = sci -> {
final ReadersAndUpdates rld = getPooledInstance(sci, true);
try {
assert Thread.holdsLock(IndexWriter.this);
SegmentReader segmentReader = rld.getReadOnlyClone(IOContext.READ);
if (maxFullFlushMergeWaitMillis > 0) { // only track this if we actually do fullFlush merges
openedReadOnlyClones.put(sci.info.name, segmentReader);
}
return segmentReader;
} finally {
release(rld);
}
};
Closeable onGetReaderMergeResources = null;
SegmentInfos openingSegmentInfos = null;
boolean success2 = false;
try {
/* this is the essential part of the getReader method. We need to take care of the following things:
* - flush all currently in-memory DWPTs to disk
* - apply all deletes & updates to new and to the existing DWPTs
* - prevent flushes and applying deletes of concurrently indexing DWPTs to be applied
* - open a SDR on the updated SIS
*
* in order to prevent concurrent flushes we call DocumentsWriter#flushAllThreads that swaps out the deleteQueue
* (this enforces a happens before relationship between this and the subsequent full flush) and informs the
* FlushControl (#markForFullFlush()) that it should prevent any new DWPTs from flushing until we are
* done (DocumentsWriter#finishFullFlush(boolean)). All this is guarded by the fullFlushLock to prevent multiple
* full flushes from happening concurrently. Once the DocWriter has initiated a full flush we can sequentially flush
* and apply deletes & updates to the written segments without worrying about concurrently indexing DWPTs. The important
* aspect is that it all happens between DocumentsWriter#flushAllThread() and DocumentsWriter#finishFullFlush(boolean)
* since once the flush is marked as done deletes start to be applied to the segments on disk without guarantees that
* the corresponding added documents (in the update case) are flushed and visible when opening a SDR.
*
*/
boolean success = false;
synchronized (fullFlushLock) {
try {
@@ -573,7 +613,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
if (applyAllDeletes) {
applyAllDeletesAndUpdates();
}

synchronized(this) {

// NOTE: we cannot carry doc values updates in memory yet, so we always must write them through to disk and re-open each
@@ -581,16 +620,50 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,

// TODO: we could instead just clone SIS and pull/incref readers in sync'd block, and then do this w/o IW's lock?
// Must do this sync'd on IW to prevent a merge from completing at the last second and failing to write its DV updates:
writeReaderPool(writeAllDeletes);
writeReaderPool(writeAllDeletes);

// Prevent segmentInfos from changing while opening the
// reader; in theory we could instead do similar retry logic,
// just like we do when loading segments_N

r = StandardDirectoryReader.open(this, segmentInfos, applyAllDeletes, writeAllDeletes);
r = StandardDirectoryReader.open(this, readerFactory, segmentInfos, applyAllDeletes, writeAllDeletes);
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "return reader version=" + r.getVersion() + " reader=" + r);
}
if (maxFullFlushMergeWaitMillis > 0) {
// we take the SIS from the reader which has already pruned away fully deleted readers
// this makes pulling the readers below after the merge simpler since we can be safe that
// they are not closed. Every segment has a corresponding SR in the SDR we opened if we use
// this SIS
// we need to do this rather complicated management of SRs and infos since we can't wait for merges
// while we hold the fullFlushLock since the merge might hit a tragic event and that must not be reported
// while holding that lock. Merging outside of the lock ie. after calling docWriter.finishFullFlush(boolean) would
// yield wrong results because deletes might sneak in during the merge
openingSegmentInfos = r.getSegmentInfos().clone();
onGetReaderMerges = preparePointInTimeMerge(openingSegmentInfos, stopCollectingMergedReaders::get, MergeTrigger.GET_READER,
sci -> {
assert stopCollectingMergedReaders.get() == false : "illegal state merge reader must be not pulled since we already stopped waiting for merges";
SegmentReader apply = readerFactory.apply(sci);
mergedReaders.put(sci.info.name, apply);
// we need to incRef the files of the opened SR otherwise it's possible that another merge
// removes the segment before we pass it on to the SDR
deleter.incRef(sci.files());
});
onGetReaderMergeResources = () -> {
// this needs to be closed once after we are done. In the case of an exception it releases
// all resources, closes the merged readers and decrements the files references.
// this only happens for readers that haven't been removed from the mergedReaders and release elsewhere
synchronized (this) {
stopCollectingMergedReaders.set(true);
IOUtils.close(mergedReaders.values().stream().map(sr -> (Closeable) () -> {
try {
deleter.decRef(sr.getSegmentInfo().files());
} finally {
sr.close();
}
}).collect(Collectors.toList()));
}
};
}
}
success = true;
} finally {
@@ -607,6 +680,19 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
}
}
}
if (onGetReaderMerges != null) { // only relevant if we do merge on getReader
StandardDirectoryReader mergedReader = finishGetReaderMerge(stopCollectingMergedReaders, mergedReaders,
openedReadOnlyClones, openingSegmentInfos, applyAllDeletes,
writeAllDeletes, onGetReaderMerges, maxFullFlushMergeWaitMillis);
if (mergedReader != null) {
try {
r.close();
} finally {
r = mergedReader;
}
}
}

anyChanges |= maybeMerge.getAndSet(false);
if (anyChanges) {
maybeMerge(config.getMergePolicy(), MergeTrigger.FULL_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
@@ -621,15 +707,66 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
} finally {
if (!success2) {
try {
IOUtils.closeWhileHandlingException(r);
IOUtils.closeWhileHandlingException(r, onGetReaderMergeResources);
} finally {
maybeCloseOnTragicEvent();
}
} else {
IOUtils.close(onGetReaderMergeResources);
}
}
return r;
}

private StandardDirectoryReader finishGetReaderMerge(AtomicBoolean stopCollectingMergedReaders, Map<String, SegmentReader> mergedReaders,
Map<String, SegmentReader> openedReadOnlyClones, SegmentInfos openingSegmentInfos,
boolean applyAllDeletes, boolean writeAllDeletes,
MergePolicy.MergeSpecification pointInTimeMerges, long maxCommitMergeWaitMillis) throws IOException {
assert openingSegmentInfos != null;
mergeScheduler.merge(mergeSource, MergeTrigger.GET_READER);
pointInTimeMerges.await(maxCommitMergeWaitMillis, TimeUnit.MILLISECONDS);
synchronized (this) {
stopCollectingMergedReaders.set(true);
StandardDirectoryReader reader = maybeReopenMergedNRTReader(mergedReaders, openedReadOnlyClones, openingSegmentInfos,
applyAllDeletes, writeAllDeletes);
IOUtils.close(mergedReaders.values());
mergedReaders.clear();
return reader;
}
}

private StandardDirectoryReader maybeReopenMergedNRTReader(Map<String, SegmentReader> mergedReaders,
Map<String, SegmentReader> openedReadOnlyClones, SegmentInfos openingSegmentInfos,
boolean applyAllDeletes, boolean writeAllDeletes) throws IOException {
assert Thread.holdsLock(this);
if (mergedReaders.isEmpty() == false) {
Collection<String> files = new ArrayList<>();
try {
return StandardDirectoryReader.open(this,
sci -> {
// as soon as we remove the reader and return it the StandardDirectoryReader#open
// will take care of closing it. We only need to handle the readers that remain in the
// mergedReaders map and close them.
SegmentReader remove = mergedReaders.remove(sci.info.name);
if (remove == null) {
remove = openedReadOnlyClones.remove(sci.info.name);
assert remove != null;
// each of the readers we reuse from the previous reader needs to be incRef'd
// since we reuse them but don't have an implicit incRef in the SDR:open call
remove.incRef();
} else {
files.addAll(remove.getSegmentInfo().files());
}
return remove;
}, openingSegmentInfos, applyAllDeletes, writeAllDeletes);
} finally {
// now the SDR#open call has incRef'd the files so we can let them go
deleter.decRef(files);
}
}
return null;
}

@Override
public final long ramBytesUsed() {
ensureOpen();
@@ -2131,10 +2268,14 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
private final void maybeMerge(MergePolicy mergePolicy, MergeTrigger trigger, int maxNumSegments) throws IOException {
ensureOpen(false);
if (updatePendingMerges(mergePolicy, trigger, maxNumSegments) != null) {
mergeScheduler.merge(mergeSource, trigger);
executeMerge(trigger);
}
}

final void executeMerge(MergeTrigger trigger) throws IOException {
mergeScheduler.merge(mergeSource, trigger);
}

private synchronized MergePolicy.MergeSpecification updatePendingMerges(MergePolicy mergePolicy, MergeTrigger trigger, int maxNumSegments)
throws IOException {
@@ -2168,6 +2309,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
}
} else {
switch (trigger) {
case GET_READER:
case COMMIT:
spec = mergePolicy.findFullFlushMerges(trigger, segmentInfos, this);
break;
@@ -3189,9 +3331,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
SegmentInfos toCommit = null;
boolean anyChanges = false;
long seqNo;
MergePolicy.MergeSpecification onCommitMerges = null;
AtomicBoolean includeInCommit = new AtomicBoolean(true);
final long maxCommitMergeWaitMillis = config.getMaxCommitMergeWaitMillis();
MergePolicy.MergeSpecification pointInTimeMerges = null;
AtomicBoolean stopAddingMergedSegments = new AtomicBoolean(false);
final long maxCommitMergeWaitMillis = config.getMaxFullFlushMergeWaitMillis();
// This is copied from doFlush, except it's modified to
// clone & incRef the flushed SegmentInfos inside the
// sync block:
@@ -3252,9 +3394,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
// removed the files we are now syncing.
deleter.incRef(toCommit.files(false));
if (anyChanges && maxCommitMergeWaitMillis > 0) {
// we can safely call prepareOnCommitMerge since writeReaderPool(true) above wrote all
// we can safely call preparePointInTimeMerge since writeReaderPool(true) above wrote all
// necessary files to disk and checkpointed them.
onCommitMerges = prepareOnCommitMerge(toCommit, includeInCommit);
pointInTimeMerges = preparePointInTimeMerge(toCommit, stopAddingMergedSegments::get, MergeTrigger.COMMIT, sci->{});
}
}
success = true;
@@ -3277,21 +3419,21 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
maybeCloseOnTragicEvent();
}

if (onCommitMerges != null) {
if (pointInTimeMerges != null) {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "now run merges during commit: " + onCommitMerges.segString(directory));
infoStream.message("IW", "now run merges during commit: " + pointInTimeMerges.segString(directory));
}
mergeScheduler.merge(mergeSource, MergeTrigger.COMMIT);
onCommitMerges.await(maxCommitMergeWaitMillis, TimeUnit.MILLISECONDS);
pointInTimeMerges.await(maxCommitMergeWaitMillis, TimeUnit.MILLISECONDS);
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "done waiting for merges during commit");
}
synchronized (this) {
// we need to call this under lock since mergeFinished above is also called under the IW lock
includeInCommit.set(false);
stopAddingMergedSegments.set(true);
}
}
// do this after handling any onCommitMerges since the files will have changed if any merges
// do this after handling any pointInTimeMerges since the files will have changed if any merges
// did complete
filesToCommit = toCommit.files(false);
try {
@@ -3322,21 +3464,24 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
}

/**
* This optimization allows a commit to wait for merges on smallish segments to
* reduce the eventual number of tiny segments in the commit point. We wrap a {@code OneMerge} to
* update the {@code committingSegmentInfos} once the merge has finished. We replace the source segments
* in the SIS that we are going to commit with the freshly merged segment, but ignore all deletions and updates
* that are made to documents in the merged segment while it was merging. The updates that are made do not belong to
* the point-in-time commit point and should therefore not be included. See the clone call in {@code onMergeComplete}
* This optimization allows a commit/getReader to wait for merges on smallish segments to
* reduce the eventual number of tiny segments in the commit point / NRT Reader. We wrap a {@code OneMerge} to
* update the {@code mergingSegmentInfos} once the merge has finished. We replace the source segments
* in the SIS that we are going to commit / open the reader on with the freshly merged segment, but ignore all deletions and updates
* that are made to documents in the merged segment while it was merging. The updates that are made do not belong to
* the point-in-time commit point / NRT READER and should therefore not be included. See the clone call in {@code onMergeComplete}
* below. We also ensure that we pull the merge readers while holding {@code IndexWriter}'s lock. Otherwise
* we could see concurrent deletions/updates applied that do not belong to the segment.
*/
private MergePolicy.MergeSpecification prepareOnCommitMerge(SegmentInfos committingSegmentInfos, AtomicBoolean includeInCommit) throws IOException {
private MergePolicy.MergeSpecification preparePointInTimeMerge(SegmentInfos mergingSegmentInfos, BooleanSupplier stopCollectingMergeResults,
MergeTrigger trigger,
IOUtils.IOConsumer<SegmentCommitInfo> mergeFinished) throws IOException {
assert Thread.holdsLock(this);
MergePolicy.MergeSpecification onCommitMerges = updatePendingMerges(new OneMergeWrappingMergePolicy(config.getMergePolicy(), toWrap ->
assert trigger == MergeTrigger.GET_READER || trigger == MergeTrigger.COMMIT : "illegal trigger: " + trigger;
MergePolicy.MergeSpecification pointInTimeMerges = updatePendingMerges(new OneMergeWrappingMergePolicy(config.getMergePolicy(), toWrap ->
new MergePolicy.OneMerge(toWrap.segments) {
SegmentCommitInfo origInfo;
AtomicBoolean onlyOnce = new AtomicBoolean(false);
final AtomicBoolean onlyOnce = new AtomicBoolean(false);

@Override
public void mergeFinished(boolean committed, boolean segmentDropped) throws IOException {
@@ -3344,50 +3489,65 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,

// includedInCommit will be set (above, by our caller) to false if the allowed max wall clock
// time (IWC.getMaxCommitMergeWaitMillis()) has elapsed, which means we did not make the timeout
// and will not commit our merge to the to-be-commited SegmentInfos

// and will not commit our merge to the to-be-committed SegmentInfos
if (segmentDropped == false
&& committed
&& includeInCommit.get()) {
&& stopCollectingMergeResults.getAsBoolean() == false) {

// make sure onMergeComplete really was called:
assert origInfo != null;

if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "now apply merge during commit: " + toWrap.segString());
}

// make sure onMergeComplete really was called:
assert origInfo != null;

deleter.incRef(origInfo.files());
if (trigger == MergeTrigger.COMMIT) {
// if we do this in a getReader call here this is obsolete since we already hold a reader that has
// incRef'd these files
deleter.incRef(origInfo.files());
}
Set<String> mergedSegmentNames = new HashSet<>();
for (SegmentCommitInfo sci : segments) {
mergedSegmentNames.add(sci.info.name);
}
List<SegmentCommitInfo> toCommitMergedAwaySegments = new ArrayList<>();
for (SegmentCommitInfo sci : committingSegmentInfos) {
for (SegmentCommitInfo sci : mergingSegmentInfos) {
if (mergedSegmentNames.contains(sci.info.name)) {
toCommitMergedAwaySegments.add(sci);
deleter.decRef(sci.files());
if (trigger == MergeTrigger.COMMIT) {
// if we do this in a getReader call here this is obsolete since we already hold a reader that has
// incRef'd these files and will decRef them when it's closed
deleter.decRef(sci.files());
}
}
}
// Construct a OneMerge that applies to toCommit
MergePolicy.OneMerge applicableMerge = new MergePolicy.OneMerge(toCommitMergedAwaySegments);
applicableMerge.info = origInfo;
long segmentCounter = Long.parseLong(origInfo.info.name.substring(1), Character.MAX_RADIX);
committingSegmentInfos.counter = Math.max(committingSegmentInfos.counter, segmentCounter + 1);
committingSegmentInfos.applyMergeChanges(applicableMerge, false);
mergingSegmentInfos.counter = Math.max(mergingSegmentInfos.counter, segmentCounter + 1);
mergingSegmentInfos.applyMergeChanges(applicableMerge, false);
} else {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "skip apply merge during commit: " + toWrap.segString());
}
}
toWrap.mergeFinished(committed, false);
toWrap.mergeFinished(committed, segmentDropped);
super.mergeFinished(committed, segmentDropped);
}

@Override
void onMergeComplete() {
// clone the target info to make sure we have the original info without the updated del and update gens
origInfo = info.clone();
void onMergeComplete() throws IOException {
assert Thread.holdsLock(IndexWriter.this);
if (stopCollectingMergeResults.getAsBoolean() == false
&& isAborted() == false
&& info.info.maxDoc() > 0/* never do this if the segment if dropped / empty */) {
mergeFinished.accept(info);
// clone the target info to make sure we have the original info without the updated del and update gens
origInfo = info.clone();
}
toWrap.onMergeComplete();
super.onMergeComplete();
}

@Override
@@ -3404,11 +3564,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
return toWrap.wrapForMerge(reader); // must delegate
}
}
), MergeTrigger.COMMIT, UNBOUNDED_MAX_MERGE_SEGMENTS);
if (onCommitMerges != null) {
), trigger, UNBOUNDED_MAX_MERGE_SEGMENTS);
if (pointInTimeMerges != null) {
boolean closeReaders = true;
try {
for (MergePolicy.OneMerge merge : onCommitMerges.merges) {
for (MergePolicy.OneMerge merge : pointInTimeMerges.merges) {
IOContext context = new IOContext(merge.getStoreMergeInfo());
merge.initMergeReaders(
sci -> {
@@ -3422,17 +3582,20 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
closeReaders = false;
} finally {
if (closeReaders) {
IOUtils.applyToAll(onCommitMerges.merges, merge -> {
IOUtils.applyToAll(pointInTimeMerges.merges, merge -> {
// that merge is broken we need to clean up after it - it's fine we still have the IW lock to do this
boolean removed = pendingMerges.remove(merge);
assert removed: "merge should be pending but isn't: " + merge.segString();
abortOneMerge(merge);
mergeFinish(merge);
try {
abortOneMerge(merge);
} finally {
mergeFinish(merge);
}
});
}
}
}
return onCommitMerges;
return pointInTimeMerges;
}

/**
@@ -110,8 +110,8 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
/** Default value for whether calls to {@link IndexWriter#close()} include a commit. */
public final static boolean DEFAULT_COMMIT_ON_CLOSE = true;

/** Default value for time to wait for merges on commit (when using a {@link MergePolicy} that implements {@link MergePolicy#findFullFlushMerges}). */
public static final long DEFAULT_MAX_COMMIT_MERGE_WAIT_MILLIS = 0;
/** Default value for time to wait for merges on commit or getReader (when using a {@link MergePolicy} that implements {@link MergePolicy#findFullFlushMerges}). */
public static final long DEFAULT_MAX_FULL_FLUSH_MERGE_WAIT_MILLIS = 0;

// indicates whether this config instance is already attached to a writer.
// not final so that it can be cloned properly.
@@ -463,17 +463,18 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
}

/**
* Expert: sets the amount of time to wait for merges (during {@link IndexWriter#commit}) returned by
* Expert: sets the amount of time to wait for merges (during {@link IndexWriter#commit}
* or {@link IndexWriter#getReader(boolean, boolean)}) returned by
* MergePolicy.findFullFlushMerges(...).
* If this time is reached, we proceed with the commit based on segments merged up to that point.
* The merges are not cancelled, and will still run to completion independent of the commit,
* like natural segment merges. The default is <code>{@value IndexWriterConfig#DEFAULT_MAX_COMMIT_MERGE_WAIT_MILLIS}</code>.
* The merges are not aborted, and will still run to completion independent of the commit or getReader call,
* like natural segment merges. The default is <code>{@value IndexWriterConfig#DEFAULT_MAX_FULL_FLUSH_MERGE_WAIT_MILLIS}</code>.
*
* Note: This settings has no effect unless {@link MergePolicy#findFullFlushMerges(MergeTrigger, SegmentInfos, MergePolicy.MergeContext)}
* has an implementation that actually returns merges which by default doesn't return any merges.
*/
public IndexWriterConfig setMaxCommitMergeWaitMillis(long maxCommitMergeWaitMillis) {
this.maxCommitMergeWaitMillis = maxCommitMergeWaitMillis;
public IndexWriterConfig setMaxFullFlushMergeWaitMillis(long maxFullFlushMergeWaitMillis) {
this.maxFullFlushMergeWaitMillis = maxFullFlushMergeWaitMillis;
return this;
}
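Editor's note (assumed usage, not from the patch): the renamed setter only changes behaviour when the configured MergePolicy actually returns merges from findFullFlushMerges; merges that outlive the wait keep running in the background and simply are not reflected in the commit or NRT reader being opened. A minimal helper that wires the two together might look like this:

    import org.apache.lucene.analysis.Analyzer;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.index.MergePolicy;

    final class FullFlushMergeConfigFactory {
      // Builds a config whose commit()/getReader() calls wait up to waitMillis for the
      // merges returned by policy.findFullFlushMerges(); with the default of 0 the writer
      // never waits, so merge-on-commit / merge-on-refresh stays effectively disabled.
      static IndexWriterConfig newConfig(Analyzer analyzer, MergePolicy policy, long waitMillis) {
        return new IndexWriterConfig(analyzer)
            .setMergePolicy(policy)
            .setMaxFullFlushMergeWaitMillis(waitMillis);
      }
    }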
@@ -110,7 +110,7 @@ public class LiveIndexWriterConfig {
protected String softDeletesField = null;

/** Amount of time to wait for merges returned by MergePolicy.findFullFlushMerges(...) */
protected volatile long maxCommitMergeWaitMillis;
protected volatile long maxFullFlushMergeWaitMillis;

// used by IndexWriterConfig
LiveIndexWriterConfig(Analyzer analyzer) {
@@ -134,7 +134,7 @@ public class LiveIndexWriterConfig {
flushPolicy = new FlushByRamOrCountsPolicy();
readerPooling = IndexWriterConfig.DEFAULT_READER_POOLING;
perThreadHardLimitMB = IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB;
maxCommitMergeWaitMillis = IndexWriterConfig.DEFAULT_MAX_COMMIT_MERGE_WAIT_MILLIS;
maxFullFlushMergeWaitMillis = IndexWriterConfig.DEFAULT_MAX_FULL_FLUSH_MERGE_WAIT_MILLIS;
}

/** Returns the default analyzer to use for indexing documents. */
@@ -469,8 +469,8 @@ public class LiveIndexWriterConfig {
* If this time is reached, we proceed with the commit based on segments merged up to that point.
* The merges are not cancelled, and may still run to completion independent of the commit.
*/
public long getMaxCommitMergeWaitMillis() {
return maxCommitMergeWaitMillis;
public long getMaxFullFlushMergeWaitMillis() {
return maxFullFlushMergeWaitMillis;
}

@Override
@@ -496,7 +496,7 @@ public class LiveIndexWriterConfig {
sb.append("indexSort=").append(getIndexSort()).append("\n");
sb.append("checkPendingFlushOnUpdate=").append(isCheckPendingFlushOnUpdate()).append("\n");
sb.append("softDeletesField=").append(getSoftDeletesField()).append("\n");
sb.append("maxCommitMergeWaitMillis=").append(getMaxCommitMergeWaitMillis()).append("\n");
sb.append("maxFullFlushMergeWaitMillis=").append(getMaxFullFlushMergeWaitMillis()).append("\n");
return sb.toString();
}
}
@@ -421,7 +421,7 @@ public abstract class MergePolicy {
/**
* Called just before the merge is applied to IndexWriter's SegmentInfos
*/
void onMergeComplete() {
void onMergeComplete() throws IOException {
}

/**
@@ -623,19 +623,20 @@ public abstract class MergePolicy {
/**
* Identifies merges that we want to execute (synchronously) on commit. By default, this will do no merging on commit.
* If you implement this method in your {@code MergePolicy} you must also set a non-zero timeout using
* {@link IndexWriterConfig#setMaxCommitMergeWaitMillis}.
* {@link IndexWriterConfig#setMaxFullFlushMergeWaitMillis}.
*
* Any merges returned here will make {@link IndexWriter#commit()} or {@link IndexWriter#prepareCommit()} block until
* the merges complete or until {@link IndexWriterConfig#getMaxCommitMergeWaitMillis()} has elapsed. This may be
* used to merge small segments that have just been flushed as part of the commit, reducing the number of segments in
* the commit. If a merge does not complete in the allotted time, it will continue to execute, and eventually finish and
* apply to future commits, but will not be reflected in the current commit.
* Any merges returned here will make {@link IndexWriter#commit()}, {@link IndexWriter#prepareCommit()}
* or {@link IndexWriter#getReader(boolean, boolean)} block until
* the merges complete or until {@link IndexWriterConfig#getMaxFullFlushMergeWaitMillis()} has elapsed. This may be
* used to merge small segments that have just been flushed, reducing the number of segments in
* the point in time snapshot. If a merge does not complete in the allotted time, it will continue to execute, and eventually finish and
* apply to future point in time snapshot, but will not be reflected in the current one.
*
* If a {@link OneMerge} in the returned {@link MergeSpecification} includes a segment already included in a registered
* merge, then {@link IndexWriter#commit()} or {@link IndexWriter#prepareCommit()} will throw a {@link IllegalStateException}.
* Use {@link MergeContext#getMergingSegments()} to determine which segments are currently registered to merge.
*
* @param mergeTrigger the event that triggered the merge (COMMIT or FULL_FLUSH).
* @param mergeTrigger the event that triggered the merge (COMMIT or GET_READER).
* @param segmentInfos the total set of segments in the index (while preparing the commit)
* @param mergeContext the MergeContext to find the merges on, which should be used to determine which segments are
* already in a registered merge (see {@link MergeContext#getMergingSegments()}).
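Editor's sketch of a policy that satisfies the contract described above (class and parameter names are illustrative, not part of this commit): it only reacts to the COMMIT and GET_READER triggers, targets smallish segments by document count, and skips segments already registered in a merge via MergeContext#getMergingSegments() so commit()/getReader() cannot trip the IllegalStateException mentioned in the javadoc. The MergeOnXMergePolicy helper added to TestIndexWriterMergePolicy later in this diff takes the same approach without the size filter.

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.lucene.index.FilterMergePolicy;
    import org.apache.lucene.index.MergePolicy;
    import org.apache.lucene.index.MergeTrigger;
    import org.apache.lucene.index.SegmentCommitInfo;
    import org.apache.lucene.index.SegmentInfos;

    public final class SmallSegmentFullFlushMergePolicy extends FilterMergePolicy {
      private final int maxDocsPerSmallSegment;

      public SmallSegmentFullFlushMergePolicy(MergePolicy in, int maxDocsPerSmallSegment) {
        super(in);
        this.maxDocsPerSmallSegment = maxDocsPerSmallSegment;
      }

      @Override
      public MergeSpecification findFullFlushMerges(MergeTrigger trigger, SegmentInfos infos, MergeContext ctx) {
        if (trigger != MergeTrigger.COMMIT && trigger != MergeTrigger.GET_READER) {
          return null;
        }
        List<SegmentCommitInfo> small = new ArrayList<>();
        for (SegmentCommitInfo sci : infos) {
          // never include a segment that is already part of a registered merge
          if (sci.info.maxDoc() <= maxDocsPerSmallSegment
              && ctx.getMergingSegments().contains(sci) == false) {
            small.add(sci);
          }
        }
        if (small.size() < 2) {
          return null;
        }
        MergeSpecification spec = new MergeSpecification();
        spec.add(new OneMerge(small));
        return spec;
      }
    }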
@@ -53,4 +53,8 @@ public enum MergeTrigger {
* Merge was triggered on commit.
*/
COMMIT,
/**
* Merge was triggered on opening NRT readers.
*/
GET_READER,
}
@@ -404,7 +404,7 @@ final class ReaderPool implements Closeable {
private boolean noDups() {
Set<String> seen = new HashSet<>();
for(SegmentCommitInfo info : readerMap.keySet()) {
assert !seen.contains(info.info.name);
assert !seen.contains(info.info.name) : "seen twice: " + info.info.name ;
seen.add(info.info.name);
}
return true;
@@ -82,7 +82,8 @@ public final class StandardDirectoryReader extends DirectoryReader {
}

/** Used by near real-time search */
static DirectoryReader open(IndexWriter writer, SegmentInfos infos, boolean applyAllDeletes, boolean writeAllDeletes) throws IOException {
static StandardDirectoryReader open(IndexWriter writer, IOUtils.IOFunction<SegmentCommitInfo, SegmentReader> readerFunction,
SegmentInfos infos, boolean applyAllDeletes, boolean writeAllDeletes) throws IOException {
// IndexWriter synchronizes externally before calling
// us, which ensures infos will not change; so there's
// no need to process segments in reverse order
@@ -101,19 +102,14 @@ public final class StandardDirectoryReader extends DirectoryReader {
// IndexWriter's segmentInfos:
final SegmentCommitInfo info = infos.info(i);
assert info.info.dir == dir;
final ReadersAndUpdates rld = writer.getPooledInstance(info, true);
try {
final SegmentReader reader = rld.getReadOnlyClone(IOContext.READ);
if (reader.numDocs() > 0 || writer.getConfig().mergePolicy.keepFullyDeletedSegment(() -> reader)) {
// Steal the ref:
readers.add(reader);
infosUpto++;
} else {
reader.decRef();
segmentInfos.remove(infosUpto);
}
} finally {
writer.release(rld);
final SegmentReader reader = readerFunction.apply(info);
if (reader.numDocs() > 0 || writer.getConfig().mergePolicy.keepFullyDeletedSegment(() -> reader)) {
// Steal the ref:
readers.add(reader);
infosUpto++;
} else {
reader.decRef();
segmentInfos.remove(infosUpto);
}
}
@@ -4206,7 +4206,7 @@ public class TestIndexWriter extends LuceneTestCase {
public void testMergeOnCommitKeepFullyDeletedSegments() throws Exception {
Directory dir = newDirectory();
IndexWriterConfig iwc = newIndexWriterConfig();
iwc.setMaxCommitMergeWaitMillis(30 * 1000);
iwc.setMaxFullFlushMergeWaitMillis(30 * 1000);
iwc.mergePolicy = new FilterMergePolicy(newMergePolicy()) {
@Override
public boolean keepFullyDeletedSegment(IOSupplier<CodecReader> readerIOSupplier) {
@@ -32,32 +32,13 @@ import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexWriterConfig.OpenMode;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.Directory;

import org.apache.lucene.util.LuceneTestCase;

public class TestIndexWriterMergePolicy extends LuceneTestCase {

private static final MergePolicy MERGE_ON_COMMIT_POLICY = new LogDocMergePolicy() {
@Override
public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) {
// Optimize down to a single segment on commit
if (mergeTrigger == MergeTrigger.COMMIT && segmentInfos.size() > 1) {
List<SegmentCommitInfo> nonMergingSegments = new ArrayList<>();
for (SegmentCommitInfo sci : segmentInfos) {
if (mergeContext.getMergingSegments().contains(sci) == false) {
nonMergingSegments.add(sci);
}
}
if (nonMergingSegments.size() > 1) {
MergeSpecification mergeSpecification = new MergeSpecification();
mergeSpecification.add(new OneMerge(nonMergingSegments));
return mergeSpecification;
}
}
return null;
}
};

// Test the normal case
public void testNormalCase() throws IOException {
@@ -324,7 +305,7 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
firstWriter.close(); // When this writer closes, it does not merge on commit.

IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random()))
.setMergePolicy(MERGE_ON_COMMIT_POLICY).setMaxCommitMergeWaitMillis(Integer.MAX_VALUE);
.setMergePolicy(new MergeOnXMergePolicy(newMergePolicy(), MergeTrigger.COMMIT)).setMaxFullFlushMergeWaitMillis(Integer.MAX_VALUE);

IndexWriter writerWithMergePolicy = new IndexWriter(dir, iwc);
@@ -369,13 +350,14 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
// TODO: Add more checks for other non-double setters!
}

public void testCarryOverNewDeletes() throws IOException, InterruptedException {

public void testCarryOverNewDeletesOnCommit() throws IOException, InterruptedException {
try (Directory directory = newDirectory()) {
boolean useSoftDeletes = random().nextBoolean();
CountDownLatch waitForMerge = new CountDownLatch(1);
CountDownLatch waitForUpdate = new CountDownLatch(1);
try (IndexWriter writer = new IndexWriter(directory, newIndexWriterConfig()
.setMergePolicy(MERGE_ON_COMMIT_POLICY).setMaxCommitMergeWaitMillis(30 * 1000)
.setMergePolicy(new MergeOnXMergePolicy(NoMergePolicy.INSTANCE, MergeTrigger.COMMIT)).setMaxFullFlushMergeWaitMillis(30 * 1000)
.setSoftDeletesField("soft_delete")
.setMaxBufferedDocs(Integer.MAX_VALUE)
.setRAMBufferSizeMB(100)
@@ -443,12 +425,22 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
* This test makes sure we release the merge readers on abort. MDW will fail if it
* can't close all files
*/
public void testAbortCommitMerge() throws IOException, InterruptedException {

public void testAbortMergeOnCommit() throws IOException, InterruptedException {
abortMergeOnX(false);
}

public void testAbortMergeOnGetReader() throws IOException, InterruptedException {
abortMergeOnX(true);
}

void abortMergeOnX(boolean useGetReader) throws IOException, InterruptedException {
try (Directory directory = newDirectory()) {
CountDownLatch waitForMerge = new CountDownLatch(1);
CountDownLatch waitForDeleteAll = new CountDownLatch(1);
try (IndexWriter writer = new IndexWriter(directory, newIndexWriterConfig()
.setMergePolicy(MERGE_ON_COMMIT_POLICY).setMaxCommitMergeWaitMillis(30 * 1000)
.setMergePolicy(new MergeOnXMergePolicy(newMergePolicy(), useGetReader ? MergeTrigger.GET_READER : MergeTrigger.COMMIT))
.setMaxFullFlushMergeWaitMillis(30 * 1000)
.setMergeScheduler(new SerialMergeScheduler() {
@Override
public synchronized void merge(MergeSource mergeSource, MergeTrigger trigger) throws IOException {
@@ -472,10 +464,20 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
writer.flush();
writer.addDocument(d2);
Thread t = new Thread(() -> {
boolean success = false;
try {
writer.commit();
if (useGetReader) {
writer.getReader().close();
} else {
writer.commit();
}
success = true;
} catch (IOException e) {
throw new AssertionError(e);
} finally {
if (success == false) {
waitForMerge.countDown();
}
}
});
t.start();
@@ -487,10 +489,100 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
}
}

public void testForceMergeWhileGetReader() throws IOException, InterruptedException {
try (Directory directory = newDirectory()) {
CountDownLatch waitForMerge = new CountDownLatch(1);
CountDownLatch waitForForceMergeCalled = new CountDownLatch(1);
try (IndexWriter writer = new IndexWriter(directory, newIndexWriterConfig()
.setMergePolicy(new MergeOnXMergePolicy(newMergePolicy(), MergeTrigger.GET_READER))
.setMaxFullFlushMergeWaitMillis(30 * 1000)
.setMergeScheduler(new SerialMergeScheduler() {
@Override
public void merge(MergeSource mergeSource, MergeTrigger trigger) throws IOException {
waitForMerge.countDown();
try {
waitForForceMergeCalled.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
super.merge(mergeSource, trigger);
}
}))) {
Document d1 = new Document();
d1.add(new StringField("id", "1", Field.Store.NO));
writer.addDocument(d1);
writer.flush();
Document d2 = new Document();
d2.add(new StringField("id", "2", Field.Store.NO));
writer.addDocument(d2);
Thread t = new Thread(() -> {
try (DirectoryReader reader = writer.getReader()){
assertEquals(2, reader.maxDoc());
} catch (IOException e) {
throw new AssertionError(e);
}
});
t.start();
waitForMerge.await();
Document d3 = new Document();
d3.add(new StringField("id", "3", Field.Store.NO));
writer.addDocument(d3);
waitForForceMergeCalled.countDown();
writer.forceMerge(1);
t.join();
}
}
}

public void testFailAfterMergeCommitted() throws IOException {
try (Directory directory = newDirectory()) {
AtomicBoolean mergeAndFail = new AtomicBoolean(false);
try (IndexWriter writer = new IndexWriter(directory, newIndexWriterConfig()
.setMergePolicy(new MergeOnXMergePolicy(NoMergePolicy.INSTANCE, MergeTrigger.GET_READER))
.setMaxFullFlushMergeWaitMillis(30 * 1000)
.setMergeScheduler(new SerialMergeScheduler())) {
@Override
protected void doAfterFlush() throws IOException {
if (mergeAndFail.get() && hasPendingMerges()) {
executeMerge(MergeTrigger.GET_READER);
throw new RuntimeException("boom");
}
}
}) {
Document d1 = new Document();
d1.add(new StringField("id", "1", Field.Store.NO));
writer.addDocument(d1);
writer.flush();
Document d2 = new Document();
d2.add(new StringField("id", "2", Field.Store.NO));
writer.addDocument(d2);
writer.flush();
mergeAndFail.set(true);
try (DirectoryReader reader = writer.getReader()){
assertNotNull(reader); // make compiler happy and use the reader
fail();
} catch (RuntimeException e) {
assertEquals("boom", e.getMessage());
} finally {
mergeAndFail.set(false);
}
}
}
}

public void testStressUpdateSameDocumentWithMergeOnGetReader() throws IOException, InterruptedException {
stressUpdateSameDocumentWithMergeOnX(true);
}

public void testStressUpdateSameDocumentWithMergeOnCommit() throws IOException, InterruptedException {
stressUpdateSameDocumentWithMergeOnX(false);
}

void stressUpdateSameDocumentWithMergeOnX(boolean useGetReader) throws IOException, InterruptedException {
try (Directory directory = newDirectory()) {
try (RandomIndexWriter writer = new RandomIndexWriter(random(), directory, newIndexWriterConfig()
.setMergePolicy(MERGE_ON_COMMIT_POLICY).setMaxCommitMergeWaitMillis(10 + random().nextInt(2000))
.setMergePolicy(new MergeOnXMergePolicy(newMergePolicy(), useGetReader ? MergeTrigger.GET_READER : MergeTrigger.COMMIT))
.setMaxFullFlushMergeWaitMillis(10 + random().nextInt(2000))
.setSoftDeletesField("soft_delete")
.setMergeScheduler(new ConcurrentMergeScheduler()))) {
Document d1 = new Document();
@@ -499,13 +591,17 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
writer.commit();

AtomicInteger iters = new AtomicInteger(100 + random().nextInt(TEST_NIGHTLY ? 5000 : 1000));
AtomicInteger numFullFlushes = new AtomicInteger(10 + random().nextInt(TEST_NIGHTLY ? 500 : 100));
AtomicBoolean done = new AtomicBoolean(false);
Thread[] threads = new Thread[1 + random().nextInt(4)];
for (int i = 0; i < threads.length; i++) {
Thread t = new Thread(() -> {
try {
while (iters.decrementAndGet() > 0) {
while (iters.decrementAndGet() > 0 || numFullFlushes.get() > 0) {
writer.updateDocument(new Term("id", "1"), d1);
if (random().nextBoolean()) {
writer.addDocument(new Document());
}
}
} catch (Exception e) {
throw new AssertionError(e);
@@ -519,14 +615,22 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
}
try {
while (done.get() == false) {
if (random().nextBoolean()) {
writer.commit();
}
try (DirectoryReader open = new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(directory), "___soft_deletes")) {
assertEquals(1, open.numDocs());
if (useGetReader) {
try (DirectoryReader reader = writer.getReader()) {
assertEquals(1, new IndexSearcher(reader).search(new TermQuery(new Term("id", "1")), 10).totalHits.value);
}
} else {
if (random().nextBoolean()) {
writer.commit();
}
try (DirectoryReader open = new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(directory), "___soft_deletes")) {
assertEquals(1, new IndexSearcher(open).search(new TermQuery(new Term("id", "1")), 10).totalHits.value);
}
}
numFullFlushes.decrementAndGet();
}
} finally {
numFullFlushes.set(0);
for (Thread t : threads) {
t.join();
}
@@ -534,4 +638,66 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
}
}
}

// Test basic semantics of merge on getReader
public void testMergeOnGetReader() throws IOException {
Directory dir = newDirectory();

IndexWriter firstWriter = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random()))
.setMergePolicy(NoMergePolicy.INSTANCE));
for (int i = 0; i < 5; i++) {
TestIndexWriter.addDoc(firstWriter);
firstWriter.flush();
}
DirectoryReader firstReader = DirectoryReader.open(firstWriter);
assertEquals(5, firstReader.leaves().size());
firstReader.close();
firstWriter.close(); // When this writer closes, it does not merge on commit.

IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random()))
.setMergePolicy(new MergeOnXMergePolicy(newMergePolicy(), MergeTrigger.GET_READER)).setMaxFullFlushMergeWaitMillis(Integer.MAX_VALUE);

IndexWriter writerWithMergePolicy = new IndexWriter(dir, iwc);

try (DirectoryReader unmergedReader = DirectoryReader.open(dir)) { // No changes. GetReader doesn't trigger a merge.
assertEquals(5, unmergedReader.leaves().size());
}

TestIndexWriter.addDoc(writerWithMergePolicy);
try (DirectoryReader mergedReader = writerWithMergePolicy.getReader()) {
// Doc added, do merge on getReader.
assertEquals(1, mergedReader.leaves().size());
}

writerWithMergePolicy.close();
dir.close();
}

private static class MergeOnXMergePolicy extends FilterMergePolicy {
private final MergeTrigger trigger;

private MergeOnXMergePolicy(MergePolicy in, MergeTrigger trigger) {
super(in);
this.trigger = trigger;
}

@Override
public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) {
// Optimize down to a single segment on commit
if (mergeTrigger == trigger && segmentInfos.size() > 1) {
List<SegmentCommitInfo> nonMergingSegments = new ArrayList<>();
for (SegmentCommitInfo sci : segmentInfos) {
if (mergeContext.getMergingSegments().contains(sci) == false) {
nonMergingSegments.add(sci);
}
}
if (nonMergingSegments.size() > 1) {
MergeSpecification mergeSpecification = new MergeSpecification();
mergeSpecification.add(new OneMerge(nonMergingSegments));
return mergeSpecification;
}
}
return null;
}
}
}
@@ -241,7 +241,7 @@ public class TestIndexWriterReader extends LuceneTestCase {
boolean doFullMerge = false;

Directory dir1 = getAssertNoDeletesDirectory(newDirectory());
IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random()));
IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random())).setMaxFullFlushMergeWaitMillis(0);
if (iwc.getMaxBufferedDocs() < 20) {
iwc.setMaxBufferedDocs(20);
}
@@ -294,11 +294,11 @@ public class TestIndexWriterReader extends LuceneTestCase {
boolean doFullMerge = false;

Directory dir1 = getAssertNoDeletesDirectory(newDirectory());
IndexWriter writer = new IndexWriter(dir1, newIndexWriterConfig(new MockAnalyzer(random())));
IndexWriter writer = new IndexWriter(dir1, newIndexWriterConfig(new MockAnalyzer(random())).setMaxFullFlushMergeWaitMillis(0));

// create a 2nd index
Directory dir2 = newDirectory();
IndexWriter writer2 = new IndexWriter(dir2, newIndexWriterConfig(new MockAnalyzer(random())));
IndexWriter writer2 = new IndexWriter(dir2, newIndexWriterConfig(new MockAnalyzer(random())).setMaxFullFlushMergeWaitMillis(0));
createIndexNoClose(!doFullMerge, "index2", writer2);
writer2.close();
@@ -324,7 +324,7 @@ public class TestIndexWriterReader extends LuceneTestCase {
boolean doFullMerge = true;

Directory dir1 = getAssertNoDeletesDirectory(newDirectory());
IndexWriter writer = new IndexWriter(dir1, newIndexWriterConfig(new MockAnalyzer(random())));
IndexWriter writer = new IndexWriter(dir1, newIndexWriterConfig(new MockAnalyzer(random())).setMaxFullFlushMergeWaitMillis(0));
// create the index
createIndexNoClose(!doFullMerge, "index1", writer);
writer.flush(false, true);
@@ -361,7 +361,7 @@ public class TestIndexWriterReader extends LuceneTestCase {
writer.close();

// reopen the writer to verify the delete made it to the directory
writer = new IndexWriter(dir1, newIndexWriterConfig(new MockAnalyzer(random())));
writer = new IndexWriter(dir1, newIndexWriterConfig(new MockAnalyzer(random())).setMaxFullFlushMergeWaitMillis(0));
IndexReader w2r1 = writer.getReader();
assertEquals(0, count(new Term("id", id10), w2r1));
w2r1.close();
@@ -377,7 +377,8 @@ public class TestIndexWriterReader extends LuceneTestCase {
Directory mainDir = getAssertNoDeletesDirectory(newDirectory());

IndexWriter mainWriter = new IndexWriter(mainDir, newIndexWriterConfig(new MockAnalyzer(random()))
.setMergePolicy(newLogMergePolicy()));
.setMergePolicy(newLogMergePolicy())
.setMaxFullFlushMergeWaitMillis(0));
TestUtil.reduceOpenFiles(mainWriter);

AddDirectoriesThreads addDirThreads = new AddDirectoriesThreads(numIter, mainWriter);
@@ -421,6 +422,7 @@ public class TestIndexWriterReader extends LuceneTestCase {
this.mainWriter = mainWriter;
addDir = newDirectory();
IndexWriter writer = new IndexWriter(addDir, newIndexWriterConfig(new MockAnalyzer(random()))
.setMaxFullFlushMergeWaitMillis(0)
.setMaxBufferedDocs(2));
TestUtil.reduceOpenFiles(writer);
for (int i = 0; i < NUM_INIT_DOCS; i++) {
@@ -533,7 +535,8 @@ public class TestIndexWriterReader extends LuceneTestCase {
*/
public void doTestIndexWriterReopenSegment(boolean doFullMerge) throws Exception {
Directory dir1 = getAssertNoDeletesDirectory(newDirectory());
IndexWriter writer = new IndexWriter(dir1, newIndexWriterConfig(new MockAnalyzer(random())));
IndexWriter writer = new IndexWriter(dir1, newIndexWriterConfig(new MockAnalyzer(random()))
.setMaxFullFlushMergeWaitMillis(0));
IndexReader r1 = writer.getReader();
assertEquals(0, r1.maxDoc());
createIndexNoClose(false, "index1", writer);
@@ -569,7 +572,7 @@ public class TestIndexWriterReader extends LuceneTestCase {
writer.close();

// test whether the changes made it to the directory
writer = new IndexWriter(dir1, newIndexWriterConfig(new MockAnalyzer(random())));
writer = new IndexWriter(dir1, newIndexWriterConfig(new MockAnalyzer(random())).setMaxFullFlushMergeWaitMillis(0));
IndexReader w2r1 = writer.getReader();
// insure the deletes were actually flushed to the directory
assertEquals(200, w2r1.maxDoc());
@@ -619,6 +622,7 @@ public class TestIndexWriterReader extends LuceneTestCase {
dir1,
newIndexWriterConfig(new MockAnalyzer(random()))
.setMaxBufferedDocs(2)
.setMaxFullFlushMergeWaitMillis(0)
.setMergedSegmentWarmer((leafReader) -> warmCount.incrementAndGet())
.setMergeScheduler(new ConcurrentMergeScheduler())
.setMergePolicy(newLogMergePolicy())
@@ -653,7 +657,8 @@ public class TestIndexWriterReader extends LuceneTestCase {
public void testAfterCommit() throws Exception {
Directory dir1 = getAssertNoDeletesDirectory(newDirectory());
IndexWriter writer = new IndexWriter(dir1, newIndexWriterConfig(new MockAnalyzer(random()))
.setMergeScheduler(new ConcurrentMergeScheduler()));
.setMergeScheduler(new ConcurrentMergeScheduler())
.setMaxFullFlushMergeWaitMillis(0));
writer.commit();

// create the index
@@ -685,7 +690,7 @@ public class TestIndexWriterReader extends LuceneTestCase {
// Make sure reader remains usable even if IndexWriter closes
public void testAfterClose() throws Exception {
Directory dir1 = getAssertNoDeletesDirectory(newDirectory());
IndexWriter writer = new IndexWriter(dir1, newIndexWriterConfig(new MockAnalyzer(random())));
IndexWriter writer = new IndexWriter(dir1, newIndexWriterConfig(new MockAnalyzer(random())).setMaxFullFlushMergeWaitMillis(0));

// create the index
createIndexNoClose(false, "test", writer);
@@ -715,7 +720,7 @@ public class TestIndexWriterReader extends LuceneTestCase {
Directory dir1 = getAssertNoDeletesDirectory(newDirectory());
final IndexWriter writer = new IndexWriter(
dir1,
newIndexWriterConfig(new MockAnalyzer(random()))
newIndexWriterConfig(new MockAnalyzer(random())).setMaxFullFlushMergeWaitMillis(0)
.setMergePolicy(newLogMergePolicy(2))
);

@@ -1031,7 +1036,7 @@ public class TestIndexWriterReader extends LuceneTestCase {
Directory d = getAssertNoDeletesDirectory(newDirectory());
IndexWriter w = new IndexWriter(
d,
newIndexWriterConfig(new MockAnalyzer(random())));
newIndexWriterConfig(new MockAnalyzer(random())).setMaxFullFlushMergeWaitMillis(0));

DirectoryReader r = w.getReader(); // start pooling readers

@@ -1085,7 +1090,7 @@ public class TestIndexWriterReader extends LuceneTestCase {
}
});

IndexWriterConfig conf = newIndexWriterConfig(new MockAnalyzer(random()));
IndexWriterConfig conf = newIndexWriterConfig(new MockAnalyzer(random())).setMaxFullFlushMergeWaitMillis(0);
conf.setMergePolicy(NoMergePolicy.INSTANCE); // prevent merges from getting in the way
IndexWriter writer = new IndexWriter(dir, conf);

@@ -1116,7 +1121,7 @@ public class TestIndexWriterReader extends LuceneTestCase {
Directory dir = getAssertNoDeletesDirectory(new ByteBuffersDirectory());
// Don't use newIndexWriterConfig, because we need a
// "sane" mergePolicy:
IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random()));
IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random())).setMaxFullFlushMergeWaitMillis(0);
IndexWriter w = new IndexWriter(dir, iwc);
// Create 500 segments:
for(int i=0;i<500;i++) {
@@ -109,7 +109,7 @@ public class TestSoftDeletesRetentionMergePolicy extends LuceneTestCase {

public void testKeepFullyDeletedSegments() throws IOException {
Directory dir = newDirectory();
IndexWriterConfig indexWriterConfig = newIndexWriterConfig();
IndexWriterConfig indexWriterConfig = newIndexWriterConfig().setMergePolicy(NoMergePolicy.INSTANCE);
IndexWriter writer = new IndexWriter(dir, indexWriterConfig);

Document doc = new Document();
@@ -1003,7 +1003,7 @@ public abstract class LuceneTestCase extends Assert {
if (rarely(r)) {
c.setCheckPendingFlushUpdate(false);
}
c.setMaxCommitMergeWaitMillis(rarely() ? atLeast(r, 1000) : atLeast(r, 200));
c.setMaxFullFlushMergeWaitMillis(rarely() ? atLeast(r, 1000) : atLeast(r, 200));
return c;
}
@@ -73,7 +73,7 @@ public class SolrIndexConfig implements MapSerializable {
* When using a custom merge policy that allows triggering synchronous merges on commit
* (see {@link MergePolicy#findFullFlushMerges(org.apache.lucene.index.MergeTrigger, org.apache.lucene.index.SegmentInfos, org.apache.lucene.index.MergePolicy.MergeContext)}),
* a timeout (in milliseconds) can be set for those merges to finish. Use {@code <maxCommitMergeWaitTime>1000</maxCommitMergeWaitTime>} in the {@code <indexConfig>} section.
* See {@link IndexWriterConfig#setMaxCommitMergeWaitMillis(long)}.
* See {@link IndexWriterConfig#setMaxFullFlushMergeWaitMillis(long)}.
* </p>
* <p>
* Note that as of Solr 8.6, no {@code MergePolicy} shipped with Lucene/Solr make use of
@@ -248,7 +248,7 @@ public class SolrIndexConfig implements MapSerializable {
}

if (maxCommitMergeWaitMillis > 0) {
iwc.setMaxCommitMergeWaitMillis(maxCommitMergeWaitMillis);
iwc.setMaxFullFlushMergeWaitMillis(maxCommitMergeWaitMillis);
}

iwc.setSimilarity(schema.getSimilarity());
@@ -221,10 +221,10 @@ public class SolrIndexConfigTest extends SolrTestCaseJ4 {
public void testMaxCommitMergeWaitTime() throws Exception {
SolrConfig sc = new SolrConfig(TEST_PATH().resolve("collection1"), "solrconfig-test-misc.xml");
assertEquals(-1, sc.indexConfig.maxCommitMergeWaitMillis);
assertEquals(IndexWriterConfig.DEFAULT_MAX_COMMIT_MERGE_WAIT_MILLIS, sc.indexConfig.toIndexWriterConfig(h.getCore()).getMaxCommitMergeWaitMillis());
assertEquals(IndexWriterConfig.DEFAULT_MAX_FULL_FLUSH_MERGE_WAIT_MILLIS, sc.indexConfig.toIndexWriterConfig(h.getCore()).getMaxFullFlushMergeWaitMillis());
System.setProperty("solr.tests.maxCommitMergeWaitTime", "10");
sc = new SolrConfig(TEST_PATH().resolve("collection1"), "solrconfig-test-misc.xml");
assertEquals(10, sc.indexConfig.maxCommitMergeWaitMillis);
assertEquals(10, sc.indexConfig.toIndexWriterConfig(h.getCore()).getMaxCommitMergeWaitMillis());
assertEquals(10, sc.indexConfig.toIndexWriterConfig(h.getCore()).getMaxFullFlushMergeWaitMillis());
}
}