LUCENE-8962: add ability to selectively merge on commit (#1552)

Co-authored-by: Michael Froh <msfroh@apache.org>
Co-authored-by: Simon Willnauer <simonw@apache.org>
Michael Sokolov 2020-06-18 16:56:29 -04:00 committed by GitHub
parent 56febf05c3
commit 972c84022e
13 changed files with 351 additions and 73 deletions

lucene/CHANGES.txt

@@ -405,6 +405,10 @@ Improvements
* LUCENE-9253: KoreanTokenizer now supports custom dictionaries (system, unknown). (Namgyu Kim)
* LUCENE-8962: Add IndexWriter merge-on-commit feature to selectively merge small segments on commit,
subject to a configurable timeout, to improve search performance by reducing the number of small
segments for searching (Michael Froh, Mike McCandless, Simon Willnauer)
* LUCENE-9171: QueryBuilder can now use BoostAttributes on input token streams to selectively
boost particular terms or synonyms in parsed queries. (Alessandro Benedetti, Alan Woodward)
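For illustration, a minimal sketch of how an application might opt in to merge-on-commit once this change is in place. The SmallSegmentMergePolicy name and its size threshold are hypothetical (a sample implementation is sketched under MergePolicy.findFullFlushMerges below); the directory and analyzer choices are likewise arbitrary.

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;

public class MergeOnCommitExample {
  public static void main(String[] args) throws Exception {
    Directory dir = new ByteBuffersDirectory();
    IndexWriterConfig config = new IndexWriterConfig(new StandardAnalyzer())
        .setMergePolicy(new SmallSegmentMergePolicy(1_000_000)) // hypothetical policy, sketched below
        .setMaxCommitMergeWaitSeconds(10);                      // block commit for at most 10 seconds
    try (IndexWriter writer = new IndexWriter(dir, config)) {
      Document doc = new Document();
      doc.add(new StringField("id", "1", Field.Store.NO));
      writer.addDocument(doc);
      writer.commit(); // small flushed segments may be merged away before the commit point is written
    }
  }
}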

lucene/core/src/java/org/apache/lucene/index/FilterMergePolicy.java

@@ -57,6 +57,11 @@ public class FilterMergePolicy extends MergePolicy {
return in.findForcedDeletesMerges(segmentInfos, mergeContext);
}
@Override
public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
return in.findFullFlushMerges(mergeTrigger, segmentInfos, mergeContext);
}
@Override
public boolean useCompoundFile(SegmentInfos infos, SegmentCommitInfo mergedInfo, MergeContext mergeContext)
throws IOException {

lucene/core/src/java/org/apache/lucene/index/IndexWriter.java

@@ -33,6 +33,7 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -2166,7 +2167,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
}
}
} else {
spec = mergePolicy.findMerges(trigger, segmentInfos, this);
switch (trigger) {
case COMMIT:
spec = mergePolicy.findFullFlushMerges(trigger, segmentInfos, this);
break;
default:
spec = mergePolicy.findMerges(trigger, segmentInfos, this);
}
}
if (spec != null) {
final int numMerges = spec.merges.size();
@@ -3172,7 +3179,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
SegmentInfos toCommit = null;
boolean anyChanges = false;
long seqNo;
MergePolicy.MergeSpecification onCommitMerges = null;
AtomicBoolean includeInCommit = new AtomicBoolean(true);
final long maxCommitMergeWaitSeconds = config.getMaxCommitMergeWaitSeconds();
// This is copied from doFlush, except it's modified to
// clone & incRef the flushed SegmentInfos inside the
// sync block:
@@ -3226,6 +3235,45 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
// sneak into the commit point:
toCommit = segmentInfos.clone();
if (anyChanges && maxCommitMergeWaitSeconds > 0) {
SegmentInfos committingSegmentInfos = toCommit;
onCommitMerges = updatePendingMerges(new OneMergeWrappingMergePolicy(config.getMergePolicy(), toWrap ->
new MergePolicy.OneMerge(toWrap.segments) {
@Override
public void mergeFinished(boolean committed) throws IOException {
assert Thread.holdsLock(IndexWriter.this);
if (committed && includeInCommit.get()) {
deleter.incRef(info.files());
Set<String> mergedSegmentNames = new HashSet<>();
for (SegmentCommitInfo sci : segments) {
mergedSegmentNames.add(sci.info.name);
}
List<SegmentCommitInfo> toCommitMergedAwaySegments = new ArrayList<>();
for (SegmentCommitInfo sci : committingSegmentInfos) {
if (mergedSegmentNames.contains(sci.info.name)) {
toCommitMergedAwaySegments.add(sci);
deleter.decRef(sci.files());
}
}
// Construct a OneMerge that applies to toCommit
MergePolicy.OneMerge applicableMerge = new MergePolicy.OneMerge(toCommitMergedAwaySegments);
applicableMerge.info = info.clone();
long segmentCounter = Long.parseLong(info.info.name.substring(1), Character.MAX_RADIX);
committingSegmentInfos.counter = Math.max(committingSegmentInfos.counter, segmentCounter + 1);
committingSegmentInfos.applyMergeChanges(applicableMerge, false);
}
toWrap.mergeFinished(committed);
super.mergeFinished(committed);
}
@Override
public CodecReader wrapForMerge(CodecReader reader) throws IOException {
return toWrap.wrapForMerge(reader);
}
}
), MergeTrigger.COMMIT, UNBOUNDED_MAX_MERGE_SEGMENTS);
}
pendingCommitChangeCount = changeCount.get();
// This protects the segmentInfos we are now going
@@ -3233,8 +3281,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
// we are trying to sync all referenced files, a
// merge completes which would otherwise have
// removed the files we are now syncing.
filesToCommit = toCommit.files(false);
deleter.incRef(filesToCommit);
deleter.incRef(toCommit.files(false));
}
success = true;
} finally {
@@ -3256,6 +3303,15 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
maybeCloseOnTragicEvent();
}
if (onCommitMerges != null) {
mergeScheduler.merge(mergeSource, MergeTrigger.COMMIT);
onCommitMerges.await(maxCommitMergeWaitSeconds, TimeUnit.SECONDS);
synchronized (this) {
// we need to call this under lock since mergeFinished above is also called under the IW lock
includeInCommit.set(false);
}
}
filesToCommit = toCommit.files(false);
try {
if (anyChanges) {
maybeMerge.set(true);
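Taken together, the IndexWriter changes above work like this: while holding the writer lock, prepareCommit clones segmentInfos into toCommit and registers COMMIT-triggered merges through a OneMergeWrappingMergePolicy whose mergeFinished() splices each completed merge into that clone; after releasing the lock, the writer kicks the merge scheduler, waits up to maxCommitMergeWaitSeconds, and then clears includeInCommit under the lock so that any merge finishing later only updates the live SegmentInfos, not the commit point. A condensed paraphrase of that flow, not additional code in this commit (wrappingPolicy stands in for the OneMergeWrappingMergePolicy constructed inline above):

// Condensed paraphrase of the prepareCommit() changes above.
synchronized (this) {
  toCommit = segmentInfos.clone();                      // snapshot that becomes the commit point
  onCommitMerges = updatePendingMerges(wrappingPolicy,  // its mergeFinished() rewrites toCommit
      MergeTrigger.COMMIT, UNBOUNDED_MAX_MERGE_SEGMENTS);
}
// ... outside the IndexWriter lock:
if (onCommitMerges != null) {
  mergeScheduler.merge(mergeSource, MergeTrigger.COMMIT);            // run the merges now
  onCommitMerges.await(maxCommitMergeWaitSeconds, TimeUnit.SECONDS); // bounded wait
  synchronized (this) {
    includeInCommit.set(false); // merges finishing after this point miss the commit
  }
}
filesToCommit = toCommit.files(false); // sync whatever made it into the snapshot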

lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java

@@ -19,6 +19,7 @@ package org.apache.lucene.index;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.stream.Collectors;
import org.apache.lucene.analysis.Analyzer;
@@ -110,6 +111,9 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
/** Default value for whether calls to {@link IndexWriter#close()} include a commit. */
public final static boolean DEFAULT_COMMIT_ON_CLOSE = true;
/** Default value for time to wait for merges on commit (when using a {@link MergePolicy} that implements findFullFlushMerges). */
public static final long DEFAULT_MAX_COMMIT_MERGE_WAIT_SECONDS = 0;
// indicates whether this config instance is already attached to a writer.
// not final so that it can be cloned properly.
private SetOnce<IndexWriter> writer = new SetOnce<>();
@@ -459,6 +463,27 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
return this;
}
/**
* Expert: sets the amount of time to wait for merges returned by MergePolicy.findFullFlushMerges(...).
* If this time is reached, we proceed with the commit based on segments merged up to that point.
* The merges are not cancelled, and will still run to completion independently of the commit,
* like normal segment merges. The default is <code>{@value IndexWriterConfig#DEFAULT_MAX_COMMIT_MERGE_WAIT_SECONDS}</code>.
*
* Note: This setting has no effect unless {@link MergePolicy#findFullFlushMerges(MergeTrigger, SegmentInfos, MergePolicy.MergeContext)}
* is overridden to return merges; the default implementation returns none.
*/
public IndexWriterConfig setMaxCommitMergeWaitSeconds(long maxCommitMergeWaitSeconds) {
this.maxCommitMergeWaitSeconds = maxCommitMergeWaitSeconds;
return this;
}
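As the note above says, the wait only matters when the merge policy actually returns commit-time merges. A minimal sketch of that caveat (the analyzer choice is arbitrary):

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriterConfig;

// With a merge policy that keeps the default findFullFlushMerges() (which
// returns null), the wait is recorded but commit() never blocks on merges.
IndexWriterConfig config = new IndexWriterConfig(new StandardAnalyzer())
    .setMaxCommitMergeWaitSeconds(30);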
/** We only allow sorting on these types */
private static final EnumSet<SortField.Type> ALLOWED_INDEX_SORT_TYPES = EnumSet.of(SortField.Type.STRING,
SortField.Type.LONG,
SortField.Type.INT,
SortField.Type.DOUBLE,
SortField.Type.FLOAT);
/**
* Set the {@link Sort} order to use for all (flushed and merged) segments.
*/

View File

@@ -109,6 +109,8 @@ public class LiveIndexWriterConfig {
/** soft deletes field */
protected String softDeletesField = null;
/** Amount of time to wait for merges returned by MergePolicy.findFullFlushMerges(...) */
protected volatile long maxCommitMergeWaitSeconds;
// used by IndexWriterConfig
LiveIndexWriterConfig(Analyzer analyzer) {
@@ -132,6 +134,7 @@ public class LiveIndexWriterConfig {
flushPolicy = new FlushByRamOrCountsPolicy();
readerPooling = IndexWriterConfig.DEFAULT_READER_POOLING;
perThreadHardLimitMB = IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB;
maxCommitMergeWaitSeconds = IndexWriterConfig.DEFAULT_MAX_COMMIT_MERGE_WAIT_SECONDS;
}
/** Returns the default analyzer to use for indexing documents. */
@@ -461,6 +464,15 @@ public class LiveIndexWriterConfig {
return softDeletesField;
}
/**
* Expert: return the amount of time to wait for merges returned by MergePolicy.findFullFlushMerges(...).
* If this time is reached, we proceed with the commit based on segments merged up to that point.
* The merges are not cancelled, and may still run to completion independently of the commit.
*/
public long getMaxCommitMergeWaitSeconds() {
return maxCommitMergeWaitSeconds;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
@@ -484,6 +496,7 @@ public class LiveIndexWriterConfig {
sb.append("indexSort=").append(getIndexSort()).append("\n");
sb.append("checkPendingFlushOnUpdate=").append(isCheckPendingFlushOnUpdate()).append("\n");
sb.append("softDeletesField=").append(getSoftDeletesField()).append("\n");
sb.append("maxCommitMergeWaitSeconds=").append(getMaxCommitMergeWaitSeconds()).append("\n");
return sb.toString();
}
}

View File

@@ -555,7 +555,7 @@ public abstract class MergePolicy {
* an original segment present in the
* to-be-merged index; else, it was a segment
* produced by a cascaded merge.
* @param mergeContext the IndexWriter to find the merges on
* @param mergeContext the MergeContext to find the merges on
*/
public abstract MergeSpecification findForcedMerges(
SegmentInfos segmentInfos, int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, MergeContext mergeContext)
@@ -566,11 +566,33 @@ public abstract class MergePolicy {
* deletes from the index.
* @param segmentInfos
* the total set of segments in the index
* @param mergeContext the IndexWriter to find the merges on
* @param mergeContext the MergeContext to find the merges on
*/
public abstract MergeSpecification findForcedDeletesMerges(
SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException;
/**
* Identifies merges that we want to execute (synchronously) on commit. By default, no merges are performed on commit.
*
* Any merges returned here will make {@link IndexWriter#commit()} or {@link IndexWriter#prepareCommit()} block until
* the merges complete or until {@link IndexWriterConfig#getMaxCommitMergeWaitSeconds()} seconds have elapsed. This may be
* used to merge small segments that have just been flushed as part of the commit, reducing the number of segments in
* the commit. If a merge does not complete in the allotted time, it will continue to execute, but its result will not
* be reflected in the commit.
*
* If a {@link OneMerge} in the returned {@link MergeSpecification} includes a segment already included in a registered
* merge, then {@link IndexWriter#commit()} or {@link IndexWriter#prepareCommit()} will throw an {@link IllegalStateException}.
* Use {@link MergeContext#getMergingSegments()} to determine which segments are currently registered to merge.
*
* @param mergeTrigger the event that triggered the merge (COMMIT or FULL_FLUSH).
* @param segmentInfos the total set of segments in the index (while preparing the commit)
* @param mergeContext the MergeContext to find the merges on, which should be used to determine which segments are
* already in a registered merge (see {@link MergeContext#getMergingSegments()}).
*/
public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
return null;
}
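To make the contract concrete, here is a sketch of a policy that merges small segments on commit. The class name and byte cutoff are illustrative, not part of this change; the skip of already-merging segments follows the javadoc above.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.lucene.index.FilterMergePolicy;
import org.apache.lucene.index.MergeTrigger;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.TieredMergePolicy;

/** Illustrative policy: fold small segments together at commit time. */
public class SmallSegmentMergePolicy extends FilterMergePolicy {
  private final long maxSegmentBytes; // what counts as "small"; an arbitrary cutoff

  public SmallSegmentMergePolicy(long maxSegmentBytes) {
    super(new TieredMergePolicy());
    this.maxSegmentBytes = maxSegmentBytes;
  }

  @Override
  public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger,
                                                SegmentInfos segmentInfos,
                                                MergeContext mergeContext) throws IOException {
    List<SegmentCommitInfo> small = new ArrayList<>();
    for (SegmentCommitInfo sci : segmentInfos) {
      // Per the contract above, never include segments already registered to merge.
      if (mergeContext.getMergingSegments().contains(sci) == false
          && sci.sizeInBytes() <= maxSegmentBytes) {
        small.add(sci);
      }
    }
    if (small.size() < 2) {
      return null; // nothing worth merging at this commit
    }
    MergeSpecification spec = new MergeSpecification();
    spec.add(new OneMerge(small));
    return spec;
  }
}

Paired with a non-zero setMaxCommitMergeWaitSeconds (see IndexWriterConfig above), such a policy folds freshly flushed small segments into one before the commit point is written.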
/**
* Returns true if a new segment (regardless of its origin) should use the
* compound file format. The default implementation returns <code>true</code>

View File

@@ -47,5 +47,10 @@ public enum MergeTrigger {
/**
* Merge was triggered by a closing IndexWriter.
*/
CLOSING
CLOSING,
/**
* Merge was triggered on commit.
*/
COMMIT,
}

View File

@@ -45,6 +45,9 @@ public final class NoMergePolicy extends MergePolicy {
@Override
public MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos, MergeContext mergeContext) { return null; }
@Override
public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) { return null; }
@Override
public boolean useCompoundFile(SegmentInfos segments, SegmentCommitInfo newSegment, MergeContext mergeContext) {
return newSegment.info.getUseCompoundFile();

View File

@@ -59,6 +59,11 @@ public class OneMergeWrappingMergePolicy extends FilterMergePolicy {
return wrapSpec(in.findForcedDeletesMerges(segmentInfos, mergeContext));
}
@Override
public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
return wrapSpec(in.findFullFlushMerges(mergeTrigger, segmentInfos, mergeContext));
}
private MergeSpecification wrapSpec(MergeSpecification spec) {
MergeSpecification wrapped = spec == null ? null : new MergeSpecification();
if (wrapped != null) {

View File

@@ -4186,19 +4186,57 @@ public class TestIndexWriter extends LuceneTestCase {
}
};
})))) {
Document doc = new Document();
doc.add(new StringField("id", "1", Field.Store.NO));
writer.addDocument(doc);
writer.flush();
writer.addDocument(doc);
writer.flush();
writer.deleteDocuments(new Term("id", "1"));
writer.flush();
assertEquals(2, writer.getSegmentCount());
assertEquals(0, writer.getDocStats().numDocs);
assertEquals(2, writer.getDocStats().maxDoc);
writer.forceMerge(1);
}
}
}
public void testMergeOnCommitKeepFullyDeletedSegments() throws Exception {
Directory dir = newDirectory();
IndexWriterConfig iwc = newIndexWriterConfig();
iwc.setMaxCommitMergeWaitSeconds(30);
iwc.mergePolicy = new FilterMergePolicy(newMergePolicy()) {
@Override
public boolean keepFullyDeletedSegment(IOSupplier<CodecReader> readerIOSupplier) {
return true;
}
@Override
public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger,
SegmentInfos segmentInfos,
MergeContext mergeContext) {
List<SegmentCommitInfo> fullyDeletedSegments = segmentInfos.asList().stream()
.filter(s -> s.info.maxDoc() - s.getDelCount() == 0)
.collect(Collectors.toList());
if (fullyDeletedSegments.isEmpty()) {
return null;
}
MergeSpecification spec = new MergeSpecification();
spec.add(new OneMerge(fullyDeletedSegments));
return spec;
}
};
IndexWriter w = new IndexWriter(dir, iwc);
Document d = new Document();
d.add(new StringField("id", "1", Field.Store.YES));
w.addDocument(d);
w.commit();
w.updateDocument(new Term("id", "1"), d);
w.commit();
try (DirectoryReader reader = w.getReader()) {
assertEquals(1, reader.numDocs());
}
IOUtils.close(w, dir);
}
}

View File

@@ -18,17 +18,42 @@ package org.apache.lucene.index;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexWriterConfig.OpenMode;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.LuceneTestCase;
public class TestIndexWriterMergePolicy extends LuceneTestCase {
private static final MergePolicy MERGE_ON_COMMIT_POLICY = new LogDocMergePolicy() {
@Override
public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) {
// Optimize down to a single segment on commit
if (mergeTrigger == MergeTrigger.COMMIT && segmentInfos.size() > 1) {
List<SegmentCommitInfo> nonMergingSegments = new ArrayList<>();
for (SegmentCommitInfo sci : segmentInfos) {
if (mergeContext.getMergingSegments().contains(sci) == false) {
nonMergingSegments.add(sci);
}
}
if (nonMergingSegments.size() > 1) {
MergeSpecification mergeSpecification = new MergeSpecification();
mergeSpecification.add(new OneMerge(nonMergingSegments));
return mergeSpecification;
}
}
return null;
}
};
// Test the normal case
public void testNormalCase() throws IOException {
Directory dir = newDirectory();
@@ -278,6 +303,50 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
assertSetters(new LogDocMergePolicy());
}
// Test basic semantics of merge on commit
public void testMergeOnCommit() throws IOException {
Directory dir = newDirectory();
IndexWriter firstWriter = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random()))
.setMergePolicy(NoMergePolicy.INSTANCE));
for (int i = 0; i < 5; i++) {
TestIndexWriter.addDoc(firstWriter);
firstWriter.flush();
}
DirectoryReader firstReader = DirectoryReader.open(firstWriter);
assertEquals(5, firstReader.leaves().size());
firstReader.close();
firstWriter.close(); // When this writer closes, it does not merge on commit.
IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random()))
.setMergePolicy(MERGE_ON_COMMIT_POLICY).setMaxCommitMergeWaitSeconds(30);
IndexWriter writerWithMergePolicy = new IndexWriter(dir, iwc);
writerWithMergePolicy.commit(); // No changes. Commit doesn't trigger a merge.
DirectoryReader unmergedReader = DirectoryReader.open(writerWithMergePolicy);
assertEquals(5, unmergedReader.leaves().size());
unmergedReader.close();
TestIndexWriter.addDoc(writerWithMergePolicy);
writerWithMergePolicy.commit(); // Doc added, do merge on commit.
assertEquals(1, writerWithMergePolicy.getSegmentCount()); // merged down to a single segment on commit
DirectoryReader mergedReader = DirectoryReader.open(writerWithMergePolicy);
assertEquals(1, mergedReader.leaves().size());
mergedReader.close();
try (IndexReader reader = writerWithMergePolicy.getReader()) {
IndexSearcher searcher = new IndexSearcher(reader);
assertEquals(6, reader.numDocs());
assertEquals(6, searcher.count(new MatchAllDocsQuery()));
}
writerWithMergePolicy.close();
dir.close();
}
private void assertSetters(MergePolicy lmp) {
lmp.setMaxCFSSegmentSizeMB(2.0);
assertEquals(2.0, lmp.getMaxCFSSegmentSizeMB(), EPSILON);

View File

@@ -128,6 +128,38 @@ public class MockRandomMergePolicy extends MergePolicy {
return findMerges(null, segmentInfos, mergeContext);
}
@Override
public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
MergeSpecification mergeSpecification = findMerges(null, segmentInfos, mergeContext);
if (mergeSpecification == null) {
return null;
}
// Do not return any merges involving already-merging segments.
MergeSpecification filteredMergeSpecification = new MergeSpecification();
for (OneMerge oneMerge : mergeSpecification.merges) {
boolean filtered = false;
List<SegmentCommitInfo> nonMergingSegments = new ArrayList<>();
for (SegmentCommitInfo sci : oneMerge.segments) {
if (mergeContext.getMergingSegments().contains(sci) == false) {
nonMergingSegments.add(sci);
} else {
filtered = true;
}
}
if (filtered == true) {
if (nonMergingSegments.size() > 0) {
filteredMergeSpecification.add(new OneMerge(nonMergingSegments));
}
} else {
filteredMergeSpecification.add(oneMerge);
}
}
if (filteredMergeSpecification.merges.size() > 0) {
return filteredMergeSpecification;
}
return null;
}
@Override
public boolean useCompoundFile(SegmentInfos infos, SegmentCommitInfo mergedInfo, MergeContext mergeContext) throws IOException {
// 80% of the time we create CFS:

View File

@@ -1003,6 +1003,7 @@ public abstract class LuceneTestCase extends Assert {
if (rarely(r)) {
c.setCheckPendingFlushUpdate(false);
}
c.setMaxCommitMergeWaitSeconds(atLeast(r, 1));
return c;
}