LUCENE-8962: Merge small segments on commit (#1617)

Add IndexWriter merge-on-commit feature to selectively merge small segments on commit,
subject to a configurable timeout, to improve search performance by reducing the number of small
segments for searching.

Co-authored-by: Michael Froh <msfroh@apache.org>
Co-authored-by: Michael Sokolov <sokolov@falutin.net>
Co-authored-by: Mike McCandless <mikemccand@apache.org>
Simon Willnauer 2020-06-27 22:25:45 +02:00 committed by GitHub
parent 698c9cce6c
commit 7f352a9665
18 changed files with 749 additions and 171 deletions
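
Usage sketch (illustrative only, not part of this change): the feature only takes effect when the configured
MergePolicy overrides the new findFullFlushMerges hook and IndexWriterConfig's new setMaxCommitMergeWaitMillis
is set to a non-zero value. The policy below mirrors the test-only MERGE_ON_COMMIT_POLICY further down in this
commit; the directory path and the 5-second wait are made-up example values.

import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import org.apache.lucene.index.*;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

public class MergeOnCommitExample {
  public static void main(String[] args) throws Exception {
    // Merge every segment not already part of a running merge into one segment on commit.
    MergePolicy mergeOnCommitPolicy = new LogDocMergePolicy() {
      @Override
      public MergeSpecification findFullFlushMerges(MergeTrigger trigger, SegmentInfos infos, MergeContext ctx) {
        if (trigger == MergeTrigger.COMMIT && infos.size() > 1) {
          List<SegmentCommitInfo> candidates = new ArrayList<>();
          for (SegmentCommitInfo sci : infos) {
            if (ctx.getMergingSegments().contains(sci) == false) {
              candidates.add(sci);
            }
          }
          if (candidates.size() > 1) {
            MergeSpecification spec = new MergeSpecification();
            spec.add(new OneMerge(candidates));
            return spec;
          }
        }
        return null; // no commit-time merge
      }
    };
    IndexWriterConfig iwc = new IndexWriterConfig()
        .setMergePolicy(mergeOnCommitPolicy)
        .setMaxCommitMergeWaitMillis(5000); // wait up to 5s for commit-time merges
    try (Directory dir = FSDirectory.open(Paths.get("/tmp/merge-on-commit-example"));
         IndexWriter writer = new IndexWriter(dir, iwc)) {
      // ... addDocument() calls ...
      writer.commit(); // small flushed segments may be merged away before the commit point is written
    }
  }
}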

View File: CHANGES.txt

@ -418,6 +418,10 @@ Improvements
* LUCENE-9253: KoreanTokenizer now supports custom dictionaries (system, unknown). (Namgyu Kim)
* LUCENE-8962: Add IndexWriter merge-on-commit feature to selectively merge small segments on commit,
subject to a configurable timeout, to improve search performance by reducing the number of small
segments for searching (Michael Froh, Mike Sokolov, Mike McCandless, Simon Willnauer)
* LUCENE-9171: QueryBuilder can now use BoostAttributes on input token streams to selectively
boost particular terms or synonyms in parsed queries. (Alessandro Benedetti, Alan Woodward)

View File: FilterMergePolicy.java

@ -57,6 +57,11 @@ public class FilterMergePolicy extends MergePolicy {
return in.findForcedDeletesMerges(segmentInfos, mergeContext);
}
@Override
public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
return in.findFullFlushMerges(mergeTrigger, segmentInfos, mergeContext);
}
@Override
public boolean useCompoundFile(SegmentInfos infos, SegmentCommitInfo mergedInfo, MergeContext mergeContext)
throws IOException {

View File: IndexWriter.java

@ -29,11 +29,11 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -2167,8 +2167,14 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
}
}
} else {
switch (trigger) {
case COMMIT:
spec = mergePolicy.findFullFlushMerges(trigger, segmentInfos, this);
break;
default:
spec = mergePolicy.findMerges(trigger, segmentInfos, this);
}
}
if (spec != null) {
final int numMerges = spec.merges.size();
for(int i=0;i<numMerges;i++) {
@ -2462,15 +2468,15 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
/** Aborts running merges. Be careful when using this
* method: when you abort a long-running merge, you lose
* a lot of work that must later be redone. */
private synchronized void abortMerges() {
private synchronized void abortMerges() throws IOException {
// Abort all pending & running merges:
for (final MergePolicy.OneMerge merge : pendingMerges) {
IOUtils.applyToAll(pendingMerges, merge -> {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "now abort pending merge " + segString(merge.segments));
}
merge.setAborted();
abortOneMerge(merge);
mergeFinish(merge);
}
});
pendingMerges.clear();
for (final MergePolicy.OneMerge merge : runningMerges) {
@ -3173,7 +3179,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
SegmentInfos toCommit = null;
boolean anyChanges = false;
long seqNo;
MergePolicy.MergeSpecification onCommitMerges = null;
AtomicBoolean includeInCommit = new AtomicBoolean(true);
final long maxCommitMergeWaitMillis = config.getMaxCommitMergeWaitMillis();
// This is copied from doFlush, except it's modified to
// clone & incRef the flushed SegmentInfos inside the
// sync block:
@ -3226,16 +3234,18 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
// corresponding add from an updateDocument) can
// sneak into the commit point:
toCommit = segmentInfos.clone();
pendingCommitChangeCount = changeCount.get();
// This protects the segmentInfos we are now going
// to commit. This is important in case, eg, while
// we are trying to sync all referenced files, a
// merge completes which would otherwise have
// removed the files we are now syncing.
filesToCommit = toCommit.files(false);
deleter.incRef(filesToCommit);
deleter.incRef(toCommit.files(false));
if (anyChanges && maxCommitMergeWaitMillis > 0) {
// we can safely call prepareOnCommitMerge since writeReaderPool(true) above wrote all
// necessary files to disk and checkpointed them.
onCommitMerges = prepareOnCommitMerge(toCommit, includeInCommit);
}
}
success = true;
} finally {
@ -3257,6 +3267,23 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
maybeCloseOnTragicEvent();
}
if (onCommitMerges != null) {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "now run merges during commit: " + onCommitMerges.segString(directory));
}
mergeScheduler.merge(mergeSource, MergeTrigger.COMMIT);
onCommitMerges.await(maxCommitMergeWaitMillis, TimeUnit.MILLISECONDS);
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "done waiting for merges during commit");
}
synchronized (this) {
// we need to call this under lock since mergeFinished above is also called under the IW lock
includeInCommit.set(false);
}
}
// do this after handling any onCommitMerges since the files will have changed if any merges
// did complete
filesToCommit = toCommit.files(false);
try {
if (anyChanges) {
maybeMerge.set(true);
@ -3284,6 +3311,120 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
}
}
/**
* This optimization allows a commit to wait for merges on smallish segments to
* reduce the eventual number of tiny segments in the commit point. We wrap a {@code OneMerge} to
* update the {@code committingSegmentInfos} once the merge has finished. We replace the source segments
* in the SIS that we are going to commit with the freshly merged segment, but ignore all deletions and updates
* that are made to documents in the merged segment while it was merging. The updates that are made do not belong to
* the point-in-time commit point and should therefore not be included. See the clone call in {@code onMergeComplete}
* below. We also ensure that we pull the merge readers while holding {@code IndexWriter}'s lock. Otherwise
* we could see concurrent deletions/updates applied that do not belong to the segment.
*/
private MergePolicy.MergeSpecification prepareOnCommitMerge(SegmentInfos committingSegmentInfos, AtomicBoolean includeInCommit) throws IOException {
assert Thread.holdsLock(this);
MergePolicy.MergeSpecification onCommitMerges = updatePendingMerges(new OneMergeWrappingMergePolicy(config.getMergePolicy(), toWrap ->
new MergePolicy.OneMerge(toWrap.segments) {
SegmentCommitInfo origInfo;
AtomicBoolean onlyOnce = new AtomicBoolean(false);
@Override
public void mergeFinished(boolean committed, boolean segmentDropped) throws IOException {
assert Thread.holdsLock(IndexWriter.this);
// includeInCommit will be set (above, by our caller) to false if the allowed max wall clock
// time (IWC.getMaxCommitMergeWaitMillis()) has elapsed, which means the merge did not finish
// in time and will not be applied to the to-be-committed SegmentInfos
if (segmentDropped == false
&& committed
&& includeInCommit.get()) {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "now apply merge during commit: " + toWrap.segString());
}
// make sure onMergeComplete really was called:
assert origInfo != null;
deleter.incRef(origInfo.files());
Set<String> mergedSegmentNames = new HashSet<>();
for (SegmentCommitInfo sci : segments) {
mergedSegmentNames.add(sci.info.name);
}
List<SegmentCommitInfo> toCommitMergedAwaySegments = new ArrayList<>();
for (SegmentCommitInfo sci : committingSegmentInfos) {
if (mergedSegmentNames.contains(sci.info.name)) {
toCommitMergedAwaySegments.add(sci);
deleter.decRef(sci.files());
}
}
// Construct a OneMerge that applies to toCommit
MergePolicy.OneMerge applicableMerge = new MergePolicy.OneMerge(toCommitMergedAwaySegments);
applicableMerge.info = origInfo;
long segmentCounter = Long.parseLong(origInfo.info.name.substring(1), Character.MAX_RADIX);
committingSegmentInfos.counter = Math.max(committingSegmentInfos.counter, segmentCounter + 1);
committingSegmentInfos.applyMergeChanges(applicableMerge, false);
} else {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "skip apply merge during commit: " + toWrap.segString());
}
}
toWrap.mergeFinished(committed, false);
super.mergeFinished(committed, segmentDropped);
}
@Override
void onMergeComplete() {
// clone the target info to make sure we have the original info without the updated del and update gens
origInfo = info.clone();
}
@Override
void initMergeReaders(IOUtils.IOFunction<SegmentCommitInfo, MergePolicy.MergeReader> readerFactory) throws IOException {
if (onlyOnce.compareAndSet(false, true)) {
// we do this only once below to pull readers as point in time readers with respect to the commit point
// we try to update
super.initMergeReaders(readerFactory);
}
}
@Override
public CodecReader wrapForMerge(CodecReader reader) throws IOException {
return toWrap.wrapForMerge(reader); // must delegate
}
}
), MergeTrigger.COMMIT, UNBOUNDED_MAX_MERGE_SEGMENTS);
if (onCommitMerges != null) {
boolean closeReaders = true;
try {
for (MergePolicy.OneMerge merge : onCommitMerges.merges) {
IOContext context = new IOContext(merge.getStoreMergeInfo());
merge.initMergeReaders(
sci -> {
final ReadersAndUpdates rld = getPooledInstance(sci, true);
// calling setIsMerging is important since it causes the RaU to record all DV updates
// in a separate map in order to be applied to the merged segment after it's done
rld.setIsMerging();
return rld.getReaderForMerge(context);
});
}
closeReaders = false;
} finally {
if (closeReaders) {
IOUtils.applyToAll(onCommitMerges.merges, merge -> {
// this merge is broken, so we need to clean up after it; that's fine since we still hold the IW lock
boolean removed = pendingMerges.remove(merge);
assert removed: "merge should be pending but isn't: " + merge.segString();
abortOneMerge(merge);
mergeFinish(merge);
});
}
}
}
return onCommitMerges;
}
/**
* Ensures that all changes in the reader-pool are written to disk.
* @param writeDeletes if <code>true</code>, deletes should be written to disk too.
@ -3697,7 +3838,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
MergeState.DocMap segDocMap = mergeState.docMaps[i];
MergeState.DocMap segLeafDocMap = mergeState.leafDocMaps[i];
carryOverHardDeletes(mergedDeletesAndUpdates, maxDoc, mergeState.liveDocs[i], merge.hardLiveDocs.get(i), rld.getHardLiveDocs(),
carryOverHardDeletes(mergedDeletesAndUpdates, maxDoc, mergeState.liveDocs[i], merge.getMergeReader().get(i).hardLiveDocs, rld.getHardLiveDocs(),
segDocMap, segLeafDocMap);
// Now carry over all doc values updates that were resolved while we were merging, remapping the docIDs to the newly merged docIDs.
@ -3850,7 +3991,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
@SuppressWarnings("try")
private synchronized boolean commitMerge(MergePolicy.OneMerge merge, MergeState mergeState) throws IOException {
merge.onMergeComplete();
testPoint("startCommitMerge");
if (tragedy.get() != null) {
@ -3964,7 +4105,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
// Must close before checkpoint, otherwise IFD won't be
// able to delete the held-open files from the merge
// readers:
closeMergeReaders(merge, false);
closeMergeReaders(merge, false, dropSegment);
}
if (infoStream.isEnabled("IW")) {
@ -4026,11 +4167,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
try {
try {
mergeInit(merge);
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "now merge\n merge=" + segString(merge.segments) + "\n index=" + segString());
}
mergeMiddle(merge, mergePolicy);
mergeSuccess(merge);
success = true;
@ -4039,7 +4178,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
}
} finally {
synchronized(this) {
// Readers are already closed in commitMerge if we didn't hit
// an exc:
if (success == false) {
closeMergeReaders(merge, true, false);
}
mergeFinish(merge);
if (success == false) {
@ -4071,6 +4214,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
/** Hook that's called when the specified merge is complete. */
protected void mergeSuccess(MergePolicy.OneMerge merge) {}
private void abortOneMerge(MergePolicy.OneMerge merge) throws IOException {
merge.setAborted();
closeMergeReaders(merge, true, false);
}
/** Checks whether this merge involves any segments
* already participating in a merge. If not, this merge
* is "registered", meaning we record that its segments
@ -4085,7 +4233,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
assert merge.segments.size() > 0;
if (stopMerges) {
merge.setAborted();
abortOneMerge(merge);
throw new MergePolicy.MergeAbortedException("merge is aborted: " + segString(merge.segments));
}
@ -4286,11 +4434,12 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
}
@SuppressWarnings("try")
private synchronized void closeMergeReaders(MergePolicy.OneMerge merge, boolean suppressExceptions) throws IOException {
private synchronized void closeMergeReaders(MergePolicy.OneMerge merge, boolean suppressExceptions, boolean droppedSegment) throws IOException {
if (merge.hasFinished() == false) {
final boolean drop = suppressExceptions == false;
try (Closeable finalizer = () -> merge.mergeFinished(suppressExceptions == false)) {
IOUtils.applyToAll(merge.readers, sr -> {
// first call mergeFinished before we potentially drop the reader and the last reference.
merge.close(suppressExceptions == false, droppedSegment, mr -> {
final SegmentReader sr = mr.reader;
final ReadersAndUpdates rld = getPooledInstance(sr.getOriginalSegmentInfo(), false);
// We still hold a ref so it should not have been removed:
assert rld != null;
@ -4305,11 +4454,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
readerPool.drop(rld.info);
}
});
} finally {
Collections.fill(merge.readers, null);
}
} else {
assert merge.readers.stream().filter(Objects::nonNull).count() == 0 : "we are done but still have readers: " + merge.readers;
assert merge.getMergeReader().isEmpty() : "we are done but still have readers: " + merge.getMergeReader();
assert suppressExceptions : "can't be done and not suppressing exceptions";
}
}
@ -4352,8 +4498,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
merge.checkAborted();
Directory mergeDirectory = mergeScheduler.wrapForMerge(merge, directory);
List<SegmentCommitInfo> sourceSegments = merge.segments;
IOContext context = new IOContext(merge.getStoreMergeInfo());
final TrackingDirectoryWrapper dirWrapper = new TrackingDirectoryWrapper(mergeDirectory);
@ -4362,45 +4506,25 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
infoStream.message("IW", "merging " + segString(merge.segments));
}
merge.readers = new ArrayList<>(sourceSegments.size());
merge.hardLiveDocs = new ArrayList<>(sourceSegments.size());
// This is try/finally to make sure merger's readers are
// closed:
boolean success = false;
try {
int segUpto = 0;
while(segUpto < sourceSegments.size()) {
final SegmentCommitInfo info = sourceSegments.get(segUpto);
// Hold onto the "live" reader; we will use this to
// commit merged deletes
final ReadersAndUpdates rld = getPooledInstance(info, true);
merge.initMergeReaders(sci -> {
final ReadersAndUpdates rld = getPooledInstance(sci, true);
rld.setIsMerging();
ReadersAndUpdates.MergeReader mr = rld.getReaderForMerge(context);
SegmentReader reader = mr.reader;
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "seg=" + segString(info) + " reader=" + reader);
}
merge.hardLiveDocs.add(mr.hardLiveDocs);
merge.readers.add(reader);
segUpto++;
}
return rld.getReaderForMerge(context);
});
// Let the merge wrap readers
List<CodecReader> mergeReaders = new ArrayList<>();
Counter softDeleteCount = Counter.newCounter(false);
for (int r = 0; r < merge.readers.size(); r++) {
SegmentReader reader = merge.readers.get(r);
for (MergePolicy.MergeReader mergeReader : merge.getMergeReader()) {
SegmentReader reader = mergeReader.reader;
CodecReader wrappedReader = merge.wrapForMerge(reader);
validateMergeReader(wrappedReader);
if (softDeletesEnabled) {
if (reader != wrappedReader) { // if we don't have a wrapped reader we won't preserve any soft-deletes
Bits hardLiveDocs = merge.hardLiveDocs.get(r);
Bits hardLiveDocs = mergeReader.hardLiveDocs;
if (hardLiveDocs != null) { // we only need to do this accounting if we have mixed deletes
Bits wrappedLiveDocs = wrappedReader.getLiveDocs();
Counter hardDeleteCounter = Counter.newCounter(false);
@ -4611,7 +4735,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
// Readers are already closed in commitMerge if we didn't hit
// an exc:
if (success == false) {
closeMergeReaders(merge, true);
closeMergeReaders(merge, true, false);
}
}

View File: IndexWriterConfig.java

@ -110,6 +110,9 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
/** Default value for whether calls to {@link IndexWriter#close()} include a commit. */
public final static boolean DEFAULT_COMMIT_ON_CLOSE = true;
/** Default value for time to wait for merges on commit (when using a {@link MergePolicy} that implements {@link MergePolicy#findFullFlushMerges}). */
public static final long DEFAULT_MAX_COMMIT_MERGE_WAIT_MILLIS = 0;
// indicates whether this config instance is already attached to a writer.
// not final so that it can be cloned properly.
private SetOnce<IndexWriter> writer = new SetOnce<>();
@ -459,6 +462,21 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
return this;
}
/**
* Expert: sets the amount of time to wait, during {@link IndexWriter#commit}, for merges returned by
* MergePolicy.findFullFlushMerges(...).
* If this time elapses, we proceed with the commit based on whichever of those merges completed by then.
* The remaining merges are not cancelled, and will still run to completion independent of the commit,
* like natural segment merges. The default is <code>{@value IndexWriterConfig#DEFAULT_MAX_COMMIT_MERGE_WAIT_MILLIS}</code>.
*
* Note: This setting has no effect unless {@link MergePolicy#findFullFlushMerges(MergeTrigger, SegmentInfos, MergePolicy.MergeContext)}
* is overridden to actually return merges; the default implementation returns none.
*/
public IndexWriterConfig setMaxCommitMergeWaitMillis(long maxCommitMergeWaitMillis) {
this.maxCommitMergeWaitMillis = maxCommitMergeWaitMillis;
return this;
}
/**
* Set the {@link Sort} order to use for all (flushed and merged) segments.
*/

View File: LiveIndexWriterConfig.java

@ -109,6 +109,8 @@ public class LiveIndexWriterConfig {
/** soft deletes field */
protected String softDeletesField = null;
/** Amount of time to wait for merges returned by MergePolicy.findFullFlushMerges(...) */
protected volatile long maxCommitMergeWaitMillis;
// used by IndexWriterConfig
LiveIndexWriterConfig(Analyzer analyzer) {
@ -132,6 +134,7 @@ public class LiveIndexWriterConfig {
flushPolicy = new FlushByRamOrCountsPolicy();
readerPooling = IndexWriterConfig.DEFAULT_READER_POOLING;
perThreadHardLimitMB = IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB;
maxCommitMergeWaitMillis = IndexWriterConfig.DEFAULT_MAX_COMMIT_MERGE_WAIT_MILLIS;
}
/** Returns the default analyzer to use for indexing documents. */
@ -461,6 +464,15 @@ public class LiveIndexWriterConfig {
return softDeletesField;
}
/**
* Expert: returns the amount of time to wait for merges returned by MergePolicy.findFullFlushMerges(...).
* If this time is reached, we proceed with the commit based on segments merged up to that point.
* The merges are not cancelled, and may still run to completion independent of the commit.
*/
public long getMaxCommitMergeWaitMillis() {
return maxCommitMergeWaitMillis;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
@ -484,6 +496,7 @@ public class LiveIndexWriterConfig {
sb.append("indexSort=").append(getIndexSort()).append("\n");
sb.append("checkPendingFlushOnUpdate=").append(isCheckPendingFlushOnUpdate()).append("\n");
sb.append("softDeletesField=").append(getSoftDeletesField()).append("\n");
sb.append("maxCommitMergeWaitMillis=").append(getMaxCommitMergeWaitMillis()).append("\n");
return sb.toString();
}
}

View File: MergePolicy.java

@ -41,6 +41,7 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MergeInfo;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.IOSupplier;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.ThreadInterruptedException;
@ -215,8 +216,7 @@ public abstract class MergePolicy {
// Sum of sizeInBytes of all SegmentInfos; set by IW.mergeInit
volatile long totalMergeBytes;
List<SegmentReader> readers; // used by IndexWriter
List<Bits> hardLiveDocs; // used by IndexWriter
private List<MergeReader> mergeReaders; // used by IndexWriter
/** Segments to be merged. */
public final List<SegmentCommitInfo> segments;
@ -243,6 +243,7 @@ public abstract class MergePolicy {
this.segments = List.copyOf(segments);
totalMaxDoc = segments.stream().mapToInt(i -> i.info.maxDoc()).sum();
mergeProgress = new OneMergeProgress();
mergeReaders = List.of();
}
/**
@ -254,11 +255,27 @@ public abstract class MergePolicy {
}
/** Called by {@link IndexWriter} after the merge is done and all readers have been closed.
* @param success true iff the merge finished successfully ie. was committed */
public void mergeFinished(boolean success) throws IOException {
* @param success true iff the merge finished successfully ie. was committed
* @param segmentDropped true iff the merged segment was dropped since it was fully deleted
*/
public void mergeFinished(boolean success, boolean segmentDropped) throws IOException {
}
/**
* Closes this merge and releases all merge readers
*/
final void close(boolean success, boolean segmentDropped, IOUtils.IOConsumer<MergeReader> readerConsumer) throws IOException {
// this method is final to ensure we never miss a super call to cleanup and finish the merge
if (mergeCompleted.complete(success) == false) {
throw new IllegalStateException("merge has already finished");
}
try {
mergeFinished(success, segmentDropped);
} finally {
final List<MergeReader> readers = mergeReaders;
mergeReaders = List.of();
IOUtils.applyToAll(readers, readerConsumer);
}
}
/** Wrap the reader in order to add/remove information to the merged segment. */
@ -399,6 +416,40 @@ public abstract class MergePolicy {
Optional<Boolean> hasCompletedSuccessfully() {
return Optional.ofNullable(mergeCompleted.getNow(null));
}
/**
* Called just before the merge is applied to IndexWriter's SegmentInfos
*/
void onMergeComplete() {
}
/**
* Sets the merge readers for this merge.
*/
void initMergeReaders(IOUtils.IOFunction<SegmentCommitInfo, MergeReader> readerFactory) throws IOException {
assert mergeReaders.isEmpty() : "merge readers must be empty";
assert mergeCompleted.isDone() == false : "merge is already done";
final ArrayList<MergeReader> readers = new ArrayList<>(segments.size());
try {
for (final SegmentCommitInfo info : segments) {
// Hold onto the "live" reader; we will use this to
// commit merged deletes
readers.add(readerFactory.apply(info));
}
} finally {
// ensure we assign this to close them in the case of an exception
this.mergeReaders = List.copyOf(readers); // we do a copy here to ensure that mergeReaders is an immutable list
}
}
/**
* Returns the merge readers or an empty list if the readers were not initialized yet.
*/
List<MergeReader> getMergeReader() {
return mergeReaders;
}
}
/**
@ -553,7 +604,7 @@ public abstract class MergePolicy {
* an original segment present in the
* to-be-merged index; else, it was a segment
* produced by a cascaded merge.
* @param mergeContext the IndexWriter to find the merges on
* @param mergeContext the MergeContext to find the merges on
*/
public abstract MergeSpecification findForcedMerges(
SegmentInfos segmentInfos, int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, MergeContext mergeContext)
@ -564,11 +615,35 @@ public abstract class MergePolicy {
* deletes from the index.
* @param segmentInfos
* the total set of segments in the index
* @param mergeContext the IndexWriter to find the merges on
* @param mergeContext the MergeContext to find the merges on
*/
public abstract MergeSpecification findForcedDeletesMerges(
SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException;
/**
* Identifies merges that we want to execute (synchronously) on commit. By default, this will do no merging on commit.
* If you implement this method in your {@code MergePolicy} you must also set a non-zero timeout using
* {@link IndexWriterConfig#setMaxCommitMergeWaitMillis}.
*
* Any merges returned here will make {@link IndexWriter#commit()} or {@link IndexWriter#prepareCommit()} block until
* the merges complete or until {@link IndexWriterConfig#getMaxCommitMergeWaitMillis()} has elapsed. This may be
* used to merge small segments that have just been flushed as part of the commit, reducing the number of segments in
* the commit. If a merge does not complete in the allotted time, it will continue to execute, and eventually finish and
* apply to future commits, but will not be reflected in the current commit.
*
* If a {@link OneMerge} in the returned {@link MergeSpecification} includes a segment already included in a registered
* merge, then {@link IndexWriter#commit()} or {@link IndexWriter#prepareCommit()} will throw an {@link IllegalStateException}.
* Use {@link MergeContext#getMergingSegments()} to determine which segments are currently registered to merge.
*
* @param mergeTrigger the event that triggered the merge (COMMIT or FULL_FLUSH).
* @param segmentInfos the total set of segments in the index (while preparing the commit)
* @param mergeContext the MergeContext to find the merges on, which should be used to determine which segments are
* already in a registered merge (see {@link MergeContext#getMergingSegments()}).
*/
public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
return null;
}
/**
* Returns true if a new segment (regardless of its origin) should use the
* compound file format. The default implementation returns <code>true</code>
@ -745,4 +820,14 @@ public abstract class MergePolicy {
*/
Set<SegmentCommitInfo> getMergingSegments();
}
final static class MergeReader {
final SegmentReader reader;
final Bits hardLiveDocs;
MergeReader(SegmentReader reader, Bits hardLiveDocs) {
this.reader = reader;
this.hardLiveDocs = hardLiveDocs;
}
}
}
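
To make the findFullFlushMerges contract above concrete, here is a hedged sketch of a policy that only proposes
commit-time merges over small segments that are not already registered in another merge. The class name and the
100 kB threshold are illustrative assumptions and are not part of this change; other merge selection is delegated
to the wrapped policy via FilterMergePolicy.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.lucene.index.*;

// Illustrative only: propose a commit-time merge of segments smaller than ~100 kB, skipping any
// segment already registered in another merge (MergeContext#getMergingSegments()), so that
// IndexWriter#commit()/prepareCommit() cannot hit the IllegalStateException described above.
public class SmallSegmentCommitMergePolicy extends FilterMergePolicy {
  private static final long SMALL_SEGMENT_BYTES = 100 * 1024; // assumed threshold

  public SmallSegmentCommitMergePolicy(MergePolicy in) {
    super(in);
  }

  @Override
  public MergeSpecification findFullFlushMerges(MergeTrigger trigger, SegmentInfos infos, MergeContext ctx) throws IOException {
    if (trigger != MergeTrigger.COMMIT) {
      return null;
    }
    List<SegmentCommitInfo> small = new ArrayList<>();
    for (SegmentCommitInfo sci : infos) {
      if (ctx.getMergingSegments().contains(sci) == false && sci.sizeInBytes() <= SMALL_SEGMENT_BYTES) {
        small.add(sci);
      }
    }
    if (small.size() < 2) {
      return null; // nothing worth merging at commit time
    }
    MergeSpecification spec = new MergeSpecification();
    spec.add(new OneMerge(small));
    return spec;
  }
}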

View File: MergeTrigger.java

@ -47,5 +47,10 @@ public enum MergeTrigger {
/**
* Merge was triggered by a closing IndexWriter.
*/
CLOSING
CLOSING,
/**
* Merge was triggered on commit.
*/
COMMIT,
}

View File: NoMergePolicy.java

@ -45,6 +45,9 @@ public final class NoMergePolicy extends MergePolicy {
@Override
public MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos, MergeContext mergeContext) { return null; }
@Override
public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) { return null; }
@Override
public boolean useCompoundFile(SegmentInfos segments, SegmentCommitInfo newSegment, MergeContext mergeContext) {
return newSegment.info.getUseCompoundFile();

View File: OneMergeWrappingMergePolicy.java

@ -59,6 +59,11 @@ public class OneMergeWrappingMergePolicy extends FilterMergePolicy {
return wrapSpec(in.findForcedDeletesMerges(segmentInfos, mergeContext));
}
@Override
public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
return wrapSpec(in.findFullFlushMerges(mergeTrigger, segmentInfos, mergeContext));
}
private MergeSpecification wrapSpec(MergeSpecification spec) {
MergeSpecification wrapped = spec == null ? null : new MergeSpecification();
if (wrapped != null) {

View File: ReadersAndUpdates.java

@ -695,18 +695,8 @@ final class ReadersAndUpdates {
return isMerging;
}
final static class MergeReader {
final SegmentReader reader;
final Bits hardLiveDocs;
MergeReader(SegmentReader reader, Bits hardLiveDocs) {
this.reader = reader;
this.hardLiveDocs = hardLiveDocs;
}
}
/** Returns a reader for merge, with the latest doc values updates and deletions. */
synchronized MergeReader getReaderForMerge(IOContext context) throws IOException {
synchronized MergePolicy.MergeReader getReaderForMerge(IOContext context) throws IOException {
// We must carry over any still-pending DV updates because they were not
// successfully written, e.g. because there was a hole in the delGens,
@ -728,7 +718,7 @@ final class ReadersAndUpdates {
reader = createNewReaderWithLatestLiveDocs(reader);
}
assert pendingDeletes.verifyDocCounts(reader);
return new MergeReader(reader, pendingDeletes.getHardLiveDocs());
return new MergePolicy.MergeReader(reader, pendingDeletes.getHardLiveDocs());
}
/**

View File: IOUtils.java

@ -656,4 +656,14 @@ public final class IOUtils {
*/
void accept(T input) throws IOException;
}
/**
* A Function that may throw an IOException
* @see java.util.function.Function
*/
@FunctionalInterface
public interface IOFunction<T, R> {
R apply(T t) throws IOException;
}
}
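
The new IOUtils.IOFunction mirrors the existing IOUtils.IOConsumer but returns a value, which is what lets
OneMerge.initMergeReaders above accept a reader factory that throws IOException. A tiny illustrative sketch
follows; the file name is hypothetical.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.lucene.util.IOUtils;

public class IOFunctionExample {
  public static void main(String[] args) throws IOException {
    // A lambda that throws IOException can be assigned directly; java.util.function.Function could not.
    IOUtils.IOFunction<String, Long> fileSize = name -> Files.size(Paths.get(name));
    System.out.println(fileSize.apply("segments_1")); // hypothetical file
  }
}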

View File: TestDemoParallelLeafReader.java

@ -530,7 +530,7 @@ public class TestDemoParallelLeafReader extends LuceneTestCase {
@Override
public CodecReader wrapForMerge(CodecReader reader) throws IOException {
LeafReader wrapped = getCurrentReader((SegmentReader)reader, schemaGen);
LeafReader wrapped = getCurrentReader(reader, schemaGen);
if (wrapped instanceof ParallelLeafReader) {
parallelReaders.add((ParallelLeafReader) wrapped);
}
@ -538,7 +538,8 @@ public class TestDemoParallelLeafReader extends LuceneTestCase {
}
@Override
public void mergeFinished(boolean success) throws IOException {
public void mergeFinished(boolean success, boolean segmentDropped) throws IOException {
super.mergeFinished(success, segmentDropped);
Throwable th = null;
for (ParallelLeafReader r : parallelReaders) {
try {

View File: TestIndexWriter.java

@ -4181,7 +4181,8 @@ public class TestIndexWriter extends LuceneTestCase {
SetOnce<Boolean> onlyFinishOnce = new SetOnce<>();
return new MergePolicy.OneMerge(merge.segments) {
@Override
public void mergeFinished(boolean success) {
public void mergeFinished(boolean success, boolean segmentDropped) throws IOException {
super.mergeFinished(success, segmentDropped);
onlyFinishOnce.set(true);
}
};
@ -4201,4 +4202,42 @@ public class TestIndexWriter extends LuceneTestCase {
}
}
}
public void testMergeOnCommitKeepFullyDeletedSegments() throws Exception {
Directory dir = newDirectory();
IndexWriterConfig iwc = newIndexWriterConfig();
iwc.setMaxCommitMergeWaitMillis(30 * 1000);
iwc.mergePolicy = new FilterMergePolicy(newMergePolicy()) {
@Override
public boolean keepFullyDeletedSegment(IOSupplier<CodecReader> readerIOSupplier) {
return true;
}
@Override
public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger,
SegmentInfos segmentInfos,
MergeContext mergeContext) {
List<SegmentCommitInfo> fullyDeletedSegments = segmentInfos.asList().stream()
.filter(s -> s.info.maxDoc() - s.getDelCount() == 0)
.collect(Collectors.toList());
if (fullyDeletedSegments.isEmpty()) {
return null;
}
MergeSpecification spec = new MergeSpecification();
spec.add(new OneMerge(fullyDeletedSegments));
return spec;
}
};
IndexWriter w = new IndexWriter(dir, iwc);
Document d = new Document();
d.add(new StringField("id", "1", Field.Store.YES));
w.addDocument(d);
w.commit();
w.updateDocument(new Term("id", "1"), d);
w.commit();
try (DirectoryReader reader = w.getReader()) {
assertEquals(1, reader.numDocs());
}
IOUtils.close(w, dir);
}
}

View File: TestIndexWriterMergePolicy.java

@ -18,17 +18,47 @@ package org.apache.lucene.index;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexWriterConfig.OpenMode;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.LuceneTestCase;
public class TestIndexWriterMergePolicy extends LuceneTestCase {
private static final MergePolicy MERGE_ON_COMMIT_POLICY = new LogDocMergePolicy() {
@Override
public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) {
// Optimize down to a single segment on commit
if (mergeTrigger == MergeTrigger.COMMIT && segmentInfos.size() > 1) {
List<SegmentCommitInfo> nonMergingSegments = new ArrayList<>();
for (SegmentCommitInfo sci : segmentInfos) {
if (mergeContext.getMergingSegments().contains(sci) == false) {
nonMergingSegments.add(sci);
}
}
if (nonMergingSegments.size() > 1) {
MergeSpecification mergeSpecification = new MergeSpecification();
mergeSpecification.add(new OneMerge(nonMergingSegments));
return mergeSpecification;
}
}
return null;
}
};
// Test the normal case
public void testNormalCase() throws IOException {
Directory dir = newDirectory();
@ -278,6 +308,50 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
assertSetters(new LogDocMergePolicy());
}
// Test basic semantics of merge on commit
public void testMergeOnCommit() throws IOException {
Directory dir = newDirectory();
IndexWriter firstWriter = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random()))
.setMergePolicy(NoMergePolicy.INSTANCE));
for (int i = 0; i < 5; i++) {
TestIndexWriter.addDoc(firstWriter);
firstWriter.flush();
}
DirectoryReader firstReader = DirectoryReader.open(firstWriter);
assertEquals(5, firstReader.leaves().size());
firstReader.close();
firstWriter.close(); // When this writer closes, it does not merge on commit.
IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random()))
.setMergePolicy(MERGE_ON_COMMIT_POLICY).setMaxCommitMergeWaitMillis(Integer.MAX_VALUE);
IndexWriter writerWithMergePolicy = new IndexWriter(dir, iwc);
writerWithMergePolicy.commit(); // No changes. Commit doesn't trigger a merge.
DirectoryReader unmergedReader = DirectoryReader.open(writerWithMergePolicy);
assertEquals(5, unmergedReader.leaves().size());
unmergedReader.close();
TestIndexWriter.addDoc(writerWithMergePolicy);
writerWithMergePolicy.commit(); // Doc added, do merge on commit.
assertEquals(1, writerWithMergePolicy.getSegmentCount()); //
DirectoryReader mergedReader = DirectoryReader.open(writerWithMergePolicy);
assertEquals(1, mergedReader.leaves().size());
mergedReader.close();
try (IndexReader reader = writerWithMergePolicy.getReader()) {
IndexSearcher searcher = new IndexSearcher(reader);
assertEquals(6, reader.numDocs());
assertEquals(6, searcher.count(new MatchAllDocsQuery()));
}
writerWithMergePolicy.close();
dir.close();
}
private void assertSetters(MergePolicy lmp) {
lmp.setMaxCFSSegmentSizeMB(2.0);
assertEquals(2.0, lmp.getMaxCFSSegmentSizeMB(), EPSILON);
@ -294,4 +368,168 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
// TODO: Add more checks for other non-double setters!
}
public void testCarryOverNewDeletes() throws IOException, InterruptedException {
try (Directory directory = newDirectory()) {
boolean useSoftDeletes = random().nextBoolean();
CountDownLatch waitForMerge = new CountDownLatch(1);
CountDownLatch waitForUpdate = new CountDownLatch(1);
try (IndexWriter writer = new IndexWriter(directory, newIndexWriterConfig()
.setMergePolicy(MERGE_ON_COMMIT_POLICY).setMaxCommitMergeWaitMillis(30 * 1000)
.setSoftDeletesField("soft_delete")
.setMergeScheduler(new ConcurrentMergeScheduler())) {
@Override
protected void merge(MergePolicy.OneMerge merge) throws IOException {
waitForMerge.countDown();
try {
waitForUpdate.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
super.merge(merge);
}
}) {
Document d1 = new Document();
d1.add(new StringField("id", "1", Field.Store.NO));
Document d2 = new Document();
d2.add(new StringField("id", "2", Field.Store.NO));
Document d3 = new Document();
d3.add(new StringField("id", "3", Field.Store.NO));
writer.addDocument(d1);
writer.flush();
writer.addDocument(d2);
boolean addThreeDocs = random().nextBoolean();
int expectedNumDocs = 2;
if (addThreeDocs) { // sometimes add another doc to ensure we don't have a fully deleted segment
expectedNumDocs = 3;
writer.addDocument(d3);
}
Thread t = new Thread(() -> {
try {
waitForMerge.await();
if (useSoftDeletes) {
writer.softUpdateDocument(new Term("id", "2"), d2, new NumericDocValuesField("soft_delete", 1));
} else {
writer.updateDocument(new Term("id", "2"), d2);
}
writer.flush();
} catch (Exception e) {
throw new AssertionError(e);
} finally {
waitForUpdate.countDown();
}
});
t.start();
writer.commit();
t.join();
try (DirectoryReader open = new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(directory), "soft_delete")) {
assertEquals(expectedNumDocs, open.numDocs());
assertEquals("we should not have any deletes", expectedNumDocs, open.maxDoc());
}
try (DirectoryReader open = DirectoryReader.open(writer)) {
assertEquals(expectedNumDocs, open.numDocs());
assertEquals("we should not have one delete", expectedNumDocs+1, open.maxDoc());
}
}
}
}
/**
* This test makes sure we release the merge readers on abort. MDW will fail if it
* can't close all files
*/
public void testAbortCommitMerge() throws IOException, InterruptedException {
try (Directory directory = newDirectory()) {
CountDownLatch waitForMerge = new CountDownLatch(1);
CountDownLatch waitForDeleteAll = new CountDownLatch(1);
try (IndexWriter writer = new IndexWriter(directory, newIndexWriterConfig()
.setMergePolicy(MERGE_ON_COMMIT_POLICY).setMaxCommitMergeWaitMillis(30 * 1000)
.setMergeScheduler(new SerialMergeScheduler() {
@Override
public synchronized void merge(MergeSource mergeSource, MergeTrigger trigger) throws IOException {
waitForMerge.countDown();
try {
waitForDeleteAll.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
super.merge(mergeSource, trigger);
}
}))) {
Document d1 = new Document();
d1.add(new StringField("id", "1", Field.Store.NO));
Document d2 = new Document();
d2.add(new StringField("id", "2", Field.Store.NO));
Document d3 = new Document();
d3.add(new StringField("id", "3", Field.Store.NO));
writer.addDocument(d1);
writer.flush();
writer.addDocument(d2);
Thread t = new Thread(() -> {
try {
writer.commit();
} catch (IOException e) {
throw new AssertionError(e);
}
});
t.start();
waitForMerge.await();
writer.deleteAll();
waitForDeleteAll.countDown();
t.join();
}
}
}
public void testStressUpdateSameDocumentWithMergeOnCommit() throws IOException, InterruptedException {
try (Directory directory = newDirectory()) {
try (RandomIndexWriter writer = new RandomIndexWriter(random(), directory, newIndexWriterConfig()
.setMergePolicy(MERGE_ON_COMMIT_POLICY).setMaxCommitMergeWaitMillis(10 + random().nextInt(2000))
.setSoftDeletesField("soft_delete")
.setMergeScheduler(new ConcurrentMergeScheduler()))) {
Document d1 = new Document();
d1.add(new StringField("id", "1", Field.Store.NO));
writer.updateDocument(new Term("id", "1"), d1);
writer.commit();
AtomicInteger iters = new AtomicInteger(100 + random().nextInt(TEST_NIGHTLY ? 5000 : 1000));
AtomicBoolean done = new AtomicBoolean(false);
Thread[] threads = new Thread[1 + random().nextInt(4)];
for (int i = 0; i < threads.length; i++) {
Thread t = new Thread(() -> {
try {
while (iters.decrementAndGet() > 0) {
writer.updateDocument(new Term("id", "1"), d1);
}
} catch (Exception e) {
throw new AssertionError(e);
} finally {
done.set(true);
}
});
t.start();
threads[i] = t;
}
try {
while (done.get() == false) {
if (random().nextBoolean()) {
writer.commit();
}
try (DirectoryReader open = new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(directory), "___soft_deletes")) {
assertEquals(1, open.numDocs());
}
}
} finally {
for (Thread t : threads) {
t.join();
}
}
}
}
}
}

View File: TestMergePolicy.java

@ -43,7 +43,7 @@ public class TestMergePolicy extends LuceneTestCase {
Thread t = new Thread(() -> {
try {
for (MergePolicy.OneMerge m : ms.merges) {
m.mergeFinished(true);
m.close(true, false, mr -> {});
}
} catch (IOException e) {
throw new AssertionError(e);
@ -66,7 +66,7 @@ public class TestMergePolicy extends LuceneTestCase {
}
Thread t = new Thread(() -> {
try {
ms.merges.get(0).mergeFinished(true);
ms.merges.get(0).close(true, false, mr -> {});
} catch (IOException e) {
throw new AssertionError(e);
}
@ -89,7 +89,7 @@ public class TestMergePolicy extends LuceneTestCase {
Thread t = new Thread(() -> {
while (stop.get() == false) {
try {
ms.merges.get(i.getAndIncrement()).mergeFinished(true);
ms.merges.get(i.getAndIncrement()).close(true, false, mr -> {});
Thread.sleep(1);
} catch (IOException | InterruptedException e) {
throw new AssertionError(e);
@ -114,8 +114,8 @@ public class TestMergePolicy extends LuceneTestCase {
try (Directory dir = newDirectory()) {
MergePolicy.MergeSpecification spec = createRandomMergeSpecification(dir, 1);
MergePolicy.OneMerge oneMerge = spec.merges.get(0);
oneMerge.mergeFinished(true);
expectThrows(IllegalStateException.class, () -> oneMerge.mergeFinished(false));
oneMerge.close(true, false, mr -> {});
expectThrows(IllegalStateException.class, () -> oneMerge.close(false, false, mr -> {}));
}
}

View File: TestPhraseWildcardQuery.java

@ -27,6 +27,7 @@ import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.Terms;
@ -66,12 +67,16 @@ public class TestPhraseWildcardQuery extends LuceneTestCase {
public void setUp() throws Exception {
super.setUp();
directory = newDirectory();
RandomIndexWriter iw = new RandomIndexWriter(random(), directory);
RandomIndexWriter iw = new RandomIndexWriter(random(), directory,
newIndexWriterConfig().setMergePolicy(NoMergePolicy.INSTANCE)); // do not accidentally merge
// the two segments we create
// here
iw.setDoRandomForceMerge(false); // Keep the segments separated.
addSegments(iw);
reader = iw.getReader();
iw.close();
searcher = newSearcher(reader);
assertEquals("test test relies on 2 segments", 2, searcher.getIndexReader().leaves().size());
}
@Override

View File: MockRandomMergePolicy.java

@ -128,6 +128,38 @@ public class MockRandomMergePolicy extends MergePolicy {
return findMerges(null, segmentInfos, mergeContext);
}
@Override
public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
MergeSpecification mergeSpecification = findMerges(null, segmentInfos, mergeContext);
if (mergeSpecification == null) {
return null;
}
// Do not return any merges involving already-merging segments.
MergeSpecification filteredMergeSpecification = new MergeSpecification();
for (OneMerge oneMerge : mergeSpecification.merges) {
boolean filtered = false;
List<SegmentCommitInfo> nonMergingSegments = new ArrayList<>();
for (SegmentCommitInfo sci : oneMerge.segments) {
if (mergeContext.getMergingSegments().contains(sci) == false) {
nonMergingSegments.add(sci);
} else {
filtered = true;
}
}
if (filtered == true) {
if (nonMergingSegments.size() > 0) {
filteredMergeSpecification.add(new OneMerge(nonMergingSegments));
}
} else {
filteredMergeSpecification.add(oneMerge);
}
}
if (filteredMergeSpecification.merges.size() > 0) {
return filteredMergeSpecification;
}
return null;
}
@Override
public boolean useCompoundFile(SegmentInfos infos, SegmentCommitInfo mergedInfo, MergeContext mergeContext) throws IOException {
// 80% of the time we create CFS:

View File: LuceneTestCase.java

@ -1003,6 +1003,7 @@ public abstract class LuceneTestCase extends Assert {
if (rarely(r)) {
c.setCheckPendingFlushUpdate(false);
}
c.setMaxCommitMergeWaitMillis(rarely() ? atLeast(r, 1000) : atLeast(r, 200));
return c;
}