mirror of https://github.com/apache/lucene.git
LUCENE-8962: Add ability to selectively merge on commit (#1155)
* LUCENE-8962: Add ability to selectively merge on commit.
  This adds a new "findCommitMerges" method to MergePolicy, which can specify merges to be executed before the IndexWriter.prepareCommitInternal method returns. If we have many index writer threads, they will flush their DWPT buffers on commit, resulting in many small segments, which can be merged before the commit returns.
* Add missing Javadoc
* Fix incorrect comment
* Refactoring and fix intermittent test failure
  1. Made some changes to the callback to update toCommit, leveraging SegmentInfos.applyMergeChanges.
  2. I realized that we'll never end up with 0 registered merges, because we throw an exception if we fail to register a merge.
  3. Moved the IndexWriterEvents.beginMergeOnCommit notification to before we call MergeScheduler.merge, since we may not be merging on another thread.
  4. There was an intermittent test failure due to randomness in the time it takes for merges to complete. Before doing the final commit, we wait for pending merges to finish. We may still end up abandoning the final merge, but we can detect that and assert that either the merge was abandoned (and we have > 1 segment) or we did merge down to 1 segment.
* Fix typo
* Fix/improve comments based on PR feedback
* More comment improvements from PR feedback
* Rename method and add new MergeTrigger
  1. Renamed findCommitMerges -> findFullFlushMerges.
  2. Added MergeTrigger.COMMIT, passed to findFullFlushMerges and to MergeScheduler when merging on commit.
* Update renamed method name in strings and comments
This commit is contained in:
parent
f2a6ff1494
commit
a1791e7714
|
@ -57,6 +57,11 @@ public class FilterMergePolicy extends MergePolicy {
|
|||
return in.findForcedDeletesMerges(segmentInfos, mergeContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
|
||||
return in.findFullFlushMerges(mergeTrigger, segmentInfos, mergeContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean useCompoundFile(SegmentInfos infos, SegmentCommitInfo mergedInfo, MergeContext mergeContext)
|
||||
throws IOException {
|
||||
|
|
|
@ -32,6 +32,8 @@ import java.util.Map;
|
|||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
@ -3164,6 +3166,42 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
|
|||
}
|
||||
}
|
||||
|
||||
private MergePolicy.OneMerge updateSegmentInfosOnMergeFinish(MergePolicy.OneMerge merge, final SegmentInfos toCommit,
|
||||
AtomicReference<CountDownLatch> mergeLatchRef) {
|
||||
return new MergePolicy.OneMerge(merge.segments) {
|
||||
public void mergeFinished() throws IOException {
|
||||
super.mergeFinished();
|
||||
CountDownLatch mergeAwaitLatch = mergeLatchRef.get();
|
||||
if (mergeAwaitLatch == null) {
|
||||
// Commit thread timed out waiting for this merge and moved on. No need to manipulate toCommit.
|
||||
return;
|
||||
}
|
||||
if (isAborted() == false) {
|
||||
deleter.incRef(this.info.files());
|
||||
// Resolve "live" SegmentInfos segments to their toCommit cloned equivalents, based on segment name.
|
||||
Set<String> mergedSegmentNames = new HashSet<>();
|
||||
for (SegmentCommitInfo sci : this.segments) {
|
||||
deleter.decRef(sci.files());
|
||||
mergedSegmentNames.add(sci.info.name);
|
||||
}
|
||||
List<SegmentCommitInfo> toCommitMergedAwaySegments = new ArrayList<>();
|
||||
for (SegmentCommitInfo sci : toCommit) {
|
||||
if (mergedSegmentNames.contains(sci.info.name)) {
|
||||
toCommitMergedAwaySegments.add(sci);
|
||||
}
|
||||
}
|
||||
// Construct a OneMerge that applies to toCommit
|
||||
MergePolicy.OneMerge applicableMerge = new MergePolicy.OneMerge(toCommitMergedAwaySegments);
|
||||
applicableMerge.info = this.info.clone();
|
||||
long segmentCounter = Long.parseLong(this.info.info.name.substring(1), Character.MAX_RADIX);
|
||||
toCommit.counter = Math.max(toCommit.counter, segmentCounter + 1);
|
||||
toCommit.applyMergeChanges(applicableMerge, false);
|
||||
}
|
||||
mergeAwaitLatch.countDown();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private long prepareCommitInternal() throws IOException {
|
||||
startCommitTime = System.nanoTime();
|
||||
synchronized(commitLock) {
|
||||
|
@ -3186,6 +3224,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
|
|||
SegmentInfos toCommit = null;
|
||||
boolean anyChanges = false;
|
||||
long seqNo;
|
||||
List<MergePolicy.OneMerge> commitMerges = null;
|
||||
AtomicReference<CountDownLatch> mergeAwaitLatchRef = null;
|
||||
|
||||
// This is copied from doFlush, except it's modified to
|
||||
// clone & incRef the flushed SegmentInfos inside the
|
||||
|
@ -3240,6 +3280,30 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
|
|||
// sneak into the commit point:
|
||||
toCommit = segmentInfos.clone();
|
||||
|
||||
if (anyChanges) {
|
||||
// Find any merges that can execute on commit (per MergePolicy).
|
||||
MergePolicy.MergeSpecification mergeSpec =
|
||||
config.getMergePolicy().findFullFlushMerges(MergeTrigger.COMMIT, segmentInfos, this);
|
||||
if (mergeSpec != null && mergeSpec.merges.size() > 0) {
|
||||
int mergeCount = mergeSpec.merges.size();
|
||||
commitMerges = new ArrayList<>(mergeCount);
|
||||
mergeAwaitLatchRef = new AtomicReference<>(new CountDownLatch(mergeCount));
|
||||
for (MergePolicy.OneMerge oneMerge : mergeSpec.merges) {
|
||||
MergePolicy.OneMerge trackedMerge =
|
||||
updateSegmentInfosOnMergeFinish(oneMerge, toCommit, mergeAwaitLatchRef);
|
||||
if (registerMerge(trackedMerge) == false) {
|
||||
throw new IllegalStateException("MergePolicy " + config.getMergePolicy().getClass() +
|
||||
" returned merging segments from findFullFlushMerges");
|
||||
}
|
||||
commitMerges.add(trackedMerge);
|
||||
}
|
||||
if (infoStream.isEnabled("IW")) {
|
||||
infoStream.message("IW", "Registered " + mergeCount + " commit merges");
|
||||
infoStream.message("IW", "Before executing commit merges, had " + toCommit.size() + " segments");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pendingCommitChangeCount = changeCount.get();
|
||||
|
||||
// This protects the segmentInfos we are now going
|
||||
|
@ -3247,8 +3311,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
|
|||
// we are trying to sync all referenced files, a
|
||||
// merge completes which would otherwise have
|
||||
// removed the files we are now syncing.
|
||||
filesToCommit = toCommit.files(false);
|
||||
deleter.incRef(filesToCommit);
|
||||
deleter.incRef(toCommit.files(false));
|
||||
}
|
||||
success = true;
|
||||
} finally {
|
||||
|
@ -3269,6 +3332,53 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
|
|||
} finally {
|
||||
maybeCloseOnTragicEvent();
|
||||
}
|
||||
|
||||
if (mergeAwaitLatchRef != null) {
|
||||
CountDownLatch mergeAwaitLatch = mergeAwaitLatchRef.get();
|
||||
// If we found and registered any merges above, within the flushLock, then we want to ensure that they
|
||||
// complete execution. Note that since we released the lock, other merges may have been scheduled. We will
|
||||
// block until the merges that we registered complete. As they complete, they will update toCommit to
|
||||
// replace merged segments with the result of each merge.
|
||||
config.getIndexWriterEvents().beginMergeOnCommit();
|
||||
mergeScheduler.merge(this, MergeTrigger.COMMIT, true);
|
||||
long mergeWaitStart = System.nanoTime();
|
||||
int abandonedCount = 0;
|
||||
long waitTimeMillis = (long) (config.getMaxCommitMergeWaitSeconds() * 1000.0);
|
||||
try {
|
||||
if (mergeAwaitLatch.await(waitTimeMillis, TimeUnit.MILLISECONDS) == false) {
|
||||
synchronized (this) {
|
||||
// Need to do this in a synchronized block, to make sure none of our commit merges are currently
|
||||
// executing mergeFinished (since mergeFinished itself is called from within the IndexWriter lock).
|
||||
// After we clear the value from mergeAwaitLatchRef, the merges we schedule will still execute as
|
||||
// usual, but when they finish, they won't attempt to update toCommit or modify segment reference
|
||||
// counts.
|
||||
mergeAwaitLatchRef.set(null);
|
||||
for (MergePolicy.OneMerge commitMerge : commitMerges) {
|
||||
if (runningMerges.contains(commitMerge) || pendingMerges.contains(commitMerge)) {
|
||||
abandonedCount++;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
Thread.interrupted();
|
||||
throw new IOException("Interrupted waiting for merges");
|
||||
} finally {
|
||||
if (infoStream.isEnabled("IW")) {
|
||||
infoStream.message("IW", String.format(Locale.ROOT, "Waited %.1f ms for commit merges",
|
||||
(System.nanoTime() - mergeWaitStart)/1_000_000.0));
|
||||
infoStream.message("IW", "After executing commit merges, had " + toCommit.size() + " segments");
|
||||
if (abandonedCount > 0) {
|
||||
infoStream.message("IW", "Abandoned " + abandonedCount + " commit merges after " + waitTimeMillis + " ms");
|
||||
}
|
||||
}
|
||||
if (abandonedCount > 0) {
|
||||
config.getIndexWriterEvents().abandonedMergesOnCommit(abandonedCount);
|
||||
}
|
||||
config.getIndexWriterEvents().finishMergeOnCommit();
|
||||
}
|
||||
}
|
||||
filesToCommit = toCommit.files(false);
|
||||
|
||||
try {
|
||||
if (anyChanges) {
|
||||
|
|
|
@ -114,6 +114,9 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
|
|||
|
||||
/** Default value for whether calls to {@link IndexWriter#close()} include a commit. */
|
||||
public final static boolean DEFAULT_COMMIT_ON_CLOSE = true;
|
||||
|
||||
/** Default value for time to wait for merges on commit (when using a {@link MergePolicy} that implements findFullFlushMerges). */
|
||||
public static final double DEFAULT_MAX_COMMIT_MERGE_WAIT_SECONDS = 30.0;
|
||||
|
||||
// indicates whether this config instance is already attached to a writer.
|
||||
// not final so that it can be cloned properly.
|
||||
|
@ -486,6 +489,24 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Expert: sets the amount of time to wait for merges returned by MergePolicy.findFullFlushMerges(...).
|
||||
* If this time is reached, we proceed with the commit based on segments merged up to that point.
|
||||
* The merges are not cancelled, and may still run to completion independent of the commit.
|
||||
*/
|
||||
public IndexWriterConfig setMaxCommitMergeWaitSeconds(double maxCommitMergeWaitSeconds) {
|
||||
this.maxCommitMergeWaitSeconds = maxCommitMergeWaitSeconds;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the callback that gets invoked when IndexWriter performs various actions.
|
||||
*/
|
||||
public IndexWriterConfig setIndexWriterEvents(IndexWriterEvents indexWriterEvents) {
|
||||
this.indexWriterEvents = indexWriterEvents;
|
||||
return this;
|
||||
}
|
||||
|
||||
/** We only allow sorting on these types */
|
||||
private static final EnumSet<SortField.Type> ALLOWED_INDEX_SORT_TYPES = EnumSet.of(SortField.Type.STRING,
|
||||
SortField.Type.LONG,
|
||||
|
|
|
@ -0,0 +1,59 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
|
||||
package org.apache.lucene.index;
|
||||
|
||||
/**
 * Listener through which IndexWriter reports some of the actions it takes.
 *
 * @lucene.experimental
 */
public interface IndexWriterEvents {

  /**
   * Signals the start of waiting for a merge on commit, returned from
   * {@link MergePolicy#findFullFlushMerges(MergeTrigger, SegmentInfos, MergePolicy.MergeContext)}.
   */
  void beginMergeOnCommit();

  /**
   * Signals the end of waiting for merges on commit. The wait ends either because the merges
   * completed, or because the limit set in
   * {@link IndexWriterConfig#setMaxCommitMergeWaitSeconds(double)} was reached.
   */
  void finishMergeOnCommit();

  /**
   * Signals that some merges on commit were abandoned after reaching the timeout specified in
   * {@link IndexWriterConfig#setMaxCommitMergeWaitSeconds(double)}.
   *
   * @param abandonedCount the number of abandoned merges
   */
  void abandonedMergesOnCommit(int abandonedCount);

  /**
   * A no-op implementation: every event is ignored.
   */
  IndexWriterEvents NULL_EVENTS = new IndexWriterEvents() {
    @Override
    public void beginMergeOnCommit() {
      // intentionally empty
    }

    @Override
    public void finishMergeOnCommit() {
      // intentionally empty
    }

    @Override
    public void abandonedMergesOnCommit(int abandonedCount) {
      // intentionally empty
    }
  };
}
|
|
@ -117,6 +117,12 @@ public class LiveIndexWriterConfig {
|
|||
/** the attributes for the NRT readers */
|
||||
protected Map<String, String> readerAttributes = Collections.emptyMap();
|
||||
|
||||
/** Amount of time to wait for merges returned by MergePolicy.findFullFlushMerges(...) */
|
||||
protected volatile double maxCommitMergeWaitSeconds;
|
||||
|
||||
/** Callback interface called on index writer actions. */
|
||||
protected IndexWriterEvents indexWriterEvents;
|
||||
|
||||
|
||||
// used by IndexWriterConfig
|
||||
LiveIndexWriterConfig(Analyzer analyzer) {
|
||||
|
@ -141,6 +147,8 @@ public class LiveIndexWriterConfig {
|
|||
readerPooling = IndexWriterConfig.DEFAULT_READER_POOLING;
|
||||
indexerThreadPool = new DocumentsWriterPerThreadPool();
|
||||
perThreadHardLimitMB = IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB;
|
||||
maxCommitMergeWaitSeconds = IndexWriterConfig.DEFAULT_MAX_COMMIT_MERGE_WAIT_SECONDS;
|
||||
indexWriterEvents = IndexWriterEvents.NULL_EVENTS;
|
||||
}
|
||||
|
||||
/** Returns the default analyzer to use for indexing documents. */
|
||||
|
@ -480,6 +488,22 @@ public class LiveIndexWriterConfig {
|
|||
return softDeletesField;
|
||||
}
|
||||
|
||||
/**
|
||||
* Expert: return the amount of time to wait for merges returned by by MergePolicy.findFullFlushMerges(...).
|
||||
* If this time is reached, we proceed with the commit based on segments merged up to that point.
|
||||
* The merges are not cancelled, and may still run to completion independent of the commit.
|
||||
*/
|
||||
public double getMaxCommitMergeWaitSeconds() {
|
||||
return maxCommitMergeWaitSeconds;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a callback used to signal actions taken by the {@link IndexWriter}.
|
||||
*/
|
||||
public IndexWriterEvents getIndexWriterEvents() {
|
||||
return indexWriterEvents;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
|
@ -505,6 +529,8 @@ public class LiveIndexWriterConfig {
|
|||
sb.append("checkPendingFlushOnUpdate=").append(isCheckPendingFlushOnUpdate()).append("\n");
|
||||
sb.append("softDeletesField=").append(getSoftDeletesField()).append("\n");
|
||||
sb.append("readerAttributes=").append(getReaderAttributes()).append("\n");
|
||||
sb.append("maxCommitMergeWaitSeconds=").append(getMaxCommitMergeWaitSeconds()).append("\n");
|
||||
sb.append("indexWriterEvents=").append(getIndexWriterEvents().getClass().getName()).append("\n");
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
|
|
|
@ -510,7 +510,7 @@ public abstract class MergePolicy {
|
|||
* an original segment present in the
|
||||
* to-be-merged index; else, it was a segment
|
||||
* produced by a cascaded merge.
|
||||
* @param mergeContext the IndexWriter to find the merges on
|
||||
* @param mergeContext the MergeContext to find the merges on
|
||||
*/
|
||||
public abstract MergeSpecification findForcedMerges(
|
||||
SegmentInfos segmentInfos, int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, MergeContext mergeContext)
|
||||
|
@ -521,11 +521,33 @@ public abstract class MergePolicy {
|
|||
* deletes from the index.
|
||||
* @param segmentInfos
|
||||
* the total set of segments in the index
|
||||
* @param mergeContext the IndexWriter to find the merges on
|
||||
* @param mergeContext the MergeContext to find the merges on
|
||||
*/
|
||||
public abstract MergeSpecification findForcedDeletesMerges(
|
||||
SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException;
|
||||
|
||||
/**
|
||||
* Identifies merges that we want to execute (synchronously) on commit. By default, do not synchronously merge on commit.
|
||||
*
|
||||
* Any merges returned here will make {@link IndexWriter#commit()} or {@link IndexWriter#prepareCommit()} block until
|
||||
* the merges complete or until {@link IndexWriterConfig#getMaxCommitMergeWaitSeconds()} have elapsed. This may be
|
||||
* used to merge small segments that have just been flushed as part of the commit, reducing the number of segments in
|
||||
* the commit. If a merge does not complete in the allotted time, it will continue to execute, but will not be reflected
|
||||
* in the commit.
|
||||
*
|
||||
* If a {@link OneMerge} in the returned {@link MergeSpecification} includes a segment already included in a registered
|
||||
* merge, then {@link IndexWriter#commit()} or {@link IndexWriter#prepareCommit()} will throw a {@link IllegalStateException}.
|
||||
* Use {@link MergeContext#getMergingSegments()} to determine which segments are currently registered to merge.
|
||||
*
|
||||
* @param mergeTrigger the event that triggered the merge (COMMIT or FULL_FLUSH).
|
||||
* @param segmentInfos the total set of segments in the index (while preparing the commit)
|
||||
* @param mergeContext the MergeContext to find the merges on, which should be used to determine which segments are
|
||||
* already in a registered merge (see {@link MergeContext#getMergingSegments()}).
|
||||
*/
|
||||
public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if a new segment (regardless of its origin) should use the
|
||||
* compound file format. The default implementation returns <code>true</code>
|
||||
|
|
|
@ -47,5 +47,10 @@ public enum MergeTrigger {
|
|||
/**
|
||||
* Merge was triggered by a closing IndexWriter.
|
||||
*/
|
||||
CLOSING
|
||||
CLOSING,
|
||||
|
||||
/**
|
||||
* Merge was triggered on commit.
|
||||
*/
|
||||
COMMIT,
|
||||
}
|
||||
|
|
|
@ -45,6 +45,9 @@ public final class NoMergePolicy extends MergePolicy {
|
|||
@Override
|
||||
public MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos, MergeContext mergeContext) { return null; }
|
||||
|
||||
@Override
|
||||
public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) { return null; }
|
||||
|
||||
@Override
|
||||
public boolean useCompoundFile(SegmentInfos segments, SegmentCommitInfo newSegment, MergeContext mergeContext) {
|
||||
return newSegment.info.getUseCompoundFile();
|
||||
|
|
|
@ -59,6 +59,11 @@ public class OneMergeWrappingMergePolicy extends FilterMergePolicy {
|
|||
return wrapSpec(in.findForcedDeletesMerges(segmentInfos, mergeContext));
|
||||
}
|
||||
|
||||
@Override
|
||||
public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
|
||||
return wrapSpec(in.findFullFlushMerges(mergeTrigger, segmentInfos, mergeContext));
|
||||
}
|
||||
|
||||
private MergeSpecification wrapSpec(MergeSpecification spec) {
|
||||
MergeSpecification wrapped = spec == null ? null : new MergeSpecification();
|
||||
if (wrapped != null) {
|
||||
|
|
|
@ -18,13 +18,21 @@ package org.apache.lucene.index;
|
|||
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.lucene.analysis.MockAnalyzer;
|
||||
import org.apache.lucene.document.Document;
|
||||
import org.apache.lucene.document.Field;
|
||||
import org.apache.lucene.index.IndexWriterConfig.OpenMode;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.search.MatchAllDocsQuery;
|
||||
import org.apache.lucene.store.Directory;
|
||||
|
||||
import org.apache.lucene.util.LineFileDocs;
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
|
||||
public class TestIndexWriterMergePolicy extends LuceneTestCase {
|
||||
|
@ -278,6 +286,130 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
|
|||
assertSetters(new LogDocMergePolicy());
|
||||
}
|
||||
|
||||
public void testMergeOnCommit() throws IOException, InterruptedException {
|
||||
Directory dir = newDirectory();
|
||||
|
||||
IndexWriter firstWriter = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random()))
|
||||
.setMergePolicy(NoMergePolicy.INSTANCE));
|
||||
for (int i = 0; i < 5; i++) {
|
||||
TestIndexWriter.addDoc(firstWriter);
|
||||
firstWriter.flush();
|
||||
}
|
||||
DirectoryReader firstReader = DirectoryReader.open(firstWriter);
|
||||
assertEquals(5, firstReader.leaves().size());
|
||||
firstReader.close();
|
||||
firstWriter.close();
|
||||
|
||||
MergePolicy mergeOnCommitPolicy = new LogDocMergePolicy() {
|
||||
@Override
|
||||
public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) {
|
||||
// Optimize down to a single segment on commit
|
||||
if (mergeTrigger == MergeTrigger.COMMIT && segmentInfos.size() > 1) {
|
||||
List<SegmentCommitInfo> nonMergingSegments = new ArrayList<>();
|
||||
for (SegmentCommitInfo sci : segmentInfos) {
|
||||
if (mergeContext.getMergingSegments().contains(sci) == false) {
|
||||
nonMergingSegments.add(sci);
|
||||
}
|
||||
}
|
||||
if (nonMergingSegments.size() > 1) {
|
||||
MergeSpecification mergeSpecification = new MergeSpecification();
|
||||
mergeSpecification.add(new OneMerge(nonMergingSegments));
|
||||
return mergeSpecification;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
AtomicInteger abandonedMerges = new AtomicInteger(0);
|
||||
IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random()))
|
||||
.setMergePolicy(mergeOnCommitPolicy)
|
||||
.setIndexWriterEvents(new IndexWriterEvents() {
|
||||
@Override
|
||||
public void beginMergeOnCommit() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void finishMergeOnCommit() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void abandonedMergesOnCommit(int abandonedCount) {
|
||||
abandonedMerges.incrementAndGet();
|
||||
}
|
||||
});
|
||||
IndexWriter writerWithMergePolicy = new IndexWriter(dir, iwc);
|
||||
|
||||
writerWithMergePolicy.commit();
|
||||
|
||||
DirectoryReader unmergedReader = DirectoryReader.open(writerWithMergePolicy);
|
||||
assertEquals(5, unmergedReader.leaves().size()); // Don't merge unless there's a change
|
||||
unmergedReader.close();
|
||||
|
||||
TestIndexWriter.addDoc(writerWithMergePolicy);
|
||||
writerWithMergePolicy.commit();
|
||||
|
||||
DirectoryReader mergedReader = DirectoryReader.open(writerWithMergePolicy);
|
||||
assertEquals(1, mergedReader.leaves().size()); // Now we merge on commit
|
||||
mergedReader.close();
|
||||
|
||||
LineFileDocs lineFileDocs = new LineFileDocs(random());
|
||||
int docCount = atLeast(1000);
|
||||
AtomicInteger indexedDocs = new AtomicInteger(0);
|
||||
int numIndexingThreads = atLeast(2);
|
||||
CountDownLatch startingGun = new CountDownLatch(1);
|
||||
Collection<Thread> indexingThreads = new ArrayList<>();
|
||||
for (int i = 0; i < numIndexingThreads; i++) {
|
||||
Thread t = new Thread(() -> {
|
||||
try {
|
||||
while (indexedDocs.getAndIncrement() < docCount) {
|
||||
writerWithMergePolicy.addDocument(lineFileDocs.nextDoc());
|
||||
if (rarely()) {
|
||||
writerWithMergePolicy.commit();
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
fail();
|
||||
}
|
||||
});
|
||||
t.start();
|
||||
indexingThreads.add(t);
|
||||
}
|
||||
startingGun.countDown();
|
||||
for (Thread t : indexingThreads) {
|
||||
t.join();
|
||||
}
|
||||
for (int i = 0; i < 50; i++) {
|
||||
// Wait for pending merges to finish
|
||||
synchronized (writerWithMergePolicy) {
|
||||
if (writerWithMergePolicy.getMergingSegments().isEmpty()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Thread.sleep(100);
|
||||
}
|
||||
abandonedMerges.set(0);
|
||||
writerWithMergePolicy.commit();
|
||||
if (abandonedMerges.get() == 0) {
|
||||
assertEquals(1, writerWithMergePolicy.listOfSegmentCommitInfos().size());
|
||||
} else {
|
||||
assertNotEquals(1, writerWithMergePolicy.listOfSegmentCommitInfos().size());
|
||||
}
|
||||
|
||||
try (IndexReader reader = writerWithMergePolicy.getReader()) {
|
||||
IndexSearcher searcher = new IndexSearcher(reader);
|
||||
assertEquals(docCount + 6, reader.numDocs());
|
||||
assertEquals(docCount + 6, searcher.count(new MatchAllDocsQuery()));
|
||||
}
|
||||
|
||||
writerWithMergePolicy.close();
|
||||
|
||||
dir.close();
|
||||
}
|
||||
|
||||
private void assertSetters(MergePolicy lmp) {
|
||||
lmp.setMaxCFSSegmentSizeMB(2.0);
|
||||
assertEquals(2.0, lmp.getMaxCFSSegmentSizeMB(), EPSILON);
|
||||
|
|
|
@ -128,6 +128,38 @@ public class MockRandomMergePolicy extends MergePolicy {
|
|||
return findMerges(null, segmentInfos, mergeContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
|
||||
MergeSpecification mergeSpecification = findMerges(null, segmentInfos, mergeContext);
|
||||
if (mergeSpecification == null) {
|
||||
return null;
|
||||
}
|
||||
// Do not return any merges involving already-merging segments.
|
||||
MergeSpecification filteredMergeSpecification = new MergeSpecification();
|
||||
for (OneMerge oneMerge : mergeSpecification.merges) {
|
||||
boolean filtered = false;
|
||||
List<SegmentCommitInfo> nonMergingSegments = new ArrayList<>();
|
||||
for (SegmentCommitInfo sci : oneMerge.segments) {
|
||||
if (mergeContext.getMergingSegments().contains(sci) == false) {
|
||||
nonMergingSegments.add(sci);
|
||||
} else {
|
||||
filtered = true;
|
||||
}
|
||||
}
|
||||
if (filtered == true) {
|
||||
if (nonMergingSegments.size() > 0) {
|
||||
filteredMergeSpecification.add(new OneMerge(nonMergingSegments));
|
||||
}
|
||||
} else {
|
||||
filteredMergeSpecification.add(oneMerge);
|
||||
}
|
||||
}
|
||||
if (filteredMergeSpecification.merges.size() > 0) {
|
||||
return filteredMergeSpecification;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean useCompoundFile(SegmentInfos infos, SegmentCommitInfo mergedInfo, MergeContext mergeContext) throws IOException {
|
||||
// 80% of the time we create CFS:
|
||||
|
|
|
@ -1006,6 +1006,7 @@ public abstract class LuceneTestCase extends Assert {
|
|||
if (rarely(r)) {
|
||||
c.setCheckPendingFlushUpdate(false);
|
||||
}
|
||||
c.setMaxCommitMergeWaitSeconds(atLeast(r, 1));
|
||||
return c;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue