LUCENE-8330: Detach IndexWriter from MergePolicy

This change introduces a new MergePolicy.MergeContext interface
that is easy to mock, and cuts all uses of IndexWriter inside
MergePolicy over to MergeContext. Since IndexWriter now implements
MergeContext, the cut-over is straightforward.
This dramatically reduces the API surface exposed to MergePolicy and
allows efficient testing without relying on IndexWriter, improving
the coverage and testability of our MergePolicy implementations.
Simon Willnauer 2018-05-24 14:01:22 +02:00
parent 54a63d0d0c
commit c93f628317
20 changed files with 362 additions and 240 deletions
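
The testability claim is the crux of the change: a merge policy can now be exercised against a tiny MergeContext stub instead of a live IndexWriter. Below is a minimal sketch of such a harness; the anonymous stub and helper method are hypothetical (the commit itself adds a similar MockMergeContext to BaseMergePolicyTestCase, shown further down):

import java.io.IOException;
import java.util.Collections;
import java.util.Set;

import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.MergeTrigger;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.util.InfoStream;

class MergePolicySketch {
  // Runs a policy with no directory, no writer, and no locking involved.
  static MergePolicy.MergeSpecification findMergesWithoutWriter(SegmentInfos segmentInfos) throws IOException {
    MergePolicy.MergeContext ctx = new MergePolicy.MergeContext() {
      @Override public int numDeletesToMerge(SegmentCommitInfo info) { return 0; }
      @Override public int numDeletedDocs(SegmentCommitInfo info) { return 0; }
      @Override public InfoStream getInfoStream() { return InfoStream.NO_OUTPUT; }
      @Override public Set<SegmentCommitInfo> getMergingSegments() { return Collections.emptySet(); }
    };
    return new TieredMergePolicy().findMerges(MergeTrigger.EXPLICIT, segmentInfos, ctx);
  }
}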

View File

@ -119,6 +119,10 @@ Changes in Runtime Behavior
* LUCENE-8309: Live docs are no longer backed by a FixedBitSet. (Adrien Grand)
* LUCENE-8330: Detach IndexWriter from MergePolicy. MergePolicy now instead of
requiring IndexWriter as a hard dependency expects a MergeContext which
IndexWriter implements. (Simon Willnauer, Robert Muir, Dawid Weiss, Mike McCandless)
New Features
* LUCENE-8200: Allow doc-values to be updated atomically together

View File

@ -41,31 +41,31 @@ public class FilterMergePolicy extends MergePolicy {
}
@Override
public MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, IndexWriter writer)
public MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext)
throws IOException {
return in.findMerges(mergeTrigger, segmentInfos, writer);
return in.findMerges(mergeTrigger, segmentInfos, mergeContext);
}
@Override
public MergeSpecification findForcedMerges(SegmentInfos segmentInfos, int maxSegmentCount,
Map<SegmentCommitInfo,Boolean> segmentsToMerge, IndexWriter writer) throws IOException {
return in.findForcedMerges(segmentInfos, maxSegmentCount, segmentsToMerge, writer);
Map<SegmentCommitInfo,Boolean> segmentsToMerge, MergeContext mergeContext) throws IOException {
return in.findForcedMerges(segmentInfos, maxSegmentCount, segmentsToMerge, mergeContext);
}
@Override
public MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos, IndexWriter writer) throws IOException {
return in.findForcedDeletesMerges(segmentInfos, writer);
public MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
return in.findForcedDeletesMerges(segmentInfos, mergeContext);
}
@Override
public boolean useCompoundFile(SegmentInfos infos, SegmentCommitInfo mergedInfo, IndexWriter writer)
public boolean useCompoundFile(SegmentInfos infos, SegmentCommitInfo mergedInfo, MergeContext mergeContext)
throws IOException {
return in.useCompoundFile(infos, mergedInfo, writer);
return in.useCompoundFile(infos, mergedInfo, mergeContext);
}
@Override
protected long size(SegmentCommitInfo info, IndexWriter writer) throws IOException {
return in.size(info, writer);
protected long size(SegmentCommitInfo info, MergeContext context) throws IOException {
return in.size(info, context);
}
@Override
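
Because FilterMergePolicy delegates every MergeContext-based hook to the wrapped policy, wrappers written against the old signatures only need their parameter types updated. A hypothetical illustration, assuming a filter that suppresses forced-deletes merges while delegating everything else:

import org.apache.lucene.index.FilterMergePolicy;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.SegmentInfos;

public class NoForcedDeletesMergePolicy extends FilterMergePolicy {
  public NoForcedDeletesMergePolicy(MergePolicy in) {
    super(in);
  }

  @Override
  public MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos, MergeContext mergeContext) {
    return null; // never expunge deletes; all other hooks fall through to the wrapped policy
  }
}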

View File

@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.IntPredicate;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.codecs.Codec;
@ -207,7 +208,8 @@ import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
* referenced by the "front" of the index). For this, IndexFileDeleter
* keeps track of the last non commit checkpoint.
*/
public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
MergePolicy.MergeContext {
/** Hard limit on maximum number of documents that may be added to the
* index. If you try to add more than this you'll hit {@code IllegalArgumentException}. */
@ -629,8 +631,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* If the reader isn't being pooled, the segmentInfo's
* delCount is returned.
*/
@Override
public int numDeletedDocs(SegmentCommitInfo info) {
ensureOpen(false);
validate(info);
int delCount = info.getDelCount();
final ReadersAndUpdates rld = getPooledInstance(info, false);
@ -1089,6 +1093,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
return directoryOrig;
}
@Override
public InfoStream getInfoStream() {
return infoStream;
}
/** Returns the analyzer used by this index. */
public Analyzer getAnalyzer() {
ensureOpen();
@ -4587,26 +4596,16 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
return segString(segmentInfos);
}
/** Returns a string description of the specified
* segments, for debugging.
*
* @lucene.internal */
synchronized String segString(Iterable<SegmentCommitInfo> infos) {
final StringBuilder buffer = new StringBuilder();
for(final SegmentCommitInfo info : infos) {
if (buffer.length() > 0) {
buffer.append(' ');
}
buffer.append(segString(info));
}
return buffer.toString();
return StreamSupport.stream(infos.spliterator(), false)
.map(this::segString).collect(Collectors.joining(" "));
}
/** Returns a string description of the specified
* segment, for debugging.
*
* @lucene.internal */
synchronized String segString(SegmentCommitInfo info) {
private synchronized String segString(SegmentCommitInfo info) {
return info.toString(numDeletedDocs(info) - info.getDelCount());
}
@ -5130,8 +5129,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* @param info the segment to get the number of deletes for
* @lucene.experimental
*/
@Override
public final int numDeletesToMerge(SegmentCommitInfo info) throws IOException {
ensureOpen(false);
validate(info);
MergePolicy mergePolicy = config.getMergePolicy();
final ReadersAndUpdates rld = getPooledInstance(info, false);
int numDeletesToMerge;
@ -5178,4 +5179,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
protected boolean isEnableTestPoints() {
return false;
}
private void validate(SegmentCommitInfo info) {
if (info.info.dir != directoryOrig) {
throw new IllegalArgumentException("SegmentCommitInfo must be from the same directory");
}
}
}

View File

@ -44,8 +44,8 @@ public class LogByteSizeMergePolicy extends LogMergePolicy {
}
@Override
protected long size(SegmentCommitInfo info, IndexWriter writer) throws IOException {
return sizeBytes(info, writer);
protected long size(SegmentCommitInfo info, MergeContext mergeContext) throws IOException {
return sizeBytes(info, mergeContext);
}
/** <p>Determines the largest segment (measured by total

View File

@ -40,8 +40,8 @@ public class LogDocMergePolicy extends LogMergePolicy {
}
@Override
protected long size(SegmentCommitInfo info, IndexWriter writer) throws IOException {
return sizeDocs(info, writer);
protected long size(SegmentCommitInfo info, MergeContext mergeContext) throws IOException {
return sizeDocs(info, mergeContext);
}
/** Sets the minimum size for the lowest level segments.

View File

@ -19,6 +19,7 @@ package org.apache.lucene.index;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@ -96,20 +97,6 @@ public abstract class LogMergePolicy extends MergePolicy {
super(DEFAULT_NO_CFS_RATIO, MergePolicy.DEFAULT_MAX_CFS_SEGMENT_SIZE);
}
/** Returns true if {@code LMP} is enabled in {@link
* IndexWriter}'s {@code infoStream}. */
protected boolean verbose(IndexWriter writer) {
return writer != null && writer.infoStream.isEnabled("LMP");
}
/** Print a debug message to {@link IndexWriter}'s {@code
* infoStream}. */
protected void message(String message, IndexWriter writer) {
if (verbose(writer)) {
writer.infoStream.message("LMP", message);
}
}
/** <p>Returns the number of segments that are merged at
* once and also controls the total number of segments
* allowed to accumulate in the index.</p> */
@ -148,10 +135,10 @@ public abstract class LogMergePolicy extends MergePolicy {
* SegmentCommitInfo}, pro-rated by percentage of
* non-deleted documents if {@link
* #setCalibrateSizeByDeletes} is set. */
protected long sizeDocs(SegmentCommitInfo info, IndexWriter writer) throws IOException {
protected long sizeDocs(SegmentCommitInfo info, MergeContext mergeContext) throws IOException {
if (calibrateSizeByDeletes) {
int delCount = writer.numDeletesToMerge(info);
assert delCount <= info.info.maxDoc();
int delCount = mergeContext.numDeletesToMerge(info);
assert assertDelCount(delCount, info);
return (info.info.maxDoc() - (long)delCount);
} else {
return info.info.maxDoc();
@ -162,9 +149,9 @@ public abstract class LogMergePolicy extends MergePolicy {
* SegmentCommitInfo}, pro-rated by percentage of
* non-deleted documents if {@link
* #setCalibrateSizeByDeletes} is set. */
protected long sizeBytes(SegmentCommitInfo info, IndexWriter writer) throws IOException {
protected long sizeBytes(SegmentCommitInfo info, MergeContext mergeContext) throws IOException {
if (calibrateSizeByDeletes) {
return super.size(info, writer);
return super.size(info, mergeContext);
}
return info.sizeInBytes();
}
@ -172,7 +159,7 @@ public abstract class LogMergePolicy extends MergePolicy {
/** Returns true if the number of segments eligible for
* merging is less than or equal to the specified {@code
* maxNumSegments}. */
protected boolean isMerged(SegmentInfos infos, int maxNumSegments, Map<SegmentCommitInfo,Boolean> segmentsToMerge, IndexWriter writer) throws IOException {
protected boolean isMerged(SegmentInfos infos, int maxNumSegments, Map<SegmentCommitInfo,Boolean> segmentsToMerge, MergeContext mergeContext) throws IOException {
final int numSegments = infos.size();
int numToMerge = 0;
SegmentCommitInfo mergeInfo = null;
@ -188,7 +175,7 @@ public abstract class LogMergePolicy extends MergePolicy {
}
return numToMerge <= maxNumSegments &&
(numToMerge != 1 || !segmentIsOriginal || isMerged(infos, mergeInfo, writer));
(numToMerge != 1 || !segmentIsOriginal || isMerged(infos, mergeInfo, mergeContext));
}
/**
@ -200,20 +187,20 @@ public abstract class LogMergePolicy extends MergePolicy {
* maxNumSegments} will remain, but &lt;= that number.
*/
private MergeSpecification findForcedMergesSizeLimit(
SegmentInfos infos, int maxNumSegments, int last, IndexWriter writer) throws IOException {
SegmentInfos infos, int last, MergeContext mergeContext) throws IOException {
MergeSpecification spec = new MergeSpecification();
final List<SegmentCommitInfo> segments = infos.asList();
int start = last - 1;
while (start >= 0) {
SegmentCommitInfo info = infos.info(start);
if (size(info, writer) > maxMergeSizeForForcedMerge || sizeDocs(info, writer) > maxMergeDocs) {
if (verbose(writer)) {
message("findForcedMergesSizeLimit: skip segment=" + info + ": size is > maxMergeSize (" + maxMergeSizeForForcedMerge + ") or sizeDocs is > maxMergeDocs (" + maxMergeDocs + ")", writer);
if (size(info, mergeContext) > maxMergeSizeForForcedMerge || sizeDocs(info, mergeContext) > maxMergeDocs) {
if (verbose(mergeContext)) {
message("findForcedMergesSizeLimit: skip segment=" + info + ": size is > maxMergeSize (" + maxMergeSizeForForcedMerge + ") or sizeDocs is > maxMergeDocs (" + maxMergeDocs + ")", mergeContext);
}
// need to skip that segment + add a merge for the 'right' segments,
// unless there is only 1 which is merged.
if (last - start - 1 > 1 || (start != last - 1 && !isMerged(infos, infos.info(start + 1), writer))) {
if (last - start - 1 > 1 || (start != last - 1 && !isMerged(infos, infos.info(start + 1), mergeContext))) {
// there is more than 1 segment to the right of
// this one, or a mergeable single segment.
spec.add(new OneMerge(segments.subList(start + 1, last)));
@ -229,7 +216,7 @@ public abstract class LogMergePolicy extends MergePolicy {
// Add any left-over segments, unless there is just 1
// already fully merged
if (last > 0 && (++start + 1 < last || !isMerged(infos, infos.info(start), writer))) {
if (last > 0 && (++start + 1 < last || !isMerged(infos, infos.info(start), mergeContext))) {
spec.add(new OneMerge(segments.subList(start, last)));
}
@ -241,7 +228,7 @@ public abstract class LogMergePolicy extends MergePolicy {
* the returned merges only by the {@code maxNumSegments} parameter, and
* guaranteed that exactly that number of segments will remain in the index.
*/
private MergeSpecification findForcedMergesMaxNumSegments(SegmentInfos infos, int maxNumSegments, int last, IndexWriter writer) throws IOException {
private MergeSpecification findForcedMergesMaxNumSegments(SegmentInfos infos, int maxNumSegments, int last, MergeContext mergeContext) throws IOException {
MergeSpecification spec = new MergeSpecification();
final List<SegmentCommitInfo> segments = infos.asList();
@ -259,7 +246,7 @@ public abstract class LogMergePolicy extends MergePolicy {
// Since we must merge down to 1 segment, the
// choice is simple:
if (last > 1 || !isMerged(infos, infos.info(0), writer)) {
if (last > 1 || !isMerged(infos, infos.info(0), mergeContext)) {
spec.add(new OneMerge(segments.subList(0, last)));
}
} else if (last > maxNumSegments) {
@ -282,9 +269,9 @@ public abstract class LogMergePolicy extends MergePolicy {
for(int i=0;i<last-finalMergeSize+1;i++) {
long sumSize = 0;
for(int j=0;j<finalMergeSize;j++) {
sumSize += size(infos.info(j+i), writer);
sumSize += size(infos.info(j+i), mergeContext);
}
if (i == 0 || (sumSize < 2*size(infos.info(i-1), writer) && sumSize < bestSize)) {
if (i == 0 || (sumSize < 2*size(infos.info(i-1), mergeContext) && sumSize < bestSize)) {
bestStart = i;
bestSize = sumSize;
}
@ -308,18 +295,18 @@ public abstract class LogMergePolicy extends MergePolicy {
* in use may make use of concurrency. */
@Override
public MergeSpecification findForcedMerges(SegmentInfos infos,
int maxNumSegments, Map<SegmentCommitInfo,Boolean> segmentsToMerge, IndexWriter writer) throws IOException {
int maxNumSegments, Map<SegmentCommitInfo,Boolean> segmentsToMerge, MergeContext mergeContext) throws IOException {
assert maxNumSegments > 0;
if (verbose(writer)) {
message("findForcedMerges: maxNumSegs=" + maxNumSegments + " segsToMerge="+ segmentsToMerge, writer);
if (verbose(mergeContext)) {
message("findForcedMerges: maxNumSegs=" + maxNumSegments + " segsToMerge="+ segmentsToMerge, mergeContext);
}
// If the segments are already merged (e.g. there's only 1 segment), or
// there are <maxNumSegments:.
if (isMerged(infos, maxNumSegments, segmentsToMerge, writer)) {
if (verbose(writer)) {
message("already merged; skip", writer);
if (isMerged(infos, maxNumSegments, segmentsToMerge, mergeContext)) {
if (verbose(mergeContext)) {
message("already merged; skip", mergeContext);
}
return null;
}
@ -337,16 +324,16 @@ public abstract class LogMergePolicy extends MergePolicy {
}
if (last == 0) {
if (verbose(writer)) {
message("last == 0; skip", writer);
if (verbose(mergeContext)) {
message("last == 0; skip", mergeContext);
}
return null;
}
// There is only one segment already, and it is merged
if (maxNumSegments == 1 && last == 1 && isMerged(infos, infos.info(0), writer)) {
if (verbose(writer)) {
message("already 1 seg; skip", writer);
if (maxNumSegments == 1 && last == 1 && isMerged(infos, infos.info(0), mergeContext)) {
if (verbose(mergeContext)) {
message("already 1 seg; skip", mergeContext);
}
return null;
}
@ -355,16 +342,16 @@ public abstract class LogMergePolicy extends MergePolicy {
boolean anyTooLarge = false;
for (int i = 0; i < last; i++) {
SegmentCommitInfo info = infos.info(i);
if (size(info, writer) > maxMergeSizeForForcedMerge || sizeDocs(info, writer) > maxMergeDocs) {
if (size(info, mergeContext) > maxMergeSizeForForcedMerge || sizeDocs(info, mergeContext) > maxMergeDocs) {
anyTooLarge = true;
break;
}
}
if (anyTooLarge) {
return findForcedMergesSizeLimit(infos, maxNumSegments, last, writer);
return findForcedMergesSizeLimit(infos, last, mergeContext);
} else {
return findForcedMergesMaxNumSegments(infos, maxNumSegments, last, writer);
return findForcedMergesMaxNumSegments(infos, maxNumSegments, last, mergeContext);
}
}
@ -374,32 +361,33 @@ public abstract class LogMergePolicy extends MergePolicy {
* deletes, up to mergeFactor at a time.
*/
@Override
public MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos, IndexWriter writer)
public MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos, MergeContext mergeContext)
throws IOException {
final List<SegmentCommitInfo> segments = segmentInfos.asList();
final int numSegments = segments.size();
if (verbose(writer)) {
message("findForcedDeleteMerges: " + numSegments + " segments", writer);
if (verbose(mergeContext)) {
message("findForcedDeleteMerges: " + numSegments + " segments", mergeContext);
}
MergeSpecification spec = new MergeSpecification();
int firstSegmentWithDeletions = -1;
assert writer != null;
assert mergeContext != null;
for(int i=0;i<numSegments;i++) {
final SegmentCommitInfo info = segmentInfos.info(i);
int delCount = writer.numDeletesToMerge(info);
int delCount = mergeContext.numDeletesToMerge(info);
assert assertDelCount(delCount, info);
if (delCount > 0) {
if (verbose(writer)) {
message(" segment " + info.info.name + " has deletions", writer);
if (verbose(mergeContext)) {
message(" segment " + info.info.name + " has deletions", mergeContext);
}
if (firstSegmentWithDeletions == -1)
firstSegmentWithDeletions = i;
else if (i - firstSegmentWithDeletions == mergeFactor) {
// We've seen mergeFactor segments in a row with
// deletions, so force a merge now:
if (verbose(writer)) {
message(" add merge " + firstSegmentWithDeletions + " to " + (i-1) + " inclusive", writer);
if (verbose(mergeContext)) {
message(" add merge " + firstSegmentWithDeletions + " to " + (i-1) + " inclusive", mergeContext);
}
spec.add(new OneMerge(segments.subList(firstSegmentWithDeletions, i)));
firstSegmentWithDeletions = i;
@ -408,8 +396,8 @@ public abstract class LogMergePolicy extends MergePolicy {
// End of a sequence of segments with deletions, so,
// merge those past segments even if it's fewer than
// mergeFactor segments
if (verbose(writer)) {
message(" add merge " + firstSegmentWithDeletions + " to " + (i-1) + " inclusive", writer);
if (verbose(mergeContext)) {
message(" add merge " + firstSegmentWithDeletions + " to " + (i-1) + " inclusive", mergeContext);
}
spec.add(new OneMerge(segments.subList(firstSegmentWithDeletions, i)));
firstSegmentWithDeletions = -1;
@ -417,8 +405,8 @@ public abstract class LogMergePolicy extends MergePolicy {
}
if (firstSegmentWithDeletions != -1) {
if (verbose(writer)) {
message(" add merge " + firstSegmentWithDeletions + " to " + (numSegments-1) + " inclusive", writer);
if (verbose(mergeContext)) {
message(" add merge " + firstSegmentWithDeletions + " to " + (numSegments-1) + " inclusive", mergeContext);
}
spec.add(new OneMerge(segments.subList(firstSegmentWithDeletions, numSegments)));
}
@ -450,11 +438,11 @@ public abstract class LogMergePolicy extends MergePolicy {
* will return multiple merges, allowing the {@link
* MergeScheduler} to use concurrency. */
@Override
public MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos infos, IndexWriter writer) throws IOException {
public MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos infos, MergeContext mergeContext) throws IOException {
final int numSegments = infos.size();
if (verbose(writer)) {
message("findMerges: " + numSegments + " segments", writer);
if (verbose(mergeContext)) {
message("findMerges: " + numSegments + " segments", mergeContext);
}
// Compute levels, which is just log (base mergeFactor)
@ -462,11 +450,11 @@ public abstract class LogMergePolicy extends MergePolicy {
final List<SegmentInfoAndLevel> levels = new ArrayList<>(numSegments);
final float norm = (float) Math.log(mergeFactor);
final Set<SegmentCommitInfo> mergingSegments = writer.getMergingSegments();
final Set<SegmentCommitInfo> mergingSegments = mergeContext.getMergingSegments();
for(int i=0;i<numSegments;i++) {
final SegmentCommitInfo info = infos.info(i);
long size = size(info, writer);
long size = size(info, mergeContext);
// Floor tiny segments
if (size < 1) {
@ -476,13 +464,13 @@ public abstract class LogMergePolicy extends MergePolicy {
final SegmentInfoAndLevel infoLevel = new SegmentInfoAndLevel(info, (float) Math.log(size)/norm);
levels.add(infoLevel);
if (verbose(writer)) {
final long segBytes = sizeBytes(info, writer);
if (verbose(mergeContext)) {
final long segBytes = sizeBytes(info, mergeContext);
String extra = mergingSegments.contains(info) ? " [merging]" : "";
if (size >= maxMergeSize) {
extra += " [skip: too large]";
}
message("seg=" + writer.segString(info) + " level=" + infoLevel.level + " size=" + String.format(Locale.ROOT, "%.3f MB", segBytes/1024/1024.) + extra, writer);
message("seg=" + segString(mergeContext, Collections.singleton(info)) + " level=" + infoLevel.level + " size=" + String.format(Locale.ROOT, "%.3f MB", segBytes/1024/1024.) + extra, mergeContext);
}
}
@ -538,8 +526,8 @@ public abstract class LogMergePolicy extends MergePolicy {
}
upto--;
}
if (verbose(writer)) {
message(" level " + levelBottom + " to " + maxLevel + ": " + (1+upto-start) + " segments", writer);
if (verbose(mergeContext)) {
message(" level " + levelBottom + " to " + maxLevel + ": " + (1+upto-start) + " segments", mergeContext);
}
// Finally, record all merges that are viable at this level:
@ -549,7 +537,7 @@ public abstract class LogMergePolicy extends MergePolicy {
boolean anyMerging = false;
for(int i=start;i<end;i++) {
final SegmentCommitInfo info = levels.get(i).info;
anyTooLarge |= (size(info, writer) >= maxMergeSize || sizeDocs(info, writer) >= maxMergeDocs);
anyTooLarge |= (size(info, mergeContext) >= maxMergeSize || sizeDocs(info, mergeContext) >= maxMergeDocs);
if (mergingSegments.contains(info)) {
anyMerging = true;
break;
@ -566,12 +554,12 @@ public abstract class LogMergePolicy extends MergePolicy {
mergeInfos.add(levels.get(i).info);
assert infos.contains(levels.get(i).info);
}
if (verbose(writer)) {
message(" add merge=" + writer.segString(mergeInfos) + " start=" + start + " end=" + end, writer);
if (verbose(mergeContext)) {
message(" add merge=" + segString(mergeContext, mergeInfos) + " start=" + start + " end=" + end, mergeContext);
}
spec.add(new OneMerge(mergeInfos));
} else if (verbose(writer)) {
message(" " + start + " to " + end + ": contains segment over maxMergeSize or maxMergeDocs; skipping", writer);
} else if (verbose(mergeContext)) {
message(" " + start + " to " + end + ": contains segment over maxMergeSize or maxMergeDocs; skipping", mergeContext);
}
start = end;
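
The size() extension point that LogByteSizeMergePolicy and LogDocMergePolicy override (see their diffs above) now takes a MergeContext as well; a hypothetical subclass that sizes segments by live document count would look like:

import java.io.IOException;
import org.apache.lucene.index.LogMergePolicy;
import org.apache.lucene.index.SegmentCommitInfo;

public class DocCountLogMergePolicy extends LogMergePolicy {
  @Override
  protected long size(SegmentCommitInfo info, MergeContext mergeContext) throws IOException {
    // sizeDocs pro-rates by deletes when setCalibrateSizeByDeletes(true) is in effect,
    // fetching the delete count through MergeContext.numDeletesToMerge.
    return sizeDocs(info, mergeContext);
  }
}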

View File

@ -29,12 +29,14 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.lucene.document.Field;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MergeInfo;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.IOSupplier;
import org.apache.lucene.util.InfoStream;
/**
* <p>Expert: a MergePolicy determines the sequence of
@ -50,7 +52,7 @@ import org.apache.lucene.util.IOSupplier;
* {@link MergeSpecification} instance describing the set of
* merges that should be done, or null if no merges are
* necessary. When IndexWriter.forceMerge is called, it calls
* {@link #findForcedMerges(SegmentInfos,int,Map, IndexWriter)} and the MergePolicy should
* {@link #findForcedMerges(SegmentInfos, int, Map, MergeContext)} and the MergePolicy should
* then return the necessary merges.</p>
*
* <p>Note that the policy can return more than one merge at
@ -65,6 +67,7 @@ import org.apache.lucene.util.IOSupplier;
* @lucene.experimental
*/
public abstract class MergePolicy {
/**
* Progress and state for an executing merge. This class
* encapsulates the logic to pause and resume the merge thread
@ -483,9 +486,9 @@ public abstract class MergePolicy {
* @param mergeTrigger the event that triggered the merge
* @param segmentInfos
* the total set of segments in the index
* @param writer the IndexWriter to find the merges on
* @param mergeContext the IndexWriter to find the merges on
*/
public abstract MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, IndexWriter writer)
public abstract MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext)
throws IOException;
/**
@ -494,36 +497,34 @@ public abstract class MergePolicy {
* {@link IndexWriter#forceMerge} method is called. This call is always
* synchronized on the {@link IndexWriter} instance so only one thread at a
* time will call this method.
*
* @param segmentInfos
* @param segmentInfos
* the total set of segments in the index
* @param maxSegmentCount
* requested maximum number of segments in the index (currently this
* is always 1)
* @param segmentsToMerge
* contains the specific SegmentInfo instances that must be merged
* away. This may be a subset of all
* SegmentInfos. If the value is True for a
* given SegmentInfo, that means this segment was
* an original segment present in the
* to-be-merged index; else, it was a segment
* produced by a cascaded merge.
* @param writer the IndexWriter to find the merges on
* contains the specific SegmentInfo instances that must be merged
* away. This may be a subset of all
* SegmentInfos. If the value is True for a
* given SegmentInfo, that means this segment was
* an original segment present in the
* to-be-merged index; else, it was a segment
* produced by a cascaded merge.
* @param mergeContext the IndexWriter to find the merges on
*/
public abstract MergeSpecification findForcedMerges(
SegmentInfos segmentInfos, int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, IndexWriter writer)
SegmentInfos segmentInfos, int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, MergeContext mergeContext)
throws IOException;
/**
* Determine what set of merge operations is necessary in order to expunge all
* deletes from the index.
*
* @param segmentInfos
* @param segmentInfos
* the total set of segments in the index
* @param writer the IndexWriter to find the merges on
* @param mergeContext the IndexWriter to find the merges on
*/
public abstract MergeSpecification findForcedDeletesMerges(
SegmentInfos segmentInfos, IndexWriter writer) throws IOException;
SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException;
/**
* Returns true if a new segment (regardless of its origin) should use the
@ -532,11 +533,11 @@ public abstract class MergePolicy {
* {@link #getMaxCFSSegmentSizeMB()} and the size is less or equal to the
* TotalIndexSize * {@link #getNoCFSRatio()} otherwise <code>false</code>.
*/
public boolean useCompoundFile(SegmentInfos infos, SegmentCommitInfo mergedInfo, IndexWriter writer) throws IOException {
public boolean useCompoundFile(SegmentInfos infos, SegmentCommitInfo mergedInfo, MergeContext mergeContext) throws IOException {
if (getNoCFSRatio() == 0.0) {
return false;
}
long mergedInfoSize = size(mergedInfo, writer);
long mergedInfoSize = size(mergedInfo, mergeContext);
if (mergedInfoSize > maxCFSSegmentSize) {
return false;
}
@ -545,7 +546,7 @@ public abstract class MergePolicy {
}
long totalSize = 0;
for (SegmentCommitInfo info : infos) {
totalSize += size(info, writer);
totalSize += size(info, mergeContext);
}
return mergedInfoSize <= getNoCFSRatio() * totalSize;
}
@ -553,23 +554,34 @@ public abstract class MergePolicy {
/** Return the byte size of the provided {@link
* SegmentCommitInfo}, pro-rated by percentage of
* non-deleted documents is set. */
protected long size(SegmentCommitInfo info, IndexWriter writer) throws IOException {
protected long size(SegmentCommitInfo info, MergeContext mergeContext) throws IOException {
long byteSize = info.sizeInBytes();
int delCount = writer.numDeletesToMerge(info);
int delCount = mergeContext.numDeletesToMerge(info);
assert assertDelCount(delCount, info);
double delRatio = info.info.maxDoc() <= 0 ? 0.0f : (float) delCount / (float) info.info.maxDoc();
assert delRatio <= 1.0;
return (info.info.maxDoc() <= 0 ? byteSize : (long) (byteSize * (1.0 - delRatio)));
}
/**
* Asserts that the delCount for this SegmentCommitInfo is valid
*/
protected final boolean assertDelCount(int delCount, SegmentCommitInfo info) {
assert delCount >= 0: "delCount must be positive: " + delCount;
assert delCount <= info.info.maxDoc() : "delCount: " + delCount
+ " must be leq than maxDoc: " + info.info.maxDoc();
return true;
}
/** Returns true if this single info is already fully merged (has no
* pending deletes, is in the same dir as the
* writer, and matches the current compound file setting */
protected final boolean isMerged(SegmentInfos infos, SegmentCommitInfo info, IndexWriter writer) throws IOException {
assert writer != null;
boolean hasDeletions = writer.numDeletesToMerge(info) > 0;
return !hasDeletions &&
info.info.dir == writer.getDirectory() &&
useCompoundFile(infos, info, writer) == info.info.getUseCompoundFile();
protected final boolean isMerged(SegmentInfos infos, SegmentCommitInfo info, MergeContext mergeContext) throws IOException {
assert mergeContext != null;
int delCount = mergeContext.numDeletesToMerge(info);
assert assertDelCount(delCount, info);
return delCount == 0 &&
useCompoundFile(infos, info, mergeContext) == info.info.getUseCompoundFile();
}
/** Returns current {@code noCFSRatio}.
@ -633,4 +645,61 @@ public abstract class MergePolicy {
IOSupplier<CodecReader> readerSupplier) throws IOException {
return info.getDelCount() + pendingDeleteCount;
}
/**
* Builds a String representation of the given SegmentCommitInfo instances
*/
protected final String segString(MergeContext mergeContext, Iterable<SegmentCommitInfo> infos) {
return StreamSupport.stream(infos.spliterator(), false)
.map(info -> info.toString(mergeContext.numDeletedDocs(info) - info.getDelCount()))
.collect(Collectors.joining(" "));
}
/** Print a debug message to {@link MergeContext}'s {@code
* infoStream}. */
protected final void message(String message, MergeContext mergeContext) {
if (verbose(mergeContext)) {
mergeContext.getInfoStream().message("MP", message);
}
}
/**
* Returns <code>true</code> if the info-stream is in verbose mode
* @see #message(String, MergeContext)
*/
protected final boolean verbose(MergeContext mergeContext) {
return mergeContext.getInfoStream().isEnabled("MP");
}
/**
* This interface represents the current context of the merge selection process.
* It allows to access real-time information like the currently merging segments or
* how many deletes a segment would claim back if merged. This context might be stateful
* and change during the execution of a merge policy's selection processes.
* @lucene.experimental
*/
public interface MergeContext {
/**
* Returns the number of deletes a merge would claim back if the given segment is merged.
* @see MergePolicy#numDeletesToMerge(SegmentCommitInfo, int, org.apache.lucene.util.IOSupplier)
* @param info the segment to get the number of deletes for
*/
int numDeletesToMerge(SegmentCommitInfo info) throws IOException;
/**
* Returns the number of deleted documents in the given segments.
*/
int numDeletedDocs(SegmentCommitInfo info);
/**
* Returns the info stream that can be used to log messages
*/
InfoStream getInfoStream();
/**
* Returns an unmodifiable set of segments that are currently merging.
*/
Set<SegmentCommitInfo> getMergingSegments();
}
}
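
The verbose/message/segString helpers that LogMergePolicy, TieredMergePolicy, and UpgradeIndexMergePolicy each re-implemented against IndexWriter's infoStream now live once on the base class and log under the shared "MP" category. A sketch of a custom policy using them (the policy itself is hypothetical):

import java.io.IOException;
import java.util.Map;

import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.MergeTrigger;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;

public class LoggingOnlyMergePolicy extends MergePolicy {
  @Override
  public MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos,
      MergeContext mergeContext) throws IOException {
    if (verbose(mergeContext)) {
      // Emitted under the "MP" category of the context's InfoStream.
      message("findMerges: " + segString(mergeContext, segmentInfos), mergeContext);
    }
    return null; // this sketch never selects merges
  }

  @Override
  public MergeSpecification findForcedMerges(SegmentInfos segmentInfos, int maxSegmentCount,
      Map<SegmentCommitInfo,Boolean> segmentsToMerge, MergeContext mergeContext) {
    return null;
  }

  @Override
  public MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos, MergeContext mergeContext) {
    return null;
  }
}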

View File

@ -19,7 +19,7 @@ package org.apache.lucene.index;
/**
* MergeTrigger is passed to
* {@link org.apache.lucene.index.MergePolicy#findMerges(MergeTrigger, org.apache.lucene.index.SegmentInfos, IndexWriter)} to indicate the
* {@link MergePolicy#findMerges(MergeTrigger, SegmentInfos, MergePolicy.MergeContext)} to indicate the
* event that triggered the merge.
*/
public enum MergeTrigger {

View File

@ -36,22 +36,22 @@ public final class NoMergePolicy extends MergePolicy {
}
@Override
public MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, IndexWriter writer) { return null; }
public MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) { return null; }
@Override
public MergeSpecification findForcedMerges(SegmentInfos segmentInfos,
int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, IndexWriter writer) { return null; }
int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, MergeContext mergeContext) { return null; }
@Override
public MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos, IndexWriter writer) { return null; }
public MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos, MergeContext mergeContext) { return null; }
@Override
public boolean useCompoundFile(SegmentInfos segments, SegmentCommitInfo newSegment, IndexWriter writer) {
public boolean useCompoundFile(SegmentInfos segments, SegmentCommitInfo newSegment, MergeContext mergeContext) {
return newSegment.info.getUseCompoundFile();
}
@Override
protected long size(SegmentCommitInfo info, IndexWriter writer) throws IOException {
protected long size(SegmentCommitInfo info, MergeContext context) throws IOException {
return Long.MAX_VALUE;
}

View File

@ -42,21 +42,21 @@ public class OneMergeWrappingMergePolicy extends FilterMergePolicy {
}
@Override
public MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, IndexWriter writer)
public MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext)
throws IOException {
return wrapSpec(in.findMerges(mergeTrigger, segmentInfos, writer));
return wrapSpec(in.findMerges(mergeTrigger, segmentInfos, mergeContext));
}
@Override
public MergeSpecification findForcedMerges(SegmentInfos segmentInfos, int maxSegmentCount,
Map<SegmentCommitInfo,Boolean> segmentsToMerge, IndexWriter writer) throws IOException {
return wrapSpec(in.findForcedMerges(segmentInfos, maxSegmentCount, segmentsToMerge, writer));
Map<SegmentCommitInfo,Boolean> segmentsToMerge, MergeContext mergeContext) throws IOException {
return wrapSpec(in.findForcedMerges(segmentInfos, maxSegmentCount, segmentsToMerge, mergeContext));
}
@Override
public MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos, IndexWriter writer)
public MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos, MergeContext mergeContext)
throws IOException {
return wrapSpec(in.findForcedDeletesMerges(segmentInfos, writer));
return wrapSpec(in.findForcedDeletesMerges(segmentInfos, mergeContext));
}
private MergeSpecification wrapSpec(MergeSpecification spec) {

View File

@ -20,6 +20,7 @@ package org.apache.lucene.index;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
@ -272,23 +273,23 @@ public class TieredMergePolicy extends MergePolicy {
abstract String getExplanation();
}
private Map<SegmentCommitInfo,Long> getSegmentSizes(IndexWriter writer, Collection<SegmentCommitInfo> infos) throws IOException {
private Map<SegmentCommitInfo,Long> getSegmentSizes(MergeContext mergeContext, Collection<SegmentCommitInfo> infos) throws IOException {
Map<SegmentCommitInfo,Long> sizeInBytes = new HashMap<>();
for (SegmentCommitInfo info : infos) {
sizeInBytes.put(info, size(info, writer));
sizeInBytes.put(info, size(info, mergeContext));
}
return sizeInBytes;
}
@Override
public MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos infos, IndexWriter writer) throws IOException {
if (verbose(writer)) {
message("findMerges: " + infos.size() + " segments", writer);
public MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos infos, MergeContext mergeContext) throws IOException {
if (verbose(mergeContext)) {
message("findMerges: " + infos.size() + " segments", mergeContext);
}
if (infos.size() == 0) {
return null;
}
final Set<SegmentCommitInfo> merging = writer.getMergingSegments();
final Set<SegmentCommitInfo> merging = mergeContext.getMergingSegments();
final Set<SegmentCommitInfo> toBeMerged = new HashSet<>();
final List<SegmentCommitInfo> infosSorted = new ArrayList<>(infos.asList());
@ -296,7 +297,7 @@ public class TieredMergePolicy extends MergePolicy {
// The size can change concurrently while we are running here, because deletes
// are now applied concurrently, and this can piss off TimSort! So we
// call size() once per segment and sort by that:
Map<SegmentCommitInfo,Long> sizeInBytes = getSegmentSizes(writer, infos.asList());
Map<SegmentCommitInfo,Long> sizeInBytes = getSegmentSizes(mergeContext, infos.asList());
infosSorted.sort(new SegmentByteSizeDescending(sizeInBytes));
@ -305,14 +306,14 @@ public class TieredMergePolicy extends MergePolicy {
long minSegmentBytes = Long.MAX_VALUE;
for(SegmentCommitInfo info : infosSorted) {
final long segBytes = sizeInBytes.get(info);
if (verbose(writer)) {
if (verbose(mergeContext)) {
String extra = merging.contains(info) ? " [merging]" : "";
if (segBytes >= maxMergedSegmentBytes/2.0) {
extra += " [skip: too large]";
} else if (segBytes < floorSegmentBytes) {
extra += " [floored]";
}
message(" seg=" + writer.segString(info) + " size=" + String.format(Locale.ROOT, "%.3f", segBytes/1024/1024.) + " MB" + extra, writer);
message(" seg=" + segString(mergeContext, Collections.singleton(info)) + " size=" + String.format(Locale.ROOT, "%.3f", segBytes/1024/1024.) + " MB" + extra, mergeContext);
}
minSegmentBytes = Math.min(segBytes, minSegmentBytes);
@ -372,8 +373,8 @@ public class TieredMergePolicy extends MergePolicy {
final boolean maxMergeIsRunning = mergingBytes >= maxMergedSegmentBytes;
if (verbose(writer)) {
message(" allowedSegmentCount=" + allowedSegCountInt + " vs count=" + infosSorted.size() + " (eligible count=" + eligible.size() + ") tooBigCount=" + tooBigCount, writer);
if (verbose(mergeContext)) {
message(" allowedSegmentCount=" + allowedSegCountInt + " vs count=" + infosSorted.size() + " (eligible count=" + eligible.size() + ") tooBigCount=" + tooBigCount, mergeContext);
}
if (eligible.size() == 0) {
@ -417,9 +418,9 @@ public class TieredMergePolicy extends MergePolicy {
// segments, and already pre-excluded the too-large segments:
assert candidate.size() > 0;
final MergeScore score = score(candidate, hitTooLarge, mergingBytes, writer, sizeInBytes);
if (verbose(writer)) {
message(" maybe=" + writer.segString(candidate) + " score=" + score.getScore() + " " + score.getExplanation() + " tooLarge=" + hitTooLarge + " size=" + String.format(Locale.ROOT, "%.3f MB", totAfterMergeBytes/1024./1024.), writer);
final MergeScore score = score(candidate, hitTooLarge, sizeInBytes);
if (verbose(mergeContext)) {
message(" maybe=" + segString(mergeContext, candidate) + " score=" + score.getScore() + " " + score.getExplanation() + " tooLarge=" + hitTooLarge + " size=" + String.format(Locale.ROOT, "%.3f MB", totAfterMergeBytes/1024./1024.), mergeContext);
}
// If we are already running a max sized merge
@ -441,8 +442,8 @@ public class TieredMergePolicy extends MergePolicy {
spec.add(merge);
toBeMerged.addAll(merge.segments);
if (verbose(writer)) {
message(" add merge=" + writer.segString(merge.segments) + " size=" + String.format(Locale.ROOT, "%.3f MB", bestMergeBytes/1024./1024.) + " score=" + String.format(Locale.ROOT, "%.3f", bestScore.getScore()) + " " + bestScore.getExplanation() + (bestTooLarge ? " [max merge]" : ""), writer);
if (verbose(mergeContext)) {
message(" add merge=" + segString(mergeContext, merge.segments) + " size=" + String.format(Locale.ROOT, "%.3f MB", bestMergeBytes/1024./1024.) + " score=" + String.format(Locale.ROOT, "%.3f", bestScore.getScore()) + " " + bestScore.getExplanation() + (bestTooLarge ? " [max merge]" : ""), mergeContext);
}
} else {
return spec;
@ -454,7 +455,7 @@ public class TieredMergePolicy extends MergePolicy {
}
/** Expert: scores one merge; subclasses can override. */
protected MergeScore score(List<SegmentCommitInfo> candidate, boolean hitTooLarge, long mergingBytes, IndexWriter writer, Map<SegmentCommitInfo, Long> sizeInBytes) throws IOException {
protected MergeScore score(List<SegmentCommitInfo> candidate, boolean hitTooLarge, Map<SegmentCommitInfo, Long> sizeInBytes) throws IOException {
long totBeforeMergeBytes = 0;
long totAfterMergeBytes = 0;
long totAfterMergeBytesFloored = 0;
@ -513,14 +514,14 @@ public class TieredMergePolicy extends MergePolicy {
}
@Override
public MergeSpecification findForcedMerges(SegmentInfos infos, int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, IndexWriter writer) throws IOException {
if (verbose(writer)) {
message("findForcedMerges maxSegmentCount=" + maxSegmentCount + " infos=" + writer.segString(infos) + " segmentsToMerge=" + segmentsToMerge, writer);
public MergeSpecification findForcedMerges(SegmentInfos infos, int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, MergeContext mergeContext) throws IOException {
if (verbose(mergeContext)) {
message("findForcedMerges maxSegmentCount=" + maxSegmentCount + " infos=" + segString(mergeContext, infos) + " segmentsToMerge=" + segmentsToMerge, mergeContext);
}
List<SegmentCommitInfo> eligible = new ArrayList<>();
boolean forceMergeRunning = false;
final Set<SegmentCommitInfo> merging = writer.getMergingSegments();
final Set<SegmentCommitInfo> merging = mergeContext.getMergingSegments();
boolean segmentIsOriginal = false;
for(SegmentCommitInfo info : infos) {
final Boolean isOriginal = segmentsToMerge.get(info);
@ -541,21 +542,21 @@ public class TieredMergePolicy extends MergePolicy {
// The size can change concurrently while we are running here, because deletes
// are now applied concurrently, and this can piss off TimSort! So we
// call size() once per segment and sort by that:
Map<SegmentCommitInfo,Long> sizeInBytes = getSegmentSizes(writer, eligible);
Map<SegmentCommitInfo,Long> sizeInBytes = getSegmentSizes(mergeContext, eligible);
if ((maxSegmentCount > 1 && eligible.size() <= maxSegmentCount) ||
(maxSegmentCount == 1 && eligible.size() == 1 && (!segmentIsOriginal || isMerged(infos, eligible.get(0), writer)))) {
if (verbose(writer)) {
message("already merged", writer);
(maxSegmentCount == 1 && eligible.size() == 1 && (!segmentIsOriginal || isMerged(infos, eligible.get(0), mergeContext)))) {
if (verbose(mergeContext)) {
message("already merged", mergeContext);
}
return null;
}
eligible.sort(new SegmentByteSizeDescending(sizeInBytes));
if (verbose(writer)) {
message("eligible=" + eligible, writer);
message("forceMergeRunning=" + forceMergeRunning, writer);
if (verbose(mergeContext)) {
message("eligible=" + eligible, mergeContext);
message("forceMergeRunning=" + forceMergeRunning, mergeContext);
}
int end = eligible.size();
@ -568,8 +569,8 @@ public class TieredMergePolicy extends MergePolicy {
spec = new MergeSpecification();
}
final OneMerge merge = new OneMerge(eligible.subList(end-maxMergeAtOnceExplicit, end));
if (verbose(writer)) {
message("add merge=" + writer.segString(merge.segments), writer);
if (verbose(mergeContext)) {
message("add merge=" + segString(mergeContext, merge.segments), mergeContext);
}
spec.add(merge);
end -= maxMergeAtOnceExplicit;
@ -579,8 +580,8 @@ public class TieredMergePolicy extends MergePolicy {
// Do final merge
final int numToMerge = end - maxSegmentCount + 1;
final OneMerge merge = new OneMerge(eligible.subList(end-numToMerge, end));
if (verbose(writer)) {
message("add final merge=" + merge.segString(), writer);
if (verbose(mergeContext)) {
message("add final merge=" + merge.segString(), mergeContext);
}
spec = new MergeSpecification();
spec.add(merge);
@ -590,14 +591,16 @@ public class TieredMergePolicy extends MergePolicy {
}
@Override
public MergeSpecification findForcedDeletesMerges(SegmentInfos infos, IndexWriter writer) throws IOException {
if (verbose(writer)) {
message("findForcedDeletesMerges infos=" + writer.segString(infos) + " forceMergeDeletesPctAllowed=" + forceMergeDeletesPctAllowed, writer);
public MergeSpecification findForcedDeletesMerges(SegmentInfos infos, MergeContext mergeContext) throws IOException {
if (verbose(mergeContext)) {
message("findForcedDeletesMerges infos=" + segString(mergeContext, infos) + " forceMergeDeletesPctAllowed=" + forceMergeDeletesPctAllowed, mergeContext);
}
final List<SegmentCommitInfo> eligible = new ArrayList<>();
final Set<SegmentCommitInfo> merging = writer.getMergingSegments();
final Set<SegmentCommitInfo> merging = mergeContext.getMergingSegments();
for(SegmentCommitInfo info : infos) {
double pctDeletes = 100.*((double) writer.numDeletesToMerge(info))/info.info.maxDoc();
int delCount = mergeContext.numDeletesToMerge(info);
assert assertDelCount(delCount, info);
double pctDeletes = 100.*((double) delCount)/info.info.maxDoc();
if (pctDeletes > forceMergeDeletesPctAllowed && !merging.contains(info)) {
eligible.add(info);
}
@ -610,12 +613,12 @@ public class TieredMergePolicy extends MergePolicy {
// The size can change concurrently while we are running here, because deletes
// are now applied concurrently, and this can piss off TimSort! So we
// call size() once per segment and sort by that:
Map<SegmentCommitInfo,Long> sizeInBytes = getSegmentSizes(writer, infos.asList());
Map<SegmentCommitInfo,Long> sizeInBytes = getSegmentSizes(mergeContext, infos.asList());
eligible.sort(new SegmentByteSizeDescending(sizeInBytes));
if (verbose(writer)) {
message("eligible=" + eligible, writer);
if (verbose(mergeContext)) {
message("eligible=" + eligible, mergeContext);
}
int start = 0;
@ -631,8 +634,8 @@ public class TieredMergePolicy extends MergePolicy {
}
final OneMerge merge = new OneMerge(eligible.subList(start, end));
if (verbose(writer)) {
message("add merge=" + writer.segString(merge.segments), writer);
if (verbose(mergeContext)) {
message("add merge=" + segString(mergeContext, merge.segments), mergeContext);
}
spec.add(merge);
start = end;
@ -645,14 +648,6 @@ public class TieredMergePolicy extends MergePolicy {
return Math.max(floorSegmentBytes, bytes);
}
private boolean verbose(IndexWriter writer) {
return writer != null && writer.infoStream.isEnabled("TMP");
}
private void message(String message, IndexWriter writer) {
writer.infoStream.message("TMP", message);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder("[" + getClass().getSimpleName() + ": ");

View File

@ -66,12 +66,12 @@ public class UpgradeIndexMergePolicy extends FilterMergePolicy {
}
@Override
public MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, IndexWriter writer) throws IOException {
return in.findMerges(null, segmentInfos, writer);
public MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
return in.findMerges(null, segmentInfos, mergeContext);
}
@Override
public MergeSpecification findForcedMerges(SegmentInfos segmentInfos, int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, IndexWriter writer) throws IOException {
public MergeSpecification findForcedMerges(SegmentInfos segmentInfos, int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, MergeContext mergeContext) throws IOException {
// first find all old segments
final Map<SegmentCommitInfo,Boolean> oldSegments = new HashMap<>();
for (final SegmentCommitInfo si : segmentInfos) {
@ -81,14 +81,14 @@ public class UpgradeIndexMergePolicy extends FilterMergePolicy {
}
}
if (verbose(writer)) {
message("findForcedMerges: segmentsToUpgrade=" + oldSegments, writer);
if (verbose(mergeContext)) {
message("findForcedMerges: segmentsToUpgrade=" + oldSegments, mergeContext);
}
if (oldSegments.isEmpty())
return null;
MergeSpecification spec = in.findForcedMerges(segmentInfos, maxSegmentCount, oldSegments, writer);
MergeSpecification spec = in.findForcedMerges(segmentInfos, maxSegmentCount, oldSegments, mergeContext);
if (spec != null) {
// remove all segments that are in merge specification from oldSegments,
@ -100,9 +100,9 @@ public class UpgradeIndexMergePolicy extends FilterMergePolicy {
}
if (!oldSegments.isEmpty()) {
if (verbose(writer)) {
if (verbose(mergeContext)) {
message("findForcedMerges: " + in.getClass().getSimpleName() +
" does not want to merge all old segments, merge remaining ones into new segment: " + oldSegments, writer);
" does not want to merge all old segments, merge remaining ones into new segment: " + oldSegments, mergeContext);
}
final List<SegmentCommitInfo> newInfos = new ArrayList<>();
for (final SegmentCommitInfo si : segmentInfos) {
@ -120,11 +120,4 @@ public class UpgradeIndexMergePolicy extends FilterMergePolicy {
return spec;
}
private boolean verbose(IndexWriter writer) {
return writer != null && writer.infoStream.isEnabled("UPGMP");
}
private void message(String message, IndexWriter writer) {
writer.infoStream.message("UPGMP", message);
}
}

View File

@ -593,28 +593,28 @@ public class TestDemoParallelLeafReader extends LuceneTestCase {
@Override
public MergeSpecification findMerges(MergeTrigger mergeTrigger,
SegmentInfos segmentInfos, IndexWriter writer) throws IOException {
return wrap(in.findMerges(mergeTrigger, segmentInfos, writer));
SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
return wrap(in.findMerges(mergeTrigger, segmentInfos, mergeContext));
}
@Override
public MergeSpecification findForcedMerges(SegmentInfos segmentInfos,
int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, IndexWriter writer)
int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, MergeContext mergeContext)
throws IOException {
// TODO: do we need to force-force this? Ie, wrapped MP may think index is already optimized, yet maybe its schemaGen is old? need test!
return wrap(in.findForcedMerges(segmentInfos, maxSegmentCount, segmentsToMerge, writer));
return wrap(in.findForcedMerges(segmentInfos, maxSegmentCount, segmentsToMerge, mergeContext));
}
@Override
public MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos, IndexWriter writer)
public MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos, MergeContext mergeContext)
throws IOException {
return wrap(in.findForcedDeletesMerges(segmentInfos, writer));
return wrap(in.findForcedDeletesMerges(segmentInfos, mergeContext));
}
@Override
public boolean useCompoundFile(SegmentInfos segments,
SegmentCommitInfo newSegment, IndexWriter writer) throws IOException {
return in.useCompoundFile(segments, newSegment, writer);
SegmentCommitInfo newSegment, MergeContext mergeContext) throws IOException {
return in.useCompoundFile(segments, newSegment, mergeContext);
}
@Override

View File

@ -23,9 +23,9 @@ import org.apache.lucene.util.LuceneTestCase;
public class TestFilterMergePolicy extends LuceneTestCase {
public void testMethodsOverridden() throws Exception {
public void testMethodsOverridden() {
for (Method m : MergePolicy.class.getDeclaredMethods()) {
if (Modifier.isFinal(m.getModifiers())) continue;
if (Modifier.isFinal(m.getModifiers()) || Modifier.isPrivate(m.getModifiers())) continue;
try {
FilterMergePolicy.class.getDeclaredMethod(m.getName(), m.getParameterTypes());
} catch (NoSuchMethodException e) {

View File

@ -48,19 +48,19 @@ public class TestOneMergeWrappingMergePolicy extends LuceneTestCase {
}
@Override
public MergePolicy.MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, IndexWriter writer)
public MergePolicy.MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext)
throws IOException {
return merges;
}
@Override
public MergePolicy.MergeSpecification findForcedMerges(SegmentInfos segmentInfos, int maxSegmentCount,
Map<SegmentCommitInfo,Boolean> segmentsToMerge, IndexWriter writer) throws IOException {
Map<SegmentCommitInfo,Boolean> segmentsToMerge, MergeContext mergeContext) throws IOException {
return forcedMerges;
}
@Override
public MergePolicy.MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos, IndexWriter writer)
public MergePolicy.MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos, MergeContext mergeContext)
throws IOException {
return forcedDeletesMerges;
}

View File

@ -253,7 +253,7 @@ public class TestPerSegmentDeletes extends LuceneTestCase {
}
@Override
public MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, IndexWriter writer)
public MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext)
throws IOException {
MergeSpecification ms = new MergeSpecification();
if (doMerge) {
@ -267,19 +267,19 @@ public class TestPerSegmentDeletes extends LuceneTestCase {
@Override
public MergeSpecification findForcedMerges(SegmentInfos segmentInfos,
int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, IndexWriter writer)
int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, MergeContext mergeContext)
throws IOException {
return null;
}
@Override
public MergeSpecification findForcedDeletesMerges(
SegmentInfos segmentInfos, IndexWriter writer) throws IOException {
SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
return null;
}
@Override
public boolean useCompoundFile(SegmentInfos segments, SegmentCommitInfo newSegment, IndexWriter writer) {
public boolean useCompoundFile(SegmentInfos segments, SegmentCommitInfo newSegment, MergeContext mergeContext) {
return useCompoundFile;
}
}

View File

@ -53,7 +53,7 @@ public class AlcoholicMergePolicy extends LogMergePolicy {
@Override
//@BlackMagic(level=Voodoo);
protected long size(SegmentCommitInfo info, IndexWriter writer) throws IOException {
protected long size(SegmentCommitInfo info, MergeContext mergeContext) throws IOException {
int hourOfDay = calendar.get(Calendar.HOUR_OF_DAY);
if (hourOfDay < 6 ||
hourOfDay > 20 ||

View File

@ -19,11 +19,19 @@ package org.apache.lucene.index;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.NullInfoStream;
import org.apache.lucene.util.StringHelper;
import org.apache.lucene.util.TestUtil;
import org.apache.lucene.util.Version;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.ToIntFunction;
/**
* Base test case for {@link MergePolicy}.
@ -80,5 +88,69 @@ public abstract class BaseMergePolicyTestCase extends LuceneTestCase {
writer.close();
}
}
public void testFindForcedDeletesMerges() throws IOException {
MergePolicy mp = mergePolicy();
SegmentInfos infos = new SegmentInfos(Version.LATEST.major);
try (Directory directory = newDirectory()) {
MergePolicy.MergeContext context = new MockMergeContext(s -> 0);
int numSegs = random().nextInt(10);
for (int i = 0; i < numSegs; i++) {
SegmentInfo info = new SegmentInfo(
directory, // dir
Version.LATEST, // version
Version.LATEST, // min version
TestUtil.randomSimpleString(random()), // name
random().nextInt(Integer.MAX_VALUE), // maxDoc
random().nextBoolean(), // isCompoundFile
null, // codec
Collections.emptyMap(), // diagnostics
TestUtil.randomSimpleString(// id
random(),
StringHelper.ID_LENGTH,
StringHelper.ID_LENGTH).getBytes(StandardCharsets.US_ASCII),
Collections.emptyMap(), // attributes
null /* indexSort */);
info.setFiles(Collections.emptyList());
infos.add(new SegmentCommitInfo(info, random().nextInt(1), -1, -1, -1));
}
MergePolicy.MergeSpecification forcedDeletesMerges = mp.findForcedDeletesMerges(infos, context);
if (forcedDeletesMerges != null) {
assertEquals(0, forcedDeletesMerges.merges.size());
}
}
}
/**
* Simple mock merge context for tests
*/
public static final class MockMergeContext implements MergePolicy.MergeContext {
private final ToIntFunction<SegmentCommitInfo> numDeletesFunc;
private final InfoStream infoStream = new NullInfoStream();
public MockMergeContext(ToIntFunction<SegmentCommitInfo> numDeletesFunc) {
this.numDeletesFunc = numDeletesFunc;
}
@Override
public int numDeletesToMerge(SegmentCommitInfo info) {
return numDeletesFunc.applyAsInt(info);
}
@Override
public int numDeletedDocs(SegmentCommitInfo info) {
return numDeletesToMerge(info);
}
@Override
public InfoStream getInfoStream() {
return infoStream;
}
@Override
public Set<SegmentCommitInfo> getMergingSegments() {
return Collections.emptySet();
}
}
}
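
A usage note on MockMergeContext: since the delete count is supplied as a function, tests can simulate pending deletes per segment without any writer state. For instance (hypothetical fragment, with infos and mp set up as in the test above):

// Pretend every segment would reclaim half of its documents when merged.
MergePolicy.MergeContext ctx =
    new BaseMergePolicyTestCase.MockMergeContext(info -> info.info.maxDoc() / 2);
MergePolicy.MergeSpecification spec = mp.findForcedDeletesMerges(infos, ctx);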

View File

@ -17,10 +17,6 @@
package org.apache.lucene.index;
import java.io.IOException;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.MergeTrigger;
import org.apache.lucene.index.SegmentInfos;
/**
* A {@link MergePolicy} that only returns forced merges.
@ -38,7 +34,7 @@ public final class ForceMergePolicy extends FilterMergePolicy {
}
@Override
public MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, IndexWriter writer)
public MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext)
throws IOException {
return null;
}

View File

@ -49,14 +49,12 @@ public class MockRandomMergePolicy extends MergePolicy {
}
@Override
public MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, IndexWriter writer) {
public MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) {
MergeSpecification mergeSpec = null;
//System.out.println("MRMP: findMerges sis=" + segmentInfos);
int numSegments = segmentInfos.size();
List<SegmentCommitInfo> segments = new ArrayList<>();
final Set<SegmentCommitInfo> merging = writer.getMergingSegments();
final Set<SegmentCommitInfo> merging = mergeContext.getMergingSegments();
for(SegmentCommitInfo sipc : segmentInfos) {
if (!merging.contains(sipc)) {
@ -64,7 +62,7 @@ public class MockRandomMergePolicy extends MergePolicy {
}
}
numSegments = segments.size();
int numSegments = segments.size();
if (numSegments > 1 && (numSegments > 30 || random.nextInt(5) == 3)) {
@ -85,7 +83,7 @@ public class MockRandomMergePolicy extends MergePolicy {
@Override
public MergeSpecification findForcedMerges(
SegmentInfos segmentInfos, int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, IndexWriter writer)
SegmentInfos segmentInfos, int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, MergeContext mergeContext)
throws IOException {
final List<SegmentCommitInfo> eligibleSegments = new ArrayList<>();
@ -97,7 +95,7 @@ public class MockRandomMergePolicy extends MergePolicy {
//System.out.println("MRMP: findMerges sis=" + segmentInfos + " eligible=" + eligibleSegments);
MergeSpecification mergeSpec = null;
if (eligibleSegments.size() > 1 || (eligibleSegments.size() == 1 && isMerged(segmentInfos, eligibleSegments.get(0), writer) == false)) {
if (eligibleSegments.size() > 1 || (eligibleSegments.size() == 1 && isMerged(segmentInfos, eligibleSegments.get(0), mergeContext) == false)) {
mergeSpec = new MergeSpecification();
// Already shuffled having come out of a set but
// shuffle again for good measure:
@ -126,12 +124,12 @@ public class MockRandomMergePolicy extends MergePolicy {
}
@Override
public MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos, IndexWriter writer) throws IOException {
return findMerges(null, segmentInfos, writer);
public MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
return findMerges(null, segmentInfos, mergeContext);
}
@Override
public boolean useCompoundFile(SegmentInfos infos, SegmentCommitInfo mergedInfo, IndexWriter writer) throws IOException {
public boolean useCompoundFile(SegmentInfos infos, SegmentCommitInfo mergedInfo, MergeContext mergeContext) throws IOException {
// 80% of the time we create CFS:
return random.nextInt(5) != 1;
}