new segment merging policy: LUCENE-672

git-svn-id: https://svn.apache.org/repos/asf/lucene/java/trunk@443434 13f79535-47bb-0310-9956-ffa450edef68
Yonik Seeley 2006-09-14 18:31:21 +00:00
parent 885e92add1
commit 4465baafa5
4 changed files with 374 additions and 108 deletions

CHANGES.txt

@@ -145,6 +145,12 @@ Optimizations
5. Modified TermScorer.explain to use TermDocs.skipTo() instead of looping through docs. (Grant Ingersoll)
6. LUCENE-672: New indexing segment merge policy flushes all buffered docs
to their own segment and delays a merge until mergeFactor segments of a
certain level have been accumulated. This increases indexing performance
in the presence of deleted docs or partially full segments as well as
enabling future optimizations. (Ning Li, Yonik Seeley)
Test Cases
1. Added TestTermScorer.java (Grant Ingersoll)
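
The policy in item 6 is driven by two existing knobs; a minimal usage sketch
(dir, analyzer and doc are hypothetical; mirrors the TestIndexWriterMergePolicy
test added in this commit):

    IndexWriter writer = new IndexWriter(dir, analyzer, true);
    writer.setMaxBufferedDocs(10); // each added doc buffers as its own RAM segment
    writer.setMergeFactor(10);     // merge once 10 segments accumulate on a level
    for (int i = 0; i < 100; i++)
      writer.addDocument(doc);     // a flush to disk happens every 10 adds
    writer.close();                // final flush of any remaining buffered docs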

IndexWriter.java

@@ -108,9 +108,9 @@ public class IndexWriter {
private Similarity similarity = Similarity.getDefault(); // how to normalize
private SegmentInfos segmentInfos = new SegmentInfos(); // the segments
private SegmentInfos ramSegmentInfos = new SegmentInfos(); // the segments in ramDirectory
private final Directory ramDirectory = new RAMDirectory(); // for temp segs
private int singleDocSegmentsCount = 0; // for speeding decision on merge candidates
private Lock writeLock;
private int termIndexInterval = DEFAULT_TERM_INDEX_INTERVAL;
@@ -434,7 +434,6 @@ public class IndexWriter {
/** Flushes all changes to an index and closes all associated files. */
public synchronized void close() throws IOException {
flushRamSegments();
// testInvariants();
ramDirectory.close();
if (writeLock != null) {
writeLock.release(); // release write lock
@@ -465,7 +464,7 @@ public class IndexWriter {
/** Returns the number of documents currently in this index. */
public synchronized int docCount() {
int count = 0;
int count = ramSegmentInfos.size();
for (int i = 0; i < segmentInfos.size(); i++) {
SegmentInfo si = segmentInfos.info(i);
count += si.docCount;
@@ -507,18 +506,35 @@ public class IndexWriter {
DocumentWriter dw =
new DocumentWriter(ramDirectory, analyzer, this);
dw.setInfoStream(infoStream);
String segmentName = newSegmentName();
String segmentName = newRAMSegmentName();
dw.addDocument(segmentName, doc);
synchronized (this) {
segmentInfos.addElement(new SegmentInfo(segmentName, 1, ramDirectory));
singleDocSegmentsCount++;
maybeMergeSegments();
ramSegmentInfos.addElement(new SegmentInfo(segmentName, 1, ramDirectory));
maybeFlushRamSegments();
}
// testInvariants();
}
final int getSegmentsCounter(){
return segmentInfos.counter;
// for test purposes
final synchronized int getRAMSegmentCount() {
return ramSegmentInfos.size();
}
private final synchronized String newRAMSegmentName() {
return "_ram_" + Integer.toString(ramSegmentInfos.counter++, Character.MAX_RADIX);
}
// for test purposes
final synchronized int getSegmentCount(){
return segmentInfos.size();
}
// for test purposes
final synchronized int getDocCount(int i) {
if (i >= 0 && i < segmentInfos.size()) {
return segmentInfos.info(i).docCount;
} else {
return -1;
}
}
private final synchronized String newSegmentName() {
@@ -577,9 +593,8 @@ public class IndexWriter {
(!SegmentReader.usesCompoundFile(segmentInfos.info(0)) ||
SegmentReader.hasSeparateNorms(segmentInfos.info(0))))))) {
int minSegment = segmentInfos.size() - mergeFactor;
mergeSegments(minSegment < 0 ? 0 : minSegment);
mergeSegments(segmentInfos, minSegment < 0 ? 0 : minSegment, segmentInfos.size());
}
// testInvariants();
}
/** Merges all segments from an array of indexes into this index.
@@ -610,12 +625,11 @@ public class IndexWriter {
for (int base = start; base < segmentInfos.size(); base++) {
int end = Math.min(segmentInfos.size(), base+mergeFactor);
if (end-base > 1)
mergeSegments(base, end);
mergeSegments(segmentInfos, base, end);
}
}
optimize(); // final cleanup
// testInvariants();
}
/** Merges the provided indexes into this index.
@@ -675,69 +689,131 @@ public class IndexWriter {
// delete now unused files of segment
deleteFiles(filesToDelete);
}
// testInvariants();
}
/** Merges all RAM-resident segments. */
private final void flushRamSegments() throws IOException {
int minSegment = segmentInfos.size()-1;
int docCount = 0;
while (minSegment >= 0 &&
(segmentInfos.info(minSegment)).dir == ramDirectory) {
docCount += segmentInfos.info(minSegment).docCount;
minSegment--;
// Overview of merge policy:
//
// A flush is triggered either by close() or by the number of ram segments
// reaching maxBufferedDocs. After a disk segment is created by the flush,
// further merges may be triggered.
//
// LowerBound and upperBound set the limits on the doc count of a segment
// which may be merged. Initially, lowerBound is set to 0 and upperBound
// to maxBufferedDocs. Starting from the rightmost* segment whose doc count
// > lowerBound and <= upperBound, count the number of consecutive segments
// whose doc count <= upperBound.
//
// Case 1: number of worthy segments < mergeFactor, no merge, done.
// Case 2: number of worthy segments == mergeFactor, merge these segments.
// If the doc count of the merged segment <= upperBound, done.
// Otherwise, set lowerBound to upperBound, and multiply upperBound
// by mergeFactor, go through the process again.
// Case 3: number of worthy segments > mergeFactor (possible when mergeFactor
// M changes), merge the leftmost* M segments. If the doc count of
// the merged segment <= upperBound, consider the merged segment for
// further merges on this same level. Merge the now leftmost* M
// segments, and so on, until number of worthy segments < mergeFactor.
// If the doc count of all the merged segments <= upperBound, done.
// Otherwise, set lowerBound to upperBound, and multiply upperBound
// by mergeFactor, go through the process again.
// Note that case 2 can be considered as a special case of case 3.
//
// This merge policy guarantees two invariants if M does not change and
// segment doc counts do not exceed maxMergeDocs. With B = maxBufferedDocs
// and f(n) = ceil(log_M(ceil(n/B))):
// 1: If i (left*) and i+1 (right*) are two consecutive segments of doc
// counts x and y, then f(x) >= f(y).
// 2: The number of committed segments on the same level (f(n)) <= M.
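// Worked example (illustrative numbers, not part of this commit):
// with B = maxBufferedDocs = 10 and M = mergeFactor = 10,
// f(n) = ceil(log_10(ceil(n/10))), so segments of 1..10 docs are on
// level 0, 11..100 docs on level 1, and 101..1000 docs on level 2.
// Ten level-0 segments merge into one ~100-doc level-1 segment, ten
// level-1 segments into one ~1000-doc level-2 segment, and so on.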
private final void maybeFlushRamSegments() throws IOException {
if (ramSegmentInfos.size() >= minMergeDocs) {
flushRamSegments();
}
}
/** Merges all RAM-resident segments, then may merge segments. */
private final void flushRamSegments() throws IOException {
if (ramSegmentInfos.size() > 0) {
if (mergeSegments(ramSegmentInfos, 0, ramSegmentInfos.size()) > 0) {
maybeMergeSegments();
}
}
if (minSegment < 0 || // add one FS segment?
(docCount + segmentInfos.info(minSegment).docCount) > mergeFactor ||
!(segmentInfos.info(segmentInfos.size()-1).dir == ramDirectory))
minSegment++;
if (minSegment >= segmentInfos.size())
return; // none to merge
mergeSegments(minSegment);
}
/** Incremental segment merger. */
private final void maybeMergeSegments() throws IOException {
long targetMergeDocs = minMergeDocs;
while (targetMergeDocs <= maxMergeDocs) {
// find segments smaller than current target size
int minSegment = segmentInfos.size() - singleDocSegmentsCount; // top 1-doc segments are taken for sure
int mergeDocs = singleDocSegmentsCount;
long lowerBound = 0;
long upperBound = minMergeDocs;
while (upperBound * mergeFactor <= maxMergeDocs) {
int minSegment = segmentInfos.size();
int maxSegment = -1;
// find merge-worthy segments
while (--minSegment >= 0) {
SegmentInfo si = segmentInfos.info(minSegment);
if (si.docCount >= targetMergeDocs)
if (maxSegment == -1 && si.docCount > lowerBound && si.docCount <= upperBound) {
// start from the rightmost* segment whose doc count is in bounds
maxSegment = minSegment;
} else if (si.docCount > upperBound) {
// until the segment whose doc count exceeds upperBound
break;
mergeDocs += si.docCount;
}
}
if (mergeDocs >= targetMergeDocs) // found a merge to do
mergeSegments(minSegment+1);
else
break;
minSegment++;
maxSegment++;
int numSegments = maxSegment - minSegment;
targetMergeDocs *= mergeFactor; // increase target size
if (numSegments < mergeFactor) {
break;
} else {
boolean exceedsUpperLimit = false;
// number of merge-worthy segments may exceed mergeFactor when
// mergeFactor and/or maxBufferedDocs change(s)
while (numSegments >= mergeFactor) {
// merge the leftmost* mergeFactor segments
int docCount = mergeSegments(segmentInfos, minSegment, minSegment + mergeFactor);
numSegments -= mergeFactor;
if (docCount > upperBound) {
// continue to merge the rest of the worthy segments on this level
minSegment++;
exceedsUpperLimit = true;
} else if (docCount > 0) {
// if the merged segment does not exceed upperBound, consider
// this segment for further merges on this same level
numSegments++;
}
}
if (!exceedsUpperLimit) {
// if none of the merged segments exceed upperBound, done
break;
}
}
lowerBound = upperBound;
upperBound *= mergeFactor;
}
}
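// Example trace of maybeMergeSegments (assumed settings, matching the
// new unit test): M = 10, B = 10, 250 docs added one at a time. Every
// 10th add flushes a 10-doc segment; every 10th flush leaves ten 10-doc
// segments on level 0, which merge into one 100-doc segment. After 250
// adds the index holds two 100-doc segments followed by five 10-doc
// segments, satisfying both invariants stated above.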
/** Pops segments off of segmentInfos stack down to minSegment, merges them,
and pushes the merged index onto the top of the segmentInfos stack. */
private final void mergeSegments(int minSegment)
throws IOException {
mergeSegments(minSegment, segmentInfos.size());
}
/** Merges the named range of segments, replacing them in the stack with a
* single segment. */
private final void mergeSegments(int minSegment, int end)
/**
* Merges the named range of segments, replacing them in the stack with a
* single segment.
*/
private final int mergeSegments(SegmentInfos sourceSegments, int minSegment, int end)
throws IOException {
final String mergedName = newSegmentName();
if (infoStream != null) infoStream.print("merging segments");
SegmentMerger merger = new SegmentMerger(this, mergedName);
boolean fromRAM = false;
final Vector segmentsToDelete = new Vector();
for (int i = minSegment; i < end; i++) {
SegmentInfo si = segmentInfos.info(i);
SegmentInfo si = sourceSegments.info(i);
if (infoStream != null)
infoStream.print(" " + si.name + " (" + si.docCount + " docs)");
IndexReader reader = SegmentReader.get(si);
@@ -745,11 +821,9 @@ public class IndexWriter {
if ((reader.directory() == this.directory) || // if we own the directory
(reader.directory() == this.ramDirectory))
segmentsToDelete.addElement(reader); // queue segment for deletion
}
// update 1-doc segments counter according to range of merged segments
if (singleDocSegmentsCount>0) {
singleDocSegmentsCount = Math.min(singleDocSegmentsCount, segmentInfos.size()-end);
if (!fromRAM && (reader.directory() == this.ramDirectory)) {
fromRAM = true;
}
}
int mergedDocCount = merger.merge();
@@ -758,10 +832,20 @@ public class IndexWriter {
infoStream.println(" into "+mergedName+" ("+mergedDocCount+" docs)");
}
for (int i = end-1; i > minSegment; i--) // remove old infos & add new
segmentInfos.remove(i);
segmentInfos.set(minSegment, new SegmentInfo(mergedName, mergedDocCount,
directory));
SegmentInfo newSegment = new SegmentInfo(mergedName, mergedDocCount,
directory);
if (fromRAM) {
sourceSegments.removeAllElements();
if (mergedDocCount > 0)
segmentInfos.addElement(newSegment);
} else {
for (int i = end-1; i > minSegment; i--) // remove old infos & add new
sourceSegments.remove(i);
if (mergedDocCount > 0)
segmentInfos.set(minSegment, newSegment);
else
sourceSegments.remove(minSegment);
}
// close readers before we attempt to delete now-obsolete segments
merger.closeReaders();
@@ -792,52 +876,10 @@ public class IndexWriter {
// delete now unused files of segment
deleteFiles(filesToDelete);
}
return mergedDocCount;
}
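// Example of the branches above (assumed numbers): merging ten 1-doc RAM
// segments clears ramSegmentInfos and appends a single 10-doc SegmentInfo
// to segmentInfos; merging disk segments [minSegment, end) replaces them
// in place with the merged segment, unless all source docs were deleted
// (mergedDocCount == 0), in which case the slot is removed and the caller
// sees a return value of 0 (exercised by testMergeDocCount0 below).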
/***
private synchronized void testInvariants() {
// index segments should decrease in size
int maxSegLevel = 0;
for (int i=segmentInfos.size()-1; i>=0; i--) {
SegmentInfo si = segmentInfos.info(i);
int segLevel = (si.docCount)/minMergeDocs;
if (segLevel < maxSegLevel) {
throw new RuntimeException("Segment #" + i + " is too small. " + segInfo());
}
maxSegLevel = Math.max(maxSegLevel,segLevel);
}
// check if merges needed
long targetMergeDocs = minMergeDocs;
int minSegment = segmentInfos.size();
while (targetMergeDocs <= maxMergeDocs && minSegment>=0) {
int mergeDocs = 0;
while (--minSegment >= 0) {
SegmentInfo si = segmentInfos.info(minSegment);
if (si.docCount >= targetMergeDocs) break;
mergeDocs += si.docCount;
}
if (mergeDocs >= targetMergeDocs) {
throw new RuntimeException("Merge needed at level "+targetMergeDocs + " :"+segInfo());
}
targetMergeDocs *= mergeFactor; // increase target size
}
}
private String segInfo() {
StringBuffer sb = new StringBuffer("minMergeDocs="+minMergeDocs+" singleDocSegmentsCount="+singleDocSegmentsCount+" segsizes:");
for (int i=0; i<segmentInfos.size(); i++) {
sb.append(segmentInfos.info(i).docCount);
sb.append(",");
}
return sb.toString();
}
***/
/*
* Some operating systems (e.g. Windows) don't permit a file to be deleted
* while it is opened for read (e.g. by another process or thread). So we

TestIndexModifier.java

@@ -64,7 +64,7 @@ public class TestIndexModifier extends TestCase {
i.addDocument(getDoc());
i.addDocument(getDoc());
i.flush();
assertEquals(3, i.docCount());
// depends on merge policy - assertEquals(3, i.docCount());
i.deleteDocuments(allDocTerm);
assertEquals(0, i.docCount());
i.optimize();

TestIndexWriterMergePolicy.java (new file)

@@ -0,0 +1,218 @@
package org.apache.lucene.index;
import java.io.IOException;
import org.apache.lucene.analysis.WhitespaceAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;
import junit.framework.TestCase;
public class TestIndexWriterMergePolicy extends TestCase {
// Test the normal case
public void testNormalCase() throws IOException {
Directory dir = new RAMDirectory();
IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true);
writer.setMaxBufferedDocs(10);
writer.setMergeFactor(10);
for (int i = 0; i < 100; i++) {
addDoc(writer);
checkInvariants(writer);
}
writer.close();
}
// Test that the writer does not over-merge (segment counts may grow)
public void testNoOverMerge() throws IOException {
Directory dir = new RAMDirectory();
IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true);
writer.setMaxBufferedDocs(10);
writer.setMergeFactor(10);
boolean noOverMerge = false;
for (int i = 0; i < 100; i++) {
addDoc(writer);
checkInvariants(writer);
if (writer.getRAMSegmentCount() + writer.getSegmentCount() >= 18) {
noOverMerge = true;
}
}
assertTrue(noOverMerge);
writer.close();
}
// Test the case where flush is forced after every addDoc
public void testForceFlush() throws IOException {
Directory dir = new RAMDirectory();
IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true);
writer.setMaxBufferedDocs(10);
writer.setMergeFactor(10);
for (int i = 0; i < 100; i++) {
addDoc(writer);
writer.close();
writer = new IndexWriter(dir, new WhitespaceAnalyzer(), false);
writer.setMaxBufferedDocs(10);
writer.setMergeFactor(10);
checkInvariants(writer);
}
writer.close();
}
// Test the case where mergeFactor changes
public void testMergeFactorChange() throws IOException {
Directory dir = new RAMDirectory();
IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true);
writer.setMaxBufferedDocs(10);
writer.setMergeFactor(100);
for (int i = 0; i < 250; i++) {
addDoc(writer);
checkInvariants(writer);
}
writer.setMergeFactor(5);
// merge policy only fixes segments on levels where merges
// have been triggered, so check invariants after all adds
for (int i = 0; i < 10; i++) {
addDoc(writer);
}
checkInvariants(writer);
writer.close();
}
// Test the case where both mergeFactor and maxBufferedDocs change
public void testMaxBufferedDocsChange() throws IOException {
Directory dir = new RAMDirectory();
IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true);
writer.setMaxBufferedDocs(101);
writer.setMergeFactor(101);
// leftmost* segment has 1 doc
// rightmost* segment has 100 docs
for (int i = 1; i <= 100; i++) {
for (int j = 0; j < i; j++) {
addDoc(writer);
checkInvariants(writer);
}
writer.close();
writer = new IndexWriter(dir, new WhitespaceAnalyzer(), false);
writer.setMaxBufferedDocs(101);
writer.setMergeFactor(101);
}
writer.setMaxBufferedDocs(10);
writer.setMergeFactor(10);
// merge policy only fixes segments on levels where merges
// have been triggered, so check invariants after all adds
for (int i = 0; i < 100; i++) {
addDoc(writer);
}
checkInvariants(writer);
for (int i = 100; i < 1000; i++) {
addDoc(writer);
}
checkInvariants(writer);
writer.close();
}
// Test the case where a merge results in no doc at all
public void testMergeDocCount0() throws IOException {
Directory dir = new RAMDirectory();
IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true);
writer.setMaxBufferedDocs(10);
writer.setMergeFactor(100);
for (int i = 0; i < 250; i++) {
addDoc(writer);
checkInvariants(writer);
}
writer.close();
IndexReader reader = IndexReader.open(dir);
reader.deleteDocuments(new Term("content", "aaa"));
reader.close();
writer = new IndexWriter(dir, new WhitespaceAnalyzer(), false);
writer.setMaxBufferedDocs(10);
writer.setMergeFactor(5);
for (int i = 0; i < 10; i++) {
addDoc(writer);
checkInvariants(writer);
}
checkInvariants(writer);
assertEquals(10, writer.docCount());
writer.close();
}
private void addDoc(IndexWriter writer) throws IOException {
Document doc = new Document();
doc.add(new Field("content", "aaa", Field.Store.NO, Field.Index.TOKENIZED));
writer.addDocument(doc);
}
private void checkInvariants(IndexWriter writer) {
int maxBufferedDocs = writer.getMaxBufferedDocs();
int mergeFactor = writer.getMergeFactor();
int maxMergeDocs = writer.getMaxMergeDocs();
int ramSegmentCount = writer.getRAMSegmentCount();
assertTrue(ramSegmentCount < maxBufferedDocs);
int lowerBound = 0;
int upperBound = maxBufferedDocs;
int numSegments = 0;
int segmentCount = writer.getSegmentCount();
for (int i = segmentCount - 1; i >= 0; i--) {
int docCount = writer.getDocCount(i);
assertTrue(docCount > lowerBound);
if (docCount <= upperBound) {
numSegments++;
} else {
if (upperBound * mergeFactor <= maxMergeDocs) {
assertTrue(numSegments < mergeFactor);
}
lowerBound = upperBound;
upperBound *= mergeFactor;
numSegments = 1;
}
}
if (upperBound * mergeFactor <= maxMergeDocs) {
assertTrue(numSegments < mergeFactor);
}
}
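// Example walk of checkInvariants (assumed state): with maxBufferedDocs
// = 10, mergeFactor = 10 and segment doc counts [100, 100, 10, 10, 10,
// 10, 10] left to right, the loop starts at the rightmost segment: five
// 10-doc segments fit (0, 10] (numSegments = 5 < 10), then 100 raises
// the bounds to (10, 100], and both 100-doc segments fit that level
// (numSegments = 2 < 10), so every assertion passes.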
private void printSegmentDocCounts(IndexWriter writer) {
int segmentCount = writer.getSegmentCount();
System.out.println("" + segmentCount + " segments total");
for (int i = 0; i < segmentCount; i++) {
System.out.println(" segment " + i + " has " + writer.getDocCount(i)
+ " docs");
}
}
}
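
For completeness, the new test can be run standalone under JUnit 3.8.x by
adding a hypothetical main method (not part of this commit) to the class:

    public static void main(String[] args) {
      junit.textui.TestRunner.run(TestIndexWriterMergePolicy.class);
    }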