mirror of https://github.com/apache/lucene.git
LUCENE-982: add new method optimize(int maxNumSegments) to IndexWriter
git-svn-id: https://svn.apache.org/repos/asf/lucene/java/trunk@599766 13f79535-47bb-0310-9956-ffa450edef68
parent 261382fd4e
commit 97fbfb5427
@@ -72,6 +72,11 @@ API Changes
     setData(byte[] data, int offset, int length), getData(), getOffset()
     and clone() methods to o.a.l.index.Payload. Also add the field name
     as arg to Similarity.scorePayload(). (Michael Busch)
 
+ 9. LUCENE-982: Add IndexWriter.optimize(int maxNumSegments) method to
+    "partially optimize" an index down to maxNumSegments segments.
+    (Mike McCandless)
+
 Bug fixes
 
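For the caller's side of the new API, here is a minimal sketch (not part of
this commit; the RAMDirectory/WhitespaceAnalyzer setup and class name are
illustrative) of partially optimizing an index down to at most 3 segments:

import org.apache.lucene.analysis.WhitespaceAnalyzer;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.RAMDirectory;

public class PartialOptimizeExample {
  public static void main(String[] args) throws Exception {
    RAMDirectory dir = new RAMDirectory();
    IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true);
    // ... addDocument() calls would go here ...
    // Merge segments until at most 3 remain; optimize(1) is
    // equivalent to the classic optimize():
    writer.optimize(3);
    writer.close();
  }
}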
@@ -1654,14 +1654,37 @@ public class IndexWriter {
     optimize(true);
   }
 
+  /**
+   * Optimize the index down to <= maxNumSegments.  If
+   * maxNumSegments==1 then this is the same as {@link
+   * #optimize()}.
+   * @param maxNumSegments maximum number of segments left
+   * in the index after optimization finishes
+   */
+  public void optimize(int maxNumSegments) throws CorruptIndexException, IOException {
+    optimize(maxNumSegments, true);
+  }
+
   /** Just like {@link #optimize()}, except you can specify
    * whether the call should block until the optimize
    * completes.  This is only meaningful with a
    * {@link MergeScheduler} that is able to run merges in
    * background threads. */
   public void optimize(boolean doWait) throws CorruptIndexException, IOException {
+    optimize(1, doWait);
+  }
+
+  /** Just like {@link #optimize(int)}, except you can
+   * specify whether the call should block until the
+   * optimize completes.  This is only meaningful with a
+   * {@link MergeScheduler} that is able to run merges in
+   * background threads. */
+  public void optimize(int maxNumSegments, boolean doWait) throws CorruptIndexException, IOException {
     ensureOpen();
 
+    if (maxNumSegments < 1)
+      throw new IllegalArgumentException("maxNumSegments must be >= 1; got " + maxNumSegments);
+
     if (infoStream != null)
       message("optimize: index now " + segString());
 
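Note that the doWait overloads only pay off with a scheduler that runs merges
on background threads. A hedged sketch (again not part of the commit; names
are illustrative) of a non-blocking partial optimize under
ConcurrentMergeScheduler:

import org.apache.lucene.analysis.WhitespaceAnalyzer;
import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.RAMDirectory;

public class BackgroundOptimizeExample {
  public static void main(String[] args) throws Exception {
    RAMDirectory dir = new RAMDirectory();
    IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true);
    ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
    writer.setMergeScheduler(cms);
    // ... addDocument() calls would go here ...
    writer.optimize(5, false); // returns once the merges are queued
    // ... other work can proceed while merges run in background ...
    cms.sync();                // block until the background merges finish
    writer.close();
  }
}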
@@ -1677,15 +1700,21 @@ public class IndexWriter {
       // Now mark all pending & running merges as optimize
       // merge:
       Iterator it = pendingMerges.iterator();
-      while(it.hasNext())
-        ((MergePolicy.OneMerge) it.next()).optimize = true;
+      while(it.hasNext()) {
+        final MergePolicy.OneMerge merge = (MergePolicy.OneMerge) it.next();
+        merge.optimize = true;
+        merge.maxNumSegmentsOptimize = maxNumSegments;
+      }
 
       it = runningMerges.iterator();
-      while(it.hasNext())
-        ((MergePolicy.OneMerge) it.next()).optimize = true;
+      while(it.hasNext()) {
+        final MergePolicy.OneMerge merge = (MergePolicy.OneMerge) it.next();
+        merge.optimize = true;
+        merge.maxNumSegmentsOptimize = maxNumSegments;
+      }
     }
 
-    maybeMerge(true);
+    maybeMerge(maxNumSegments, true);
 
     if (doWait) {
       synchronized(this) {
 
@@ -1748,25 +1777,29 @@ public class IndexWriter {
   }
 
   private final void maybeMerge(boolean optimize) throws CorruptIndexException, IOException {
-    updatePendingMerges(optimize);
+    maybeMerge(1, optimize);
+  }
+
+  private final void maybeMerge(int maxNumSegmentsOptimize, boolean optimize) throws CorruptIndexException, IOException {
+    updatePendingMerges(maxNumSegmentsOptimize, optimize);
     mergeScheduler.merge(this);
   }
 
-  private synchronized void updatePendingMerges(boolean optimize)
+  private synchronized void updatePendingMerges(int maxNumSegmentsOptimize, boolean optimize)
     throws CorruptIndexException, IOException {
+    assert !optimize || maxNumSegmentsOptimize > 0;
 
     final MergePolicy.MergeSpecification spec;
     if (optimize) {
-      // Currently hardwired to 1, but once we add method to
-      // IndexWriter to allow "optimizing to <= N segments"
-      // then we will change this.
-      final int maxSegmentCount = 1;
-      spec = mergePolicy.findMergesForOptimize(segmentInfos, this, maxSegmentCount, segmentsToOptimize);
+      spec = mergePolicy.findMergesForOptimize(segmentInfos, this, maxNumSegmentsOptimize, segmentsToOptimize);
 
       if (spec != null) {
         final int numMerges = spec.merges.size();
-        for(int i=0;i<numMerges;i++)
-          ((MergePolicy.OneMerge) spec.merges.get(i)).optimize = true;
+        for(int i=0;i<numMerges;i++) {
+          final MergePolicy.OneMerge merge = ((MergePolicy.OneMerge) spec.merges.get(i));
+          merge.optimize = true;
+          merge.maxNumSegmentsOptimize = maxNumSegmentsOptimize;
+        }
       }
 
     } else
 
@@ -2737,6 +2770,7 @@ public class IndexWriter {
     throws CorruptIndexException, IOException {
 
     assert merge.registerDone;
+    assert !merge.optimize || merge.maxNumSegmentsOptimize > 0;
 
     boolean success = false;
 
@@ -2753,23 +2787,24 @@ public class IndexWriter {
       success = true;
     } finally {
       synchronized(this) {
-        if (!success && infoStream != null)
-          message("hit exception during merge");
+        try {
+          if (!success && infoStream != null)
+            message("hit exception during merge");
 
-        mergeFinish(merge);
+          mergeFinish(merge);
 
-        // This merge (and, generally, any change to the
-        // segments) may now enable new merges, so we call
-        // merge policy & update pending merges.
-        if (success && !merge.isAborted() && !closed && !closing)
-          updatePendingMerges(merge.optimize);
-
-        runningMerges.remove(merge);
-
-        // Optimize may be waiting on the final optimize
-        // merge to finish; and finishMerges() may be
-        // waiting for all merges to finish:
-        notifyAll();
+          // This merge (and, generally, any change to the
+          // segments) may now enable new merges, so we call
+          // merge policy & update pending merges.
+          if (success && !merge.isAborted() && !closed && !closing)
+            updatePendingMerges(merge.maxNumSegmentsOptimize, merge.optimize);
+        } finally {
+          runningMerges.remove(merge);
+          // Optimize may be waiting on the final optimize
+          // merge to finish; and finishMerges() may be
+          // waiting for all merges to finish:
+          notifyAll();
+        }
       }
     }
   }
 
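The nested try/finally above guarantees that the runningMerges bookkeeping and
notifyAll() run even when mergeFinish() or updatePendingMerges() throws;
otherwise a blocked optimize() or finishMerges() could wait forever. The same
pattern in miniature (illustrative names, not from this commit):

import java.util.HashSet;
import java.util.Set;

public class FinallyNotifyExample {
  private final Set running = new HashSet();

  synchronized void finish(Object task, Runnable bookkeeping) {
    try {
      bookkeeping.run();    // may throw, like mergeFinish() above
    } finally {
      running.remove(task); // always unregister the task
      notifyAll();          // always wake threads waiting on this monitor
    }
  }
}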
@@ -2992,8 +3027,7 @@ public class IndexWriter {
         SegmentInfo si = sourceSegmentsClone.info(i);
         IndexReader reader = SegmentReader.get(si, MERGE_READ_BUFFER_SIZE, merge.mergeDocStores); // no need to set deleter (yet)
         merger.add(reader);
-        if (infoStream != null)
-          totDocCount += reader.numDocs();
+        totDocCount += reader.numDocs();
       }
       if (infoStream != null) {
         message("merge: total "+totDocCount+" docs");
 
@@ -3001,8 +3035,7 @@ public class IndexWriter {
 
       mergedDocCount = merge.info.docCount = merger.merge(merge.mergeDocStores);
 
-      if (infoStream != null)
-        assert mergedDocCount == totDocCount;
+      assert mergedDocCount == totDocCount;
 
       success = true;
 
@@ -169,35 +169,74 @@ public abstract class LogMergePolicy extends MergePolicy {
    * in use may make use of concurrency. */
   public MergeSpecification findMergesForOptimize(SegmentInfos infos, IndexWriter writer, int maxNumSegments, Set segmentsToOptimize) throws IOException {
     MergeSpecification spec;
 
+    assert maxNumSegments > 0;
+
     if (!isOptimized(infos, writer, maxNumSegments, segmentsToOptimize)) {
 
-      int numSegments = infos.size();
-      while(numSegments > 0) {
-        final SegmentInfo info = infos.info(--numSegments);
+      // Find the newest (rightmost) segment that needs to
+      // be optimized (other segments may have been flushed
+      // since optimize started):
+      int last = infos.size();
+      while(last > 0) {
+        final SegmentInfo info = infos.info(--last);
         if (segmentsToOptimize.contains(info)) {
-          numSegments++;
+          last++;
           break;
         }
       }
 
-      if (numSegments > 0) {
+      if (last > 0) {
 
         spec = new MergeSpecification();
-        while (numSegments > 0) {
-
-          final int first;
-          if (numSegments > mergeFactor)
-            first = numSegments-mergeFactor;
-          else
-            first = 0;
-
-          if (numSegments > 1 || !isOptimized(writer, infos.info(0)))
-            spec.add(new OneMerge(infos.range(first, numSegments), useCompoundFile));
-
-          numSegments -= mergeFactor;
+
+        // First, enroll all "full" merges (size
+        // mergeFactor) to potentially be run concurrently:
+        while (last - maxNumSegments + 1 >= mergeFactor) {
+          spec.add(new OneMerge(infos.range(last-mergeFactor, last), useCompoundFile));
+          last -= mergeFactor;
         }
 
+        // Only if there are no full merges pending do we
+        // add a final partial (< mergeFactor segments) merge:
+        if (0 == spec.merges.size()) {
+          if (maxNumSegments == 1) {
+
+            // Since we must optimize down to 1 segment, the
+            // choice is simple:
+            if (last > 1 || !isOptimized(writer, infos.info(0)))
+              spec.add(new OneMerge(infos.range(0, last), useCompoundFile));
+          } else if (last > maxNumSegments) {
+
+            // Take care to pick a partial merge that is
+            // least cost, but does not make the index too
+            // lopsided.  If we always just picked the
+            // partial tail then we could produce a highly
+            // lopsided index over time:
+
+            // We must merge this many segments to leave
+            // maxNumSegments in the index (from when
+            // optimize was first kicked off):
+            final int finalMergeSize = last - maxNumSegments + 1;
+
+            // Consider all possible starting points:
+            long bestSize = 0;
+            int bestStart = 0;
+
+            for(int i=0;i<last-finalMergeSize+1;i++) {
+              long sumSize = 0;
+              for(int j=0;j<finalMergeSize;j++)
+                sumSize += size(infos.info(j+i));
+              if (i == 0 || (sumSize < 2*size(infos.info(i-1)) && sumSize < bestSize)) {
+                bestStart = i;
+                bestSize = sumSize;
+              }
+            }
+
+            spec.add(new OneMerge(infos.range(bestStart, bestStart+finalMergeSize), useCompoundFile));
+          }
+        }
 
       } else
         spec = null;
     } else
 
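To make the "least cost, but not lopsided" window search above concrete, here
is a standalone sketch (hypothetical class and array, not from the commit) of
the same selection over a plain array of segment sizes:

public class PickPartialMerge {
  // Among all windows of finalMergeSize adjacent segments, pick the
  // cheapest one whose total stays under 2X the segment just to its
  // left, mirroring findMergesForOptimize above.
  static int pickStart(long[] sizes, int maxNumSegments) {
    final int last = sizes.length;
    final int finalMergeSize = last - maxNumSegments + 1;
    long bestSize = 0;
    int bestStart = 0;
    for (int i = 0; i < last - finalMergeSize + 1; i++) {
      long sumSize = 0;
      for (int j = 0; j < finalMergeSize; j++)
        sumSize += sizes[i + j];
      if (i == 0 || (sumSize < 2 * sizes[i - 1] && sumSize < bestSize)) {
        bestStart = i;
        bestSize = sumSize;
      }
    }
    return bestStart;
  }

  public static void main(String[] args) {
    long[] sizes = {100, 80, 10, 9, 8, 7};
    // Leave 3 segments, so merge a window of 4; the small tail wins:
    System.out.println(pickStart(sizes, 3)); // prints 2
  }
}

The 2X guard is the design choice that matters: always merging the cheap tail
would let one giant merged segment accumulate ever more tiny neighbors, so a
window is only accepted if it does not dwarf the segment to its left.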
@@ -70,6 +70,7 @@ public abstract class MergePolicy {
     boolean registerDone;           // used by IndexWriter
     long mergeGen;                  // used by IndexWriter
     boolean isExternal;             // used by IndexWriter
+    int maxNumSegmentsOptimize;     // used by IndexWriter
 
     final SegmentInfos segments;
     final boolean useCompoundFile;
 
@@ -557,6 +557,85 @@ public class TestIndexWriter extends LuceneTestCase
     dir.close();
   }
 
+  public void testOptimizeMaxNumSegments() throws IOException {
+
+    MockRAMDirectory dir = new MockRAMDirectory();
+
+    final Document doc = new Document();
+    doc.add(new Field("content", "aaa", Field.Store.YES, Field.Index.TOKENIZED));
+
+    for(int numDocs=38;numDocs<500;numDocs += 38) {
+      IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true);
+      LogDocMergePolicy ldmp = new LogDocMergePolicy();
+      ldmp.setMinMergeDocs(1);
+      writer.setMergePolicy(ldmp);
+      writer.setMergeFactor(5);
+      writer.setMaxBufferedDocs(2);
+      for(int j=0;j<numDocs;j++)
+        writer.addDocument(doc);
+      writer.close();
+
+      SegmentInfos sis = new SegmentInfos();
+      sis.read(dir);
+      final int segCount = sis.size();
+
+      writer = new IndexWriter(dir, new WhitespaceAnalyzer());
+      writer.setMergePolicy(ldmp);
+      writer.setMergeFactor(5);
+      writer.optimize(3);
+      writer.close();
+
+      sis = new SegmentInfos();
+      sis.read(dir);
+      final int optSegCount = sis.size();
+
+      if (segCount < 3)
+        assertEquals(segCount, optSegCount);
+      else
+        assertEquals(3, optSegCount);
+    }
+  }
+
+  public void testOptimizeMaxNumSegments2() throws IOException {
+    MockRAMDirectory dir = new MockRAMDirectory();
+
+    final Document doc = new Document();
+    doc.add(new Field("content", "aaa", Field.Store.YES, Field.Index.TOKENIZED));
+
+    IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true);
+    LogDocMergePolicy ldmp = new LogDocMergePolicy();
+    ldmp.setMinMergeDocs(1);
+    writer.setMergePolicy(ldmp);
+    writer.setMergeFactor(4);
+    writer.setMaxBufferedDocs(2);
+
+    for(int iter=0;iter<10;iter++) {
+
+      for(int i=0;i<19;i++)
+        writer.addDocument(doc);
+
+      writer.flush();
+
+      SegmentInfos sis = new SegmentInfos();
+      ((ConcurrentMergeScheduler) writer.getMergeScheduler()).sync();
+      sis.read(dir);
+
+      final int segCount = sis.size();
+
+      writer.optimize(7);
+
+      sis = new SegmentInfos();
+      ((ConcurrentMergeScheduler) writer.getMergeScheduler()).sync();
+      sis.read(dir);
+      final int optSegCount = sis.size();
+
+      if (segCount < 7)
+        assertEquals(segCount, optSegCount);
+      else
+        assertEquals(7, optSegCount);
+    }
+  }
+
   /**
    * Make sure optimize doesn't use any more than 1X
    * starting index size as its temporary free space