LUCENE-5883: Move MergePolicy to LiveIndexWriterConfig

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1617910 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Shai Erera 2014-08-14 11:18:51 +00:00
parent e0c00539b5
commit c8e9209383
14 changed files with 69 additions and 92 deletions

View File

@ -168,6 +168,12 @@ API Changes
to be just a reference to a section of a larger byte[] and BytesRefBuilder
which is a StringBuilder-like class for BytesRef instances. (Adrien Grand)
* LUCENE-5883: You can now change the MergePolicy instance on a live IndexWriter,
without first closing and reopening the writer. This allows to e.g. run a special
merge with UpgradeIndexMergePolicy without reopening the writer. Also, MergePolicy
no longer implements Closeable; if you need to release your custom MegePolicy's
resources, you need to implement close() and call it explicitly. (Shai Erera)
Optimizations
* LUCENE-5780: Make OrdinalMap more memory-efficient, especially in case the

View File

@ -279,7 +279,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// merges
private HashSet<SegmentCommitInfo> mergingSegments = new HashSet<>();
private MergePolicy mergePolicy;
private final MergeScheduler mergeScheduler;
private LinkedList<MergePolicy.OneMerge> pendingMerges = new LinkedList<>();
private Set<MergePolicy.OneMerge> runningMerges = new HashSet<>();
@ -443,7 +442,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
}
if (anySegmentFlushed) {
maybeMerge(MergeTrigger.FULL_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
maybeMerge(config.getMergePolicy(), MergeTrigger.FULL_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
}
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "getReader took " + (System.currentTimeMillis() - tStart) + " msec");
@ -743,7 +742,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
directory = d;
analyzer = config.getAnalyzer();
infoStream = config.getInfoStream();
mergePolicy = config.getMergePolicy();
mergeScheduler = config.getMergeScheduler();
codec = config.getCodec();
@ -1686,7 +1684,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
}
maybeMerge(MergeTrigger.EXPLICIT, maxNumSegments);
maybeMerge(config.getMergePolicy(), MergeTrigger.EXPLICIT, maxNumSegments);
if (doWait) {
synchronized(this) {
@ -1766,6 +1764,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
infoStream.message("IW", "forceMergeDeletes: index now " + segString());
}
final MergePolicy mergePolicy = config.getMergePolicy();
MergePolicy.MergeSpecification spec;
boolean newMergesFound = false;
synchronized(this) {
@ -1863,16 +1862,16 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* href="#OOME">above</a> for details.</p>
*/
public final void maybeMerge() throws IOException {
maybeMerge(MergeTrigger.EXPLICIT, UNBOUNDED_MAX_MERGE_SEGMENTS);
maybeMerge(config.getMergePolicy(), MergeTrigger.EXPLICIT, UNBOUNDED_MAX_MERGE_SEGMENTS);
}
private final void maybeMerge(MergeTrigger trigger, int maxNumSegments) throws IOException {
private final void maybeMerge(MergePolicy mergePolicy, MergeTrigger trigger, int maxNumSegments) throws IOException {
ensureOpen(false);
boolean newMergesFound = updatePendingMerges(trigger, maxNumSegments);
boolean newMergesFound = updatePendingMerges(mergePolicy, trigger, maxNumSegments);
mergeScheduler.merge(this, trigger, newMergesFound);
}
private synchronized boolean updatePendingMerges(MergeTrigger trigger, int maxNumSegments)
private synchronized boolean updatePendingMerges(MergePolicy mergePolicy, MergeTrigger trigger, int maxNumSegments)
throws IOException {
// In case infoStream was disabled on init, but then enabled at some
@ -1996,10 +1995,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
infoStream.message("IW", "rollback: done finish merges");
}
// Must pre-close these two, in case they increment
// changeCount so that we can then set it to false
// before calling closeInternal
mergePolicy.close();
// Must pre-close in case it increments changeCount so that we can then
// set it to false before calling closeInternal
mergeScheduler.close();
bufferedUpdatesStream.clear();
@ -2051,9 +2048,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
} finally {
if (!success) {
// Must not hold IW's lock while closing
// mergePolicy/Scheduler: this can lead to deadlock,
// mergeScheduler: this can lead to deadlock,
// e.g. TestIW.testThreadInterruptDeadlock
IOUtils.closeWhileHandlingException(mergePolicy, mergeScheduler);
IOUtils.closeWhileHandlingException(mergeScheduler);
}
synchronized(this) {
if (!success) {
@ -2592,6 +2589,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
setDiagnostics(info, SOURCE_ADDINDEXES_READERS);
final MergePolicy mergePolicy = config.getMergePolicy();
boolean useCompoundFile;
synchronized(this) { // Guard segmentInfos
if (stopMerges) {
@ -2752,10 +2750,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
@Override
public final void prepareCommit() throws IOException {
ensureOpen();
prepareCommitInternal();
prepareCommitInternal(config.getMergePolicy());
}
private void prepareCommitInternal() throws IOException {
private void prepareCommitInternal(MergePolicy mergePolicy) throws IOException {
startCommitTime = System.nanoTime();
synchronized(commitLock) {
ensureOpen(false);
@ -2837,7 +2835,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
boolean success = false;
try {
if (anySegmentsFlushed) {
maybeMerge(MergeTrigger.FULL_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
maybeMerge(mergePolicy, MergeTrigger.FULL_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
}
startCommit(toCommit);
success = true;
@ -2913,7 +2911,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
@Override
public final void commit() throws IOException {
ensureOpen();
commitInternal();
commitInternal(config.getMergePolicy());
}
/** Returns true if there may be changes that have not been
@ -2929,7 +2927,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
return changeCount != lastCommitChangeCount || docWriter.anyChanges() || bufferedUpdatesStream.any();
}
private final void commitInternal() throws IOException {
private final void commitInternal(MergePolicy mergePolicy) throws IOException {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "commit: start");
@ -2946,7 +2944,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "commit: now prepare");
}
prepareCommitInternal();
prepareCommitInternal(mergePolicy);
} else {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "commit: already prepared");
@ -3025,7 +3023,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// We can be called during close, when closing==true, so we must pass false to ensureOpen:
ensureOpen(false);
if (doFlush(applyAllDeletes) && triggerMerge) {
maybeMerge(MergeTrigger.FULL_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
maybeMerge(config.getMergePolicy(), MergeTrigger.FULL_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
}
}
@ -3569,6 +3567,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
final long t0 = System.currentTimeMillis();
final MergePolicy mergePolicy = config.getMergePolicy();
try {
try {
try {
@ -3581,7 +3580,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
infoStream.message("IW", "now merge\n merge=" + segString(merge.segments) + "\n index=" + segString());
}
mergeMiddle(merge);
mergeMiddle(merge, mergePolicy);
mergeSuccess(merge);
success = true;
} catch (Throwable t) {
@ -3604,7 +3603,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// segments) may now enable new merges, so we call
// merge policy & update pending merges.
if (success && !merge.isAborted() && (merge.maxNumSegments != -1 || (!closed && !closing))) {
updatePendingMerges(MergeTrigger.MERGE_FINISHED, merge.maxNumSegments);
updatePendingMerges(mergePolicy, MergeTrigger.MERGE_FINISHED, merge.maxNumSegments);
}
}
}
@ -3882,7 +3881,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
/** Does the actual (time-consuming) work of the merge,
* but without holding synchronized lock on IndexWriter
* instance */
private int mergeMiddle(MergePolicy.OneMerge merge) throws IOException {
private int mergeMiddle(MergePolicy.OneMerge merge, MergePolicy mergePolicy) throws IOException {
merge.checkAborted(directory);
@ -4569,12 +4568,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
flushCount.incrementAndGet();
}
}
final void doAfterSegmentFlushed(boolean triggerMerge, boolean forcePurge) throws IOException {
try {
purge(forcePurge);
} finally {
if (triggerMerge) {
maybeMerge(MergeTrigger.SEGMENT_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
maybeMerge(config.getMergePolicy(), MergeTrigger.SEGMENT_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
}
}

View File

@ -280,21 +280,6 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
return writeLockTimeout;
}
/**
* Expert: {@link MergePolicy} is invoked whenever there are changes to the
* segments in the index. Its role is to select which merges to do, if any,
* and return a {@link MergePolicy.MergeSpecification} describing the merges.
* It also selects merges to do for forceMerge.
*
* <p>Only takes effect when IndexWriter is first created. */
public IndexWriterConfig setMergePolicy(MergePolicy mergePolicy) {
if (mergePolicy == null) {
throw new IllegalArgumentException("mergePolicy must not be null");
}
this.mergePolicy = mergePolicy;
return this;
}
/**
* Set the {@link Codec}.
*
@ -496,6 +481,11 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
return setInfoStream(new PrintStreamInfoStream(printStream));
}
@Override
public IndexWriterConfig setMergePolicy(MergePolicy mergePolicy) {
return (IndexWriterConfig) super.setMergePolicy(mergePolicy);
}
@Override
public IndexWriterConfig setMaxBufferedDeleteTerms(int maxBufferedDeleteTerms) {
return (IndexWriterConfig) super.setMaxBufferedDeleteTerms(maxBufferedDeleteTerms);

View File

@ -280,6 +280,25 @@ public class LiveIndexWriterConfig {
return maxBufferedDocs;
}
/**
* Expert: {@link MergePolicy} is invoked whenever there are changes to the
* segments in the index. Its role is to select which merges to do, if any,
* and return a {@link MergePolicy.MergeSpecification} describing the merges.
* It also selects merges to do for forceMerge.
*
* <p>
* Takes effect on subsequent merge selections. Any merges in flight or any
* merges already registered by the previous {@link MergePolicy} are not
* affected.
*/
public LiveIndexWriterConfig setMergePolicy(MergePolicy mergePolicy) {
if (mergePolicy == null) {
throw new IllegalArgumentException("mergePolicy must not be null");
}
this.mergePolicy = mergePolicy;
return this;
}
/**
* Set the merged segment warmer. See {@link IndexReaderWarmer}.
*

View File

@ -145,10 +145,6 @@ public abstract class LogMergePolicy extends MergePolicy {
return calibrateSizeByDeletes;
}
@Override
public void close() {}
/** Return the number of documents in the provided {@link
* SegmentCommitInfo}, pro-rated by percentage of
* non-deleted documents if {@link

View File

@ -55,7 +55,7 @@ import java.util.Map;
*
* @lucene.experimental
*/
public abstract class MergePolicy implements java.io.Closeable {
public abstract class MergePolicy {
/** A map of doc IDs. */
public static abstract class DocMap {
@ -446,12 +446,6 @@ public abstract class MergePolicy implements java.io.Closeable {
public abstract MergeSpecification findForcedDeletesMerges(
SegmentInfos segmentInfos, IndexWriter writer) throws IOException;
/**
* Release all resources for the policy.
*/
@Override
public abstract void close();
/**
* Returns true if a new segment (regardless of its origin) should use the
* compound file format. The default implementation returns <code>true</code>

View File

@ -34,9 +34,6 @@ public final class NoMergePolicy extends MergePolicy {
super();
}
@Override
public void close() {}
@Override
public MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, IndexWriter writer) { return null; }

View File

@ -18,14 +18,14 @@ package org.apache.lucene.index;
*/
import java.io.IOException;
import java.util.Locale;
import java.util.Map;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.ArrayList;
import java.util.Locale;
import java.util.Map;
/**
* Merges segments of approximately equal size, subject to
@ -616,10 +616,6 @@ public class TieredMergePolicy extends MergePolicy {
return spec;
}
@Override
public void close() {
}
private long floorSize(long bytes) {
return Math.max(floorSegmentBytes, bytes);
}

View File

@ -133,11 +133,6 @@ public class UpgradeIndexMergePolicy extends MergePolicy {
return base.useCompoundFile(segments, newSegment, writer);
}
@Override
public void close() {
base.close();
}
@Override
public String toString() {
return "[" + getClass().getSimpleName() + "->" + base + "]";

View File

@ -33,7 +33,6 @@ public class TestNoMergePolicy extends LuceneTestCase {
assertNull(mp.findMerges(null, (SegmentInfos)null, null));
assertNull(mp.findForcedMerges(null, 0, null, null));
assertNull(mp.findForcedDeletesMerges(null, null));
mp.close();
}
@Test

View File

@ -252,9 +252,6 @@ public class TestPerSegmentDeletes extends LuceneTestCase {
this.useCompoundFile = useCompoundFile;
}
@Override
public void close() {}
@Override
public MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, IndexWriter writer)
throws IOException {

View File

@ -30,8 +30,8 @@ import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.MergeState;
import org.apache.lucene.index.MergeTrigger;
import org.apache.lucene.index.MultiReader;
import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.index.SlowCompositeReaderWrapper;
@ -203,11 +203,6 @@ public final class SortingMergePolicy extends MergePolicy {
return sortedMergeSpecification(in.findForcedDeletesMerges(segmentInfos, writer));
}
@Override
public void close() {
in.close();
}
@Override
public boolean useCompoundFile(SegmentInfos segments,
SegmentCommitInfo newSegment, IndexWriter writer) throws IOException {

View File

@ -130,10 +130,6 @@ public class MockRandomMergePolicy extends MergePolicy {
return findMerges(null, segmentInfos, writer);
}
@Override
public void close() {
}
@Override
public boolean useCompoundFile(SegmentInfos infos, SegmentCommitInfo mergedInfo, IndexWriter writer) throws IOException {
// 80% of the time we create CFS:

View File

@ -17,16 +17,18 @@
package org.apache.solr.util;
import org.apache.lucene.index.*;
import org.apache.lucene.index.MergePolicy.MergeSpecification;
import org.apache.lucene.util.LuceneTestCase;
import java.io.IOException;
import java.util.Map;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.MergeTrigger;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.util.LuceneTestCase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.io.IOException;
/**
* A {@link MergePolicy} with a no-arg constructor that proxies to a
* wrapped instance retrieved from {@link LuceneTestCase#newMergePolicy}.
@ -55,11 +57,6 @@ public final class RandomMergePolicy extends MergePolicy {
inner.getClass(), inner);
}
@Override
public void close() {
inner.close();
}
@Override
public MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos, IndexWriter writer)
throws IOException {