LUCENE-2904: LogMP pays attention to which segments are already being merged

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1067299 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael McCandless 2011-02-04 21:54:32 +00:00
parent cdb47ea15f
commit 2de3b26a09
2 changed files with 51 additions and 20 deletions

View File

@ -1829,6 +1829,18 @@ public class IndexWriter implements Closeable {
} }
} }
/** Expert: to be used by a {@link MergePolicy} to avoid
* selecting merges for segments already being merged.
* The returned collection is not cloned, and thus is
* only safe to access if you hold IndexWriter's lock
* (which you do when IndexWriter invokes the
* MergePolicy).
*
* <p>Do not alter the returned collection! */
public synchronized Collection<SegmentInfo> getMergingSegments() {
return mergingSegments;
}
/** Expert: the {@link MergeScheduler} calls this method /** Expert: the {@link MergeScheduler} calls this method
* to retrieve the next merge requested by the * to retrieve the next merge requested by the
* MergePolicy */ * MergePolicy */

View File

@ -18,9 +18,12 @@ package org.apache.lucene.index;
*/ */
import java.io.IOException; import java.io.IOException;
import java.util.Set; import java.util.ArrayList;
import java.util.Arrays; import java.util.Collection;
import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.List;
import java.util.Set;
/** <p>This class implements a {@link MergePolicy} that tries /** <p>This class implements a {@link MergePolicy} that tries
* to merge segments into levels of exponentially * to merge segments into levels of exponentially
@ -474,7 +477,7 @@ public abstract class LogMergePolicy extends MergePolicy {
return spec; return spec;
} }
private static class SegmentInfoAndLevel implements Comparable { private static class SegmentInfoAndLevel implements Comparable<SegmentInfoAndLevel> {
SegmentInfo info; SegmentInfo info;
float level; float level;
int index; int index;
@ -486,8 +489,7 @@ public abstract class LogMergePolicy extends MergePolicy {
} }
// Sorts largest to smallest // Sorts largest to smallest
public int compareTo(Object o) { public int compareTo(SegmentInfoAndLevel other) {
SegmentInfoAndLevel other = (SegmentInfoAndLevel) o;
if (level < other.level) if (level < other.level)
return 1; return 1;
else if (level > other.level) else if (level > other.level)
@ -521,22 +523,37 @@ public abstract class LogMergePolicy extends MergePolicy {
// Compute levels, which is just log (base mergeFactor) // Compute levels, which is just log (base mergeFactor)
// of the size of each segment // of the size of each segment
SegmentInfoAndLevel[] levels = new SegmentInfoAndLevel[numSegments]; final List<SegmentInfoAndLevel> levels = new ArrayList<SegmentInfoAndLevel>();
final float norm = (float) Math.log(mergeFactor); final float norm = (float) Math.log(mergeFactor);
final Collection<SegmentInfo> mergingSegments = writer.get().getMergingSegments();
for(int i=0;i<numSegments;i++) { for(int i=0;i<numSegments;i++) {
final SegmentInfo info = infos.info(i); final SegmentInfo info = infos.info(i);
long size = size(info); long size = size(info);
// When we require contiguous merge, we still add the
// segment to levels to avoid merging "across" a set
// of segment being merged:
if (!requireContiguousMerge && mergingSegments.contains(info)) {
if (verbose()) {
message("seg " + info.name + " already being merged; skip");
}
continue;
}
// Floor tiny segments // Floor tiny segments
if (size < 1) if (size < 1) {
size = 1; size = 1;
levels[i] = new SegmentInfoAndLevel(info, (float) Math.log(size)/norm, i); }
message("seg " + info.name + " level=" + levels[i].level + " size=" + size); levels.add(new SegmentInfoAndLevel(info, (float) Math.log(size)/norm, i));
if (verbose()) {
message("seg " + info.name + " level=" + levels.get(i).level + " size=" + size);
}
} }
if (!requireContiguousMerge) { if (!requireContiguousMerge) {
Arrays.sort(levels); Collections.sort(levels);
} }
final float levelFloor; final float levelFloor;
@ -554,14 +571,16 @@ public abstract class LogMergePolicy extends MergePolicy {
MergeSpecification spec = null; MergeSpecification spec = null;
final int numMergeableSegments = levels.size();
int start = 0; int start = 0;
while(start < numSegments) { while(start < numMergeableSegments) {
// Find max level of all segments not already // Find max level of all segments not already
// quantized. // quantized.
float maxLevel = levels[start].level; float maxLevel = levels.get(start).level;
for(int i=1+start;i<numSegments;i++) { for(int i=1+start;i<numMergeableSegments;i++) {
final float level = levels[i].level; final float level = levels.get(i).level;
if (level > maxLevel) if (level > maxLevel)
maxLevel = level; maxLevel = level;
} }
@ -580,9 +599,9 @@ public abstract class LogMergePolicy extends MergePolicy {
levelBottom = levelFloor; levelBottom = levelFloor;
} }
int upto = numSegments-1; int upto = numMergeableSegments-1;
while(upto >= start) { while(upto >= start) {
if (levels[upto].level >= levelBottom) { if (levels.get(upto).level >= levelBottom) {
break; break;
} }
upto--; upto--;
@ -595,7 +614,7 @@ public abstract class LogMergePolicy extends MergePolicy {
while(end <= 1+upto) { while(end <= 1+upto) {
boolean anyTooLarge = false; boolean anyTooLarge = false;
for(int i=start;i<end;i++) { for(int i=start;i<end;i++) {
final SegmentInfo info = levels[i].info; final SegmentInfo info = levels.get(i).info;
anyTooLarge |= (size(info) >= maxMergeSize || sizeDocs(info) >= maxMergeDocs); anyTooLarge |= (size(info) >= maxMergeSize || sizeDocs(info) >= maxMergeDocs);
} }
@ -605,11 +624,11 @@ public abstract class LogMergePolicy extends MergePolicy {
if (verbose()) { if (verbose()) {
message(" " + start + " to " + end + ": add this merge"); message(" " + start + " to " + end + ": add this merge");
} }
Arrays.sort(levels, start, end, sortByIndex); Collections.sort(levels.subList(start, end), sortByIndex);
final SegmentInfos mergeInfos = new SegmentInfos(); final SegmentInfos mergeInfos = new SegmentInfos();
for(int i=start;i<end;i++) { for(int i=start;i<end;i++) {
mergeInfos.add(levels[i].info); mergeInfos.add(levels.get(i).info);
assert infos.contains(levels[i].info); assert infos.contains(levels.get(i).info);
} }
spec.add(new OneMerge(mergeInfos)); spec.add(new OneMerge(mergeInfos));
} else if (verbose()) { } else if (verbose()) {