mirror of https://github.com/apache/lucene.git
Revert "LUCENE-8962: Split test case (#1313)"
This reverts commit90aced5a51
. Revert "LUCENE-8962: woops, remove leftover accidental copyright (darned IDEs)" This reverts commit3dbfd10279
. Revert "LUCENE-8962: Fix intermittent test failures" This reverts commita5475de57f
. Revert "LUCENE-8962: Add ability to selectively merge on commit (#1155)" This reverts commita1791e7714
.
This commit is contained in:
parent
320578274b
commit
4501b3d3fd
|
@ -57,11 +57,6 @@ public class FilterMergePolicy extends MergePolicy {
|
|||
return in.findForcedDeletesMerges(segmentInfos, mergeContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
|
||||
return in.findFullFlushMerges(mergeTrigger, segmentInfos, mergeContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean useCompoundFile(SegmentInfos infos, SegmentCommitInfo mergedInfo, MergeContext mergeContext)
|
||||
throws IOException {
|
||||
|
|
|
@ -32,8 +32,6 @@ import java.util.Map;
|
|||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
@ -3149,42 +3147,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
|
|||
}
|
||||
}
|
||||
|
||||
private MergePolicy.OneMerge updateSegmentInfosOnMergeFinish(MergePolicy.OneMerge merge, final SegmentInfos toCommit,
|
||||
AtomicReference<CountDownLatch> mergeLatchRef) {
|
||||
return new MergePolicy.OneMerge(merge.segments) {
|
||||
public void mergeFinished() throws IOException {
|
||||
super.mergeFinished();
|
||||
CountDownLatch mergeAwaitLatch = mergeLatchRef.get();
|
||||
if (mergeAwaitLatch == null) {
|
||||
// Commit thread timed out waiting for this merge and moved on. No need to manipulate toCommit.
|
||||
return;
|
||||
}
|
||||
if (committed) {
|
||||
deleter.incRef(this.info.files());
|
||||
// Resolve "live" SegmentInfos segments to their toCommit cloned equivalents, based on segment name.
|
||||
Set<String> mergedSegmentNames = new HashSet<>();
|
||||
for (SegmentCommitInfo sci : this.segments) {
|
||||
deleter.decRef(sci.files());
|
||||
mergedSegmentNames.add(sci.info.name);
|
||||
}
|
||||
List<SegmentCommitInfo> toCommitMergedAwaySegments = new ArrayList<>();
|
||||
for (SegmentCommitInfo sci : toCommit) {
|
||||
if (mergedSegmentNames.contains(sci.info.name)) {
|
||||
toCommitMergedAwaySegments.add(sci);
|
||||
}
|
||||
}
|
||||
// Construct a OneMerge that applies to toCommit
|
||||
MergePolicy.OneMerge applicableMerge = new MergePolicy.OneMerge(toCommitMergedAwaySegments);
|
||||
applicableMerge.info = this.info.clone();
|
||||
long segmentCounter = Long.parseLong(this.info.info.name.substring(1), Character.MAX_RADIX);
|
||||
toCommit.counter = Math.max(toCommit.counter, segmentCounter + 1);
|
||||
toCommit.applyMergeChanges(applicableMerge, false);
|
||||
}
|
||||
mergeAwaitLatch.countDown();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private long prepareCommitInternal() throws IOException {
|
||||
startCommitTime = System.nanoTime();
|
||||
synchronized(commitLock) {
|
||||
|
@ -3207,8 +3169,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
|
|||
SegmentInfos toCommit = null;
|
||||
boolean anyChanges = false;
|
||||
long seqNo;
|
||||
List<MergePolicy.OneMerge> commitMerges = null;
|
||||
AtomicReference<CountDownLatch> mergeAwaitLatchRef = null;
|
||||
|
||||
// This is copied from doFlush, except it's modified to
|
||||
// clone & incRef the flushed SegmentInfos inside the
|
||||
|
@ -3263,30 +3223,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
|
|||
// sneak into the commit point:
|
||||
toCommit = segmentInfos.clone();
|
||||
|
||||
if (anyChanges) {
|
||||
// Find any merges that can execute on commit (per MergePolicy).
|
||||
MergePolicy.MergeSpecification mergeSpec =
|
||||
config.getMergePolicy().findFullFlushMerges(MergeTrigger.COMMIT, segmentInfos, this);
|
||||
if (mergeSpec != null && mergeSpec.merges.size() > 0) {
|
||||
int mergeCount = mergeSpec.merges.size();
|
||||
commitMerges = new ArrayList<>(mergeCount);
|
||||
mergeAwaitLatchRef = new AtomicReference<>(new CountDownLatch(mergeCount));
|
||||
for (MergePolicy.OneMerge oneMerge : mergeSpec.merges) {
|
||||
MergePolicy.OneMerge trackedMerge =
|
||||
updateSegmentInfosOnMergeFinish(oneMerge, toCommit, mergeAwaitLatchRef);
|
||||
if (registerMerge(trackedMerge) == false) {
|
||||
throw new IllegalStateException("MergePolicy " + config.getMergePolicy().getClass() +
|
||||
" returned merging segments from findFullFlushMerges");
|
||||
}
|
||||
commitMerges.add(trackedMerge);
|
||||
}
|
||||
if (infoStream.isEnabled("IW")) {
|
||||
infoStream.message("IW", "Registered " + mergeCount + " commit merges");
|
||||
infoStream.message("IW", "Before executing commit merges, had " + toCommit.size() + " segments");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pendingCommitChangeCount = changeCount.get();
|
||||
|
||||
// This protects the segmentInfos we are now going
|
||||
|
@ -3294,7 +3230,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
|
|||
// we are trying to sync all referenced files, a
|
||||
// merge completes which would otherwise have
|
||||
// removed the files we are now syncing.
|
||||
deleter.incRef(toCommit.files(false));
|
||||
filesToCommit = toCommit.files(false);
|
||||
deleter.incRef(filesToCommit);
|
||||
}
|
||||
success = true;
|
||||
} finally {
|
||||
|
@ -3316,52 +3253,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
|
|||
maybeCloseOnTragicEvent();
|
||||
}
|
||||
|
||||
if (mergeAwaitLatchRef != null) {
|
||||
CountDownLatch mergeAwaitLatch = mergeAwaitLatchRef.get();
|
||||
// If we found and registered any merges above, within the flushLock, then we want to ensure that they
|
||||
// complete execution. Note that since we released the lock, other merges may have been scheduled. We will
|
||||
// block until the merges that we registered complete. As they complete, they will update toCommit to
|
||||
// replace merged segments with the result of each merge.
|
||||
config.getIndexWriterEvents().beginMergeOnCommit();
|
||||
mergeScheduler.merge(this, MergeTrigger.COMMIT, true);
|
||||
long mergeWaitStart = System.nanoTime();
|
||||
int abandonedCount = 0;
|
||||
long waitTimeMillis = (long) (config.getMaxCommitMergeWaitSeconds() * 1000.0);
|
||||
try {
|
||||
if (mergeAwaitLatch.await(waitTimeMillis, TimeUnit.MILLISECONDS) == false) {
|
||||
synchronized (this) {
|
||||
// Need to do this in a synchronized block, to make sure none of our commit merges are currently
|
||||
// executing mergeFinished (since mergeFinished itself is called from within the IndexWriter lock).
|
||||
// After we clear the value from mergeAwaitLatchRef, the merges we schedule will still execute as
|
||||
// usual, but when they finish, they won't attempt to update toCommit or modify segment reference
|
||||
// counts.
|
||||
mergeAwaitLatchRef.set(null);
|
||||
for (MergePolicy.OneMerge commitMerge : commitMerges) {
|
||||
if (runningMerges.contains(commitMerge) || pendingMerges.contains(commitMerge)) {
|
||||
abandonedCount++;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException ie) {
|
||||
throw new ThreadInterruptedException(ie);
|
||||
} finally {
|
||||
if (infoStream.isEnabled("IW")) {
|
||||
infoStream.message("IW", String.format(Locale.ROOT, "Waited %.1f ms for commit merges",
|
||||
(System.nanoTime() - mergeWaitStart)/1_000_000.0));
|
||||
infoStream.message("IW", "After executing commit merges, had " + toCommit.size() + " segments");
|
||||
if (abandonedCount > 0) {
|
||||
infoStream.message("IW", "Abandoned " + abandonedCount + " commit merges after " + waitTimeMillis + " ms");
|
||||
}
|
||||
}
|
||||
if (abandonedCount > 0) {
|
||||
config.getIndexWriterEvents().abandonedMergesOnCommit(abandonedCount);
|
||||
}
|
||||
config.getIndexWriterEvents().finishMergeOnCommit();
|
||||
}
|
||||
}
|
||||
filesToCommit = toCommit.files(false);
|
||||
|
||||
try {
|
||||
if (anyChanges) {
|
||||
maybeMerge.set(true);
|
||||
|
@ -4076,7 +3967,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
|
|||
}
|
||||
|
||||
try (Closeable finalizer = this::checkpoint) {
|
||||
merge.committed = true;
|
||||
// Must close before checkpoint, otherwise IFD won't be
|
||||
// able to delete the held-open files from the merge
|
||||
// readers:
|
||||
|
|
|
@ -111,9 +111,6 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
|
|||
/** Default value for whether calls to {@link IndexWriter#close()} include a commit. */
|
||||
public final static boolean DEFAULT_COMMIT_ON_CLOSE = true;
|
||||
|
||||
/** Default value for time to wait for merges on commit (when using a {@link MergePolicy} that implements findFullFlushMerges). */
|
||||
public static final double DEFAULT_MAX_COMMIT_MERGE_WAIT_SECONDS = 30.0;
|
||||
|
||||
// indicates whether this config instance is already attached to a writer.
|
||||
// not final so that it can be cloned properly.
|
||||
private SetOnce<IndexWriter> writer = new SetOnce<>();
|
||||
|
@ -485,24 +482,6 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Expert: sets the amount of time to wait for merges returned by MergePolicy.findFullFlushMerges(...).
|
||||
* If this time is reached, we proceed with the commit based on segments merged up to that point.
|
||||
* The merges are not cancelled, and may still run to completion independent of the commit.
|
||||
*/
|
||||
public IndexWriterConfig setMaxCommitMergeWaitSeconds(double maxCommitMergeWaitSeconds) {
|
||||
this.maxCommitMergeWaitSeconds = maxCommitMergeWaitSeconds;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the callback that gets invoked when IndexWriter performs various actions.
|
||||
*/
|
||||
public IndexWriterConfig setIndexWriterEvents(IndexWriterEvents indexWriterEvents) {
|
||||
this.indexWriterEvents = indexWriterEvents;
|
||||
return this;
|
||||
}
|
||||
|
||||
/** We only allow sorting on these types */
|
||||
private static final EnumSet<SortField.Type> ALLOWED_INDEX_SORT_TYPES = EnumSet.of(SortField.Type.STRING,
|
||||
SortField.Type.LONG,
|
||||
|
|
|
@ -1,57 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.lucene.index;
|
||||
|
||||
/**
|
||||
* Callback interface to signal various actions taken by IndexWriter.
|
||||
*
|
||||
* @lucene.experimental
|
||||
*/
|
||||
public interface IndexWriterEvents {
|
||||
/**
|
||||
* A default implementation that ignores all events.
|
||||
*/
|
||||
IndexWriterEvents NULL_EVENTS = new IndexWriterEvents() {
|
||||
@Override
|
||||
public void beginMergeOnCommit() { }
|
||||
|
||||
@Override
|
||||
public void finishMergeOnCommit() { }
|
||||
|
||||
@Override
|
||||
public void abandonedMergesOnCommit(int abandonedCount) { }
|
||||
};
|
||||
|
||||
/**
|
||||
* Signals the start of waiting for a merge on commit, returned from
|
||||
* {@link MergePolicy#findFullFlushMerges(MergeTrigger, SegmentInfos, MergePolicy.MergeContext)}.
|
||||
*/
|
||||
void beginMergeOnCommit();
|
||||
|
||||
/**
|
||||
* Signals the end of waiting for merges on commit. This may be either because the merges completed, or because we timed out according
|
||||
* to the limit set in {@link IndexWriterConfig#setMaxCommitMergeWaitSeconds(double)}.
|
||||
*/
|
||||
void finishMergeOnCommit();
|
||||
|
||||
/**
|
||||
* Called to signal that we abandoned some merges on commit upon reaching the timeout specified in
|
||||
* {@link IndexWriterConfig#setMaxCommitMergeWaitSeconds(double)}.
|
||||
*/
|
||||
void abandonedMergesOnCommit(int abandonedCount);
|
||||
}
|
|
@ -113,12 +113,6 @@ public class LiveIndexWriterConfig {
|
|||
/** soft deletes field */
|
||||
protected String softDeletesField = null;
|
||||
|
||||
/** Amount of time to wait for merges returned by MergePolicy.findFullFlushMerges(...) */
|
||||
protected volatile double maxCommitMergeWaitSeconds;
|
||||
|
||||
/** Callback interface called on index writer actions. */
|
||||
protected IndexWriterEvents indexWriterEvents;
|
||||
|
||||
|
||||
// used by IndexWriterConfig
|
||||
LiveIndexWriterConfig(Analyzer analyzer) {
|
||||
|
@ -143,8 +137,6 @@ public class LiveIndexWriterConfig {
|
|||
readerPooling = IndexWriterConfig.DEFAULT_READER_POOLING;
|
||||
indexerThreadPool = new DocumentsWriterPerThreadPool();
|
||||
perThreadHardLimitMB = IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB;
|
||||
maxCommitMergeWaitSeconds = IndexWriterConfig.DEFAULT_MAX_COMMIT_MERGE_WAIT_SECONDS;
|
||||
indexWriterEvents = IndexWriterEvents.NULL_EVENTS;
|
||||
}
|
||||
|
||||
/** Returns the default analyzer to use for indexing documents. */
|
||||
|
@ -484,22 +476,6 @@ public class LiveIndexWriterConfig {
|
|||
return softDeletesField;
|
||||
}
|
||||
|
||||
/**
|
||||
* Expert: return the amount of time to wait for merges returned by by MergePolicy.findFullFlushMerges(...).
|
||||
* If this time is reached, we proceed with the commit based on segments merged up to that point.
|
||||
* The merges are not cancelled, and may still run to completion independent of the commit.
|
||||
*/
|
||||
public double getMaxCommitMergeWaitSeconds() {
|
||||
return maxCommitMergeWaitSeconds;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a callback used to signal actions taken by the {@link IndexWriter}.
|
||||
*/
|
||||
public IndexWriterEvents getIndexWriterEvents() {
|
||||
return indexWriterEvents;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
|
@ -524,8 +500,6 @@ public class LiveIndexWriterConfig {
|
|||
sb.append("indexSort=").append(getIndexSort()).append("\n");
|
||||
sb.append("checkPendingFlushOnUpdate=").append(isCheckPendingFlushOnUpdate()).append("\n");
|
||||
sb.append("softDeletesField=").append(getSoftDeletesField()).append("\n");
|
||||
sb.append("maxCommitMergeWaitSeconds=").append(getMaxCommitMergeWaitSeconds()).append("\n");
|
||||
sb.append("indexWriterEvents=").append(getIndexWriterEvents().getClass().getName()).append("\n");
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -225,8 +225,6 @@ public abstract class MergePolicy {
|
|||
public final int totalMaxDoc;
|
||||
Throwable error;
|
||||
|
||||
boolean committed; // Set by IndexWriter once the merge has been committed to disk
|
||||
|
||||
/** Sole constructor.
|
||||
* @param segments List of {@link SegmentCommitInfo}s
|
||||
* to be merged. */
|
||||
|
@ -512,7 +510,7 @@ public abstract class MergePolicy {
|
|||
* an original segment present in the
|
||||
* to-be-merged index; else, it was a segment
|
||||
* produced by a cascaded merge.
|
||||
* @param mergeContext the MergeContext to find the merges on
|
||||
* @param mergeContext the IndexWriter to find the merges on
|
||||
*/
|
||||
public abstract MergeSpecification findForcedMerges(
|
||||
SegmentInfos segmentInfos, int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, MergeContext mergeContext)
|
||||
|
@ -523,33 +521,11 @@ public abstract class MergePolicy {
|
|||
* deletes from the index.
|
||||
* @param segmentInfos
|
||||
* the total set of segments in the index
|
||||
* @param mergeContext the MergeContext to find the merges on
|
||||
* @param mergeContext the IndexWriter to find the merges on
|
||||
*/
|
||||
public abstract MergeSpecification findForcedDeletesMerges(
|
||||
SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException;
|
||||
|
||||
/**
|
||||
* Identifies merges that we want to execute (synchronously) on commit. By default, do not synchronously merge on commit.
|
||||
*
|
||||
* Any merges returned here will make {@link IndexWriter#commit()} or {@link IndexWriter#prepareCommit()} block until
|
||||
* the merges complete or until {@link IndexWriterConfig#getMaxCommitMergeWaitSeconds()} have elapsed. This may be
|
||||
* used to merge small segments that have just been flushed as part of the commit, reducing the number of segments in
|
||||
* the commit. If a merge does not complete in the allotted time, it will continue to execute, but will not be reflected
|
||||
* in the commit.
|
||||
*
|
||||
* If a {@link OneMerge} in the returned {@link MergeSpecification} includes a segment already included in a registered
|
||||
* merge, then {@link IndexWriter#commit()} or {@link IndexWriter#prepareCommit()} will throw a {@link IllegalStateException}.
|
||||
* Use {@link MergeContext#getMergingSegments()} to determine which segments are currently registered to merge.
|
||||
*
|
||||
* @param mergeTrigger the event that triggered the merge (COMMIT or FULL_FLUSH).
|
||||
* @param segmentInfos the total set of segments in the index (while preparing the commit)
|
||||
* @param mergeContext the MergeContext to find the merges on, which should be used to determine which segments are
|
||||
* already in a registered merge (see {@link MergeContext#getMergingSegments()}).
|
||||
*/
|
||||
public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if a new segment (regardless of its origin) should use the
|
||||
* compound file format. The default implementation returns <code>true</code>
|
||||
|
|
|
@ -47,10 +47,5 @@ public enum MergeTrigger {
|
|||
/**
|
||||
* Merge was triggered by a closing IndexWriter.
|
||||
*/
|
||||
CLOSING,
|
||||
|
||||
/**
|
||||
* Merge was triggered on commit.
|
||||
*/
|
||||
COMMIT,
|
||||
CLOSING
|
||||
}
|
||||
|
|
|
@ -45,9 +45,6 @@ public final class NoMergePolicy extends MergePolicy {
|
|||
@Override
|
||||
public MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos, MergeContext mergeContext) { return null; }
|
||||
|
||||
@Override
|
||||
public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) { return null; }
|
||||
|
||||
@Override
|
||||
public boolean useCompoundFile(SegmentInfos segments, SegmentCommitInfo newSegment, MergeContext mergeContext) {
|
||||
return newSegment.info.getUseCompoundFile();
|
||||
|
|
|
@ -59,11 +59,6 @@ public class OneMergeWrappingMergePolicy extends FilterMergePolicy {
|
|||
return wrapSpec(in.findForcedDeletesMerges(segmentInfos, mergeContext));
|
||||
}
|
||||
|
||||
@Override
|
||||
public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
|
||||
return wrapSpec(in.findFullFlushMerges(mergeTrigger, segmentInfos, mergeContext));
|
||||
}
|
||||
|
||||
private MergeSpecification wrapSpec(MergeSpecification spec) {
|
||||
MergeSpecification wrapped = spec == null ? null : new MergeSpecification();
|
||||
if (wrapped != null) {
|
||||
|
|
|
@ -18,42 +18,17 @@ package org.apache.lucene.index;
|
|||
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.lucene.analysis.MockAnalyzer;
|
||||
import org.apache.lucene.document.Document;
|
||||
import org.apache.lucene.document.Field;
|
||||
import org.apache.lucene.index.IndexWriterConfig.OpenMode;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.search.MatchAllDocsQuery;
|
||||
import org.apache.lucene.store.Directory;
|
||||
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
|
||||
public class TestIndexWriterMergePolicy extends LuceneTestCase {
|
||||
|
||||
private static final MergePolicy MERGE_ON_COMMIT_POLICY = new LogDocMergePolicy() {
|
||||
@Override
|
||||
public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) {
|
||||
// Optimize down to a single segment on commit
|
||||
if (mergeTrigger == MergeTrigger.COMMIT && segmentInfos.size() > 1) {
|
||||
List<SegmentCommitInfo> nonMergingSegments = new ArrayList<>();
|
||||
for (SegmentCommitInfo sci : segmentInfos) {
|
||||
if (mergeContext.getMergingSegments().contains(sci) == false) {
|
||||
nonMergingSegments.add(sci);
|
||||
}
|
||||
}
|
||||
if (nonMergingSegments.size() > 1) {
|
||||
MergeSpecification mergeSpecification = new MergeSpecification();
|
||||
mergeSpecification.add(new OneMerge(nonMergingSegments));
|
||||
return mergeSpecification;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
// Test the normal case
|
||||
public void testNormalCase() throws IOException {
|
||||
Directory dir = newDirectory();
|
||||
|
@ -303,49 +278,6 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
|
|||
assertSetters(new LogDocMergePolicy());
|
||||
}
|
||||
|
||||
// Test basic semantics of merge on commit
|
||||
public void testMergeOnCommit() throws IOException {
|
||||
Directory dir = newDirectory();
|
||||
|
||||
IndexWriter firstWriter = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random()))
|
||||
.setMergePolicy(NoMergePolicy.INSTANCE));
|
||||
for (int i = 0; i < 5; i++) {
|
||||
TestIndexWriter.addDoc(firstWriter);
|
||||
firstWriter.flush();
|
||||
}
|
||||
DirectoryReader firstReader = DirectoryReader.open(firstWriter);
|
||||
assertEquals(5, firstReader.leaves().size());
|
||||
firstReader.close();
|
||||
firstWriter.close(); // When this writer closes, it does not merge on commit.
|
||||
|
||||
IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random()))
|
||||
.setMergePolicy(MERGE_ON_COMMIT_POLICY);
|
||||
|
||||
IndexWriter writerWithMergePolicy = new IndexWriter(dir, iwc);
|
||||
writerWithMergePolicy.commit(); // No changes. Commit doesn't trigger a merge.
|
||||
|
||||
DirectoryReader unmergedReader = DirectoryReader.open(writerWithMergePolicy);
|
||||
assertEquals(5, unmergedReader.leaves().size());
|
||||
unmergedReader.close();
|
||||
|
||||
TestIndexWriter.addDoc(writerWithMergePolicy);
|
||||
writerWithMergePolicy.commit(); // Doc added, do merge on commit.
|
||||
assertEquals(1, writerWithMergePolicy.getSegmentCount()); //
|
||||
|
||||
DirectoryReader mergedReader = DirectoryReader.open(writerWithMergePolicy);
|
||||
assertEquals(1, mergedReader.leaves().size());
|
||||
mergedReader.close();
|
||||
|
||||
try (IndexReader reader = writerWithMergePolicy.getReader()) {
|
||||
IndexSearcher searcher = new IndexSearcher(reader);
|
||||
assertEquals(6, reader.numDocs());
|
||||
assertEquals(6, searcher.count(new MatchAllDocsQuery()));
|
||||
}
|
||||
|
||||
writerWithMergePolicy.close();
|
||||
dir.close();
|
||||
}
|
||||
|
||||
private void assertSetters(MergePolicy lmp) {
|
||||
lmp.setMaxCFSSegmentSizeMB(2.0);
|
||||
assertEquals(2.0, lmp.getMaxCFSSegmentSizeMB(), EPSILON);
|
||||
|
|
|
@ -128,38 +128,6 @@ public class MockRandomMergePolicy extends MergePolicy {
|
|||
return findMerges(null, segmentInfos, mergeContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
|
||||
MergeSpecification mergeSpecification = findMerges(null, segmentInfos, mergeContext);
|
||||
if (mergeSpecification == null) {
|
||||
return null;
|
||||
}
|
||||
// Do not return any merges involving already-merging segments.
|
||||
MergeSpecification filteredMergeSpecification = new MergeSpecification();
|
||||
for (OneMerge oneMerge : mergeSpecification.merges) {
|
||||
boolean filtered = false;
|
||||
List<SegmentCommitInfo> nonMergingSegments = new ArrayList<>();
|
||||
for (SegmentCommitInfo sci : oneMerge.segments) {
|
||||
if (mergeContext.getMergingSegments().contains(sci) == false) {
|
||||
nonMergingSegments.add(sci);
|
||||
} else {
|
||||
filtered = true;
|
||||
}
|
||||
}
|
||||
if (filtered == true) {
|
||||
if (nonMergingSegments.size() > 0) {
|
||||
filteredMergeSpecification.add(new OneMerge(nonMergingSegments));
|
||||
}
|
||||
} else {
|
||||
filteredMergeSpecification.add(oneMerge);
|
||||
}
|
||||
}
|
||||
if (filteredMergeSpecification.merges.size() > 0) {
|
||||
return filteredMergeSpecification;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean useCompoundFile(SegmentInfos infos, SegmentCommitInfo mergedInfo, MergeContext mergeContext) throws IOException {
|
||||
// 80% of the time we create CFS:
|
||||
|
|
|
@ -1003,7 +1003,6 @@ public abstract class LuceneTestCase extends Assert {
|
|||
if (rarely(r)) {
|
||||
c.setCheckPendingFlushUpdate(false);
|
||||
}
|
||||
c.setMaxCommitMergeWaitSeconds(atLeast(r, 1));
|
||||
return c;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue