diff --git a/lucene/core/src/java/org/apache/lucene/index/FilterMergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/FilterMergePolicy.java index eb634b48a6b..b4e33f8f6b4 100644 --- a/lucene/core/src/java/org/apache/lucene/index/FilterMergePolicy.java +++ b/lucene/core/src/java/org/apache/lucene/index/FilterMergePolicy.java @@ -57,6 +57,11 @@ public class FilterMergePolicy extends MergePolicy { return in.findForcedDeletesMerges(segmentInfos, mergeContext); } + @Override + public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException { + return in.findFullFlushMerges(mergeTrigger, segmentInfos, mergeContext); + } + @Override public boolean useCompoundFile(SegmentInfos infos, SegmentCommitInfo mergedInfo, MergeContext mergeContext) throws IOException { diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java index 81551799f5b..ad7a157c869 100644 --- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java @@ -32,6 +32,8 @@ import java.util.Map; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -3147,6 +3149,42 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable, } } + private MergePolicy.OneMerge updateSegmentInfosOnMergeFinish(MergePolicy.OneMerge merge, final SegmentInfos toCommit, + AtomicReference mergeLatchRef) { + return new MergePolicy.OneMerge(merge.segments) { + public void mergeFinished() throws IOException { + super.mergeFinished(); + CountDownLatch mergeAwaitLatch = mergeLatchRef.get(); + if 
(mergeAwaitLatch == null) { + // Commit thread timed out waiting for this merge and moved on. No need to manipulate toCommit. + return; + } + if (isAborted() == false) { + deleter.incRef(this.info.files()); + // Resolve "live" SegmentInfos segments to their toCommit cloned equivalents, based on segment name. + Set mergedSegmentNames = new HashSet<>(); + for (SegmentCommitInfo sci : this.segments) { + deleter.decRef(sci.files()); + mergedSegmentNames.add(sci.info.name); + } + List toCommitMergedAwaySegments = new ArrayList<>(); + for (SegmentCommitInfo sci : toCommit) { + if (mergedSegmentNames.contains(sci.info.name)) { + toCommitMergedAwaySegments.add(sci); + } + } + // Construct a OneMerge that applies to toCommit + MergePolicy.OneMerge applicableMerge = new MergePolicy.OneMerge(toCommitMergedAwaySegments); + applicableMerge.info = this.info.clone(); + long segmentCounter = Long.parseLong(this.info.info.name.substring(1), Character.MAX_RADIX); + toCommit.counter = Math.max(toCommit.counter, segmentCounter + 1); + toCommit.applyMergeChanges(applicableMerge, false); + } + mergeAwaitLatch.countDown(); + } + }; + } + private long prepareCommitInternal() throws IOException { startCommitTime = System.nanoTime(); synchronized(commitLock) { @@ -3169,6 +3207,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable, SegmentInfos toCommit = null; boolean anyChanges = false; long seqNo; + List commitMerges = null; + AtomicReference mergeAwaitLatchRef = null; // This is copied from doFlush, except it's modified to // clone & incRef the flushed SegmentInfos inside the @@ -3223,6 +3263,30 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable, // sneak into the commit point: toCommit = segmentInfos.clone(); + if (anyChanges) { + // Find any merges that can execute on commit (per MergePolicy). 
+ MergePolicy.MergeSpecification mergeSpec = + config.getMergePolicy().findFullFlushMerges(MergeTrigger.COMMIT, segmentInfos, this); + if (mergeSpec != null && mergeSpec.merges.size() > 0) { + int mergeCount = mergeSpec.merges.size(); + commitMerges = new ArrayList<>(mergeCount); + mergeAwaitLatchRef = new AtomicReference<>(new CountDownLatch(mergeCount)); + for (MergePolicy.OneMerge oneMerge : mergeSpec.merges) { + MergePolicy.OneMerge trackedMerge = + updateSegmentInfosOnMergeFinish(oneMerge, toCommit, mergeAwaitLatchRef); + if (registerMerge(trackedMerge) == false) { + throw new IllegalStateException("MergePolicy " + config.getMergePolicy().getClass() + + " returned merging segments from findFullFlushMerges"); + } + commitMerges.add(trackedMerge); + } + if (infoStream.isEnabled("IW")) { + infoStream.message("IW", "Registered " + mergeCount + " commit merges"); + infoStream.message("IW", "Before executing commit merges, had " + toCommit.size() + " segments"); + } + } + } + pendingCommitChangeCount = changeCount.get(); // This protects the segmentInfos we are now going @@ -3230,8 +3294,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable, // we are trying to sync all referenced files, a // merge completes which would otherwise have // removed the files we are now syncing. - filesToCommit = toCommit.files(false); - deleter.incRef(filesToCommit); + deleter.incRef(toCommit.files(false)); } success = true; } finally { @@ -3252,6 +3315,53 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable, } finally { maybeCloseOnTragicEvent(); } + + if (mergeAwaitLatchRef != null) { + CountDownLatch mergeAwaitLatch = mergeAwaitLatchRef.get(); + // If we found and registered any merges above, within the flushLock, then we want to ensure that they + // complete execution. Note that since we released the lock, other merges may have been scheduled. We will + // block until the merges that we registered complete. 
As they complete, they will update toCommit to + // replace merged segments with the result of each merge. + config.getIndexWriterEvents().beginMergeOnCommit(); + mergeScheduler.merge(this, MergeTrigger.COMMIT, true); + long mergeWaitStart = System.nanoTime(); + int abandonedCount = 0; + long waitTimeMillis = (long) (config.getMaxCommitMergeWaitSeconds() * 1000.0); + try { + if (mergeAwaitLatch.await(waitTimeMillis, TimeUnit.MILLISECONDS) == false) { + synchronized (this) { + // Need to do this in a synchronized block, to make sure none of our commit merges are currently + // executing mergeFinished (since mergeFinished itself is called from within the IndexWriter lock). + // After we clear the value from mergeAwaitLatchRef, the merges we schedule will still execute as + // usual, but when they finish, they won't attempt to update toCommit or modify segment reference + // counts. + mergeAwaitLatchRef.set(null); + for (MergePolicy.OneMerge commitMerge : commitMerges) { + if (runningMerges.contains(commitMerge) || pendingMerges.contains(commitMerge)) { + abandonedCount++; + } + } + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // restore the interrupt flag (Thread.interrupted() would clear it) + throw new IOException("Interrupted waiting for merges", e); + } finally { + if (infoStream.isEnabled("IW")) { + infoStream.message("IW", String.format(Locale.ROOT, "Waited %.1f ms for commit merges", + (System.nanoTime() - mergeWaitStart)/1_000_000.0)); + infoStream.message("IW", "After executing commit merges, had " + toCommit.size() + " segments"); + if (abandonedCount > 0) { + infoStream.message("IW", "Abandoned " + abandonedCount + " commit merges after " + waitTimeMillis + " ms"); + } + } + if (abandonedCount > 0) { + config.getIndexWriterEvents().abandonedMergesOnCommit(abandonedCount); + } + config.getIndexWriterEvents().finishMergeOnCommit(); + } + } + filesToCommit = toCommit.files(false); try { if (anyChanges) { diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java 
b/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java index 3137f69b1b8..638ec0a39bd 100644 --- a/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java +++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java @@ -112,6 +112,9 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig { /** Default value for whether calls to {@link IndexWriter#close()} include a commit. */ public final static boolean DEFAULT_COMMIT_ON_CLOSE = true; + + /** Default value for time to wait for merges on commit (when using a {@link MergePolicy} that implements findFullFlushMerges). */ + public static final double DEFAULT_MAX_COMMIT_MERGE_WAIT_SECONDS = 30.0; // indicates whether this config instance is already attached to a writer. // not final so that it can be cloned properly. @@ -484,6 +487,24 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig { return this; } + /** + * Expert: sets the amount of time to wait for merges returned by MergePolicy.findFullFlushMerges(...). + * If this time is reached, we proceed with the commit based on segments merged up to that point. + * The merges are not cancelled, and may still run to completion independent of the commit. + */ + public IndexWriterConfig setMaxCommitMergeWaitSeconds(double maxCommitMergeWaitSeconds) { + this.maxCommitMergeWaitSeconds = maxCommitMergeWaitSeconds; + return this; + } + + /** + * Set the callback that gets invoked when IndexWriter performs various actions. 
+ */ + public IndexWriterConfig setIndexWriterEvents(IndexWriterEvents indexWriterEvents) { + this.indexWriterEvents = indexWriterEvents == null ? IndexWriterEvents.NULL_EVENTS : indexWriterEvents; // null means "no callbacks"; IndexWriter and toString() dereference the getter unconditionally + return this; + } + /** We only allow sorting on these types */ private static final EnumSet ALLOWED_INDEX_SORT_TYPES = EnumSet.of(SortField.Type.STRING, SortField.Type.LONG, diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriterEvents.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriterEvents.java new file mode 100644 index 00000000000..dd4d7a29424 --- /dev/null +++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriterEvents.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +package org.apache.lucene.index; + +/** + * Callback interface to signal various actions taken by IndexWriter. + * + * @lucene.experimental + */ +public interface IndexWriterEvents { + /** + * A default implementation that ignores all events. 
+ */ + IndexWriterEvents NULL_EVENTS = new IndexWriterEvents() { + @Override + public void beginMergeOnCommit() { } + + @Override + public void finishMergeOnCommit() { } + + @Override + public void abandonedMergesOnCommit(int abandonedCount) { } + }; + + /** + * Signals the start of waiting for a merge on commit, returned from + * {@link MergePolicy#findFullFlushMerges(MergeTrigger, SegmentInfos, MergePolicy.MergeContext)}. + */ + void beginMergeOnCommit(); + + /** + * Signals the end of waiting for merges on commit. This may be either because the merges completed, or because we timed out according + * to the limit set in {@link IndexWriterConfig#setMaxCommitMergeWaitSeconds(double)}. + */ + void finishMergeOnCommit(); + + /** + * Called to signal that we abandoned some merges on commit upon reaching the timeout specified in + * {@link IndexWriterConfig#setMaxCommitMergeWaitSeconds(double)}. + */ + void abandonedMergesOnCommit(int abandonedCount); +} diff --git a/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java b/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java index fbd72ecddb5..7236b33504a 100644 --- a/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java +++ b/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java @@ -117,6 +117,12 @@ public class LiveIndexWriterConfig { /** the attributes for the NRT readers */ protected Map readerAttributes = Collections.emptyMap(); + /** Amount of time to wait for merges returned by MergePolicy.findFullFlushMerges(...) */ + protected volatile double maxCommitMergeWaitSeconds; + + /** Callback interface called on index writer actions. 
*/ + protected IndexWriterEvents indexWriterEvents; + // used by IndexWriterConfig LiveIndexWriterConfig(Analyzer analyzer) { @@ -141,6 +147,8 @@ public class LiveIndexWriterConfig { readerPooling = IndexWriterConfig.DEFAULT_READER_POOLING; indexerThreadPool = new DocumentsWriterPerThreadPool(); perThreadHardLimitMB = IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB; + maxCommitMergeWaitSeconds = IndexWriterConfig.DEFAULT_MAX_COMMIT_MERGE_WAIT_SECONDS; + indexWriterEvents = IndexWriterEvents.NULL_EVENTS; } /** Returns the default analyzer to use for indexing documents. */ @@ -480,6 +488,22 @@ public class LiveIndexWriterConfig { return softDeletesField; } + /** + * Expert: return the amount of time to wait for merges returned by MergePolicy.findFullFlushMerges(...). + * If this time is reached, we proceed with the commit based on segments merged up to that point. + * The merges are not cancelled, and may still run to completion independent of the commit. + */ + public double getMaxCommitMergeWaitSeconds() { + return maxCommitMergeWaitSeconds; + } + + /** + * Returns a callback used to signal actions taken by the {@link IndexWriter}. 
+ */ + public IndexWriterEvents getIndexWriterEvents() { + return indexWriterEvents; + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); @@ -505,6 +529,8 @@ public class LiveIndexWriterConfig { sb.append("checkPendingFlushOnUpdate=").append(isCheckPendingFlushOnUpdate()).append("\n"); sb.append("softDeletesField=").append(getSoftDeletesField()).append("\n"); sb.append("readerAttributes=").append(getReaderAttributes()).append("\n"); + sb.append("maxCommitMergeWaitSeconds=").append(getMaxCommitMergeWaitSeconds()).append("\n"); + sb.append("indexWriterEvents=").append(getIndexWriterEvents().getClass().getName()).append("\n"); return sb.toString(); } diff --git a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java index d9d6b0b431c..c191c6571ce 100644 --- a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java +++ b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java @@ -510,7 +510,7 @@ public abstract class MergePolicy { * an original segment present in the * to-be-merged index; else, it was a segment * produced by a cascaded merge. - * @param mergeContext the IndexWriter to find the merges on + * @param mergeContext the MergeContext to find the merges on */ public abstract MergeSpecification findForcedMerges( SegmentInfos segmentInfos, int maxSegmentCount, Map segmentsToMerge, MergeContext mergeContext) @@ -521,11 +521,33 @@ public abstract class MergePolicy { * deletes from the index. * @param segmentInfos * the total set of segments in the index - * @param mergeContext the IndexWriter to find the merges on + * @param mergeContext the MergeContext to find the merges on */ public abstract MergeSpecification findForcedDeletesMerges( SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException; + /** + * Identifies merges that we want to execute (synchronously) on commit. By default, do not synchronously merge on commit. 
+ * + * Any merges returned here will make {@link IndexWriter#commit()} or {@link IndexWriter#prepareCommit()} block until + * the merges complete or until {@link IndexWriterConfig#getMaxCommitMergeWaitSeconds()} seconds have elapsed. This may be + * used to merge small segments that have just been flushed as part of the commit, reducing the number of segments in + * the commit. If a merge does not complete in the allotted time, it will continue to execute, but will not be reflected + * in the commit. + * + * If a {@link OneMerge} in the returned {@link MergeSpecification} includes a segment already included in a registered + * merge, then {@link IndexWriter#commit()} or {@link IndexWriter#prepareCommit()} will throw an {@link IllegalStateException}. + * Use {@link MergeContext#getMergingSegments()} to determine which segments are currently registered to merge. + * + * @param mergeTrigger the event that triggered the merge (COMMIT or FULL_FLUSH). + * @param segmentInfos the total set of segments in the index (while preparing the commit) + * @param mergeContext the MergeContext to find the merges on, which should be used to determine which segments are + * already in a registered merge (see {@link MergeContext#getMergingSegments()}). + */ + public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException { + return null; + } + /** * Returns true if a new segment (regardless of its origin) should use the * compound file format. The default implementation returns true diff --git a/lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java b/lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java index d165a27008f..01a6b15a035 100644 --- a/lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java +++ b/lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java @@ -47,5 +47,10 @@ public enum MergeTrigger { /** * Merge was triggered by a closing IndexWriter. 
*/ - CLOSING + CLOSING, + + /** + * Merge was triggered on commit. + */ + COMMIT, } diff --git a/lucene/core/src/java/org/apache/lucene/index/NoMergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/NoMergePolicy.java index 1480ce458fe..b209e8aedcf 100644 --- a/lucene/core/src/java/org/apache/lucene/index/NoMergePolicy.java +++ b/lucene/core/src/java/org/apache/lucene/index/NoMergePolicy.java @@ -45,6 +45,9 @@ public final class NoMergePolicy extends MergePolicy { @Override public MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos, MergeContext mergeContext) { return null; } + @Override + public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) { return null; } + @Override public boolean useCompoundFile(SegmentInfos segments, SegmentCommitInfo newSegment, MergeContext mergeContext) { return newSegment.info.getUseCompoundFile(); diff --git a/lucene/core/src/java/org/apache/lucene/index/OneMergeWrappingMergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/OneMergeWrappingMergePolicy.java index d08711eb061..a5fd66a7c0a 100644 --- a/lucene/core/src/java/org/apache/lucene/index/OneMergeWrappingMergePolicy.java +++ b/lucene/core/src/java/org/apache/lucene/index/OneMergeWrappingMergePolicy.java @@ -59,6 +59,11 @@ public class OneMergeWrappingMergePolicy extends FilterMergePolicy { return wrapSpec(in.findForcedDeletesMerges(segmentInfos, mergeContext)); } + @Override + public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException { + return wrapSpec(in.findFullFlushMerges(mergeTrigger, segmentInfos, mergeContext)); + } + private MergeSpecification wrapSpec(MergeSpecification spec) { MergeSpecification wrapped = spec == null ? 
null : new MergeSpecification(); if (wrapped != null) { diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java index ce591a280c6..16d7a51e75e 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java @@ -18,13 +18,21 @@ package org.apache.lucene.index; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.lucene.analysis.MockAnalyzer; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; import org.apache.lucene.index.IndexWriterConfig.OpenMode; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.store.Directory; +import org.apache.lucene.util.LineFileDocs; import org.apache.lucene.util.LuceneTestCase; public class TestIndexWriterMergePolicy extends LuceneTestCase { @@ -278,6 +286,130 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase { assertSetters(new LogDocMergePolicy()); } + public void testMergeOnCommit() throws IOException, InterruptedException { + Directory dir = newDirectory(); + + IndexWriter firstWriter = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random())) + .setMergePolicy(NoMergePolicy.INSTANCE)); + for (int i = 0; i < 5; i++) { + TestIndexWriter.addDoc(firstWriter); + firstWriter.flush(); + } + DirectoryReader firstReader = DirectoryReader.open(firstWriter); + assertEquals(5, firstReader.leaves().size()); + firstReader.close(); + firstWriter.close(); + + MergePolicy mergeOnCommitPolicy = new LogDocMergePolicy() { + @Override + public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, 
MergeContext mergeContext) { + // Optimize down to a single segment on commit + if (mergeTrigger == MergeTrigger.COMMIT && segmentInfos.size() > 1) { + List nonMergingSegments = new ArrayList<>(); + for (SegmentCommitInfo sci : segmentInfos) { + if (mergeContext.getMergingSegments().contains(sci) == false) { + nonMergingSegments.add(sci); + } + } + if (nonMergingSegments.size() > 1) { + MergeSpecification mergeSpecification = new MergeSpecification(); + mergeSpecification.add(new OneMerge(nonMergingSegments)); + return mergeSpecification; + } + } + return null; + } + }; + + AtomicInteger abandonedMerges = new AtomicInteger(0); + IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random())) + .setMergePolicy(mergeOnCommitPolicy) + .setIndexWriterEvents(new IndexWriterEvents() { + @Override + public void beginMergeOnCommit() { + + } + + @Override + public void finishMergeOnCommit() { + + } + + @Override + public void abandonedMergesOnCommit(int abandonedCount) { + abandonedMerges.incrementAndGet(); + } + }); + IndexWriter writerWithMergePolicy = new IndexWriter(dir, iwc); + + writerWithMergePolicy.commit(); + + DirectoryReader unmergedReader = DirectoryReader.open(writerWithMergePolicy); + assertEquals(5, unmergedReader.leaves().size()); // Don't merge unless there's a change + unmergedReader.close(); + + TestIndexWriter.addDoc(writerWithMergePolicy); + writerWithMergePolicy.commit(); + + DirectoryReader mergedReader = DirectoryReader.open(writerWithMergePolicy); + assertEquals(1, mergedReader.leaves().size()); // Now we merge on commit + mergedReader.close(); + + LineFileDocs lineFileDocs = new LineFileDocs(random()); + int docCount = atLeast(1000); + AtomicInteger indexedDocs = new AtomicInteger(0); + int numIndexingThreads = atLeast(2); + CountDownLatch startingGun = new CountDownLatch(1); + Collection indexingThreads = new ArrayList<>(); + for (int i = 0; i < numIndexingThreads; i++) { + Thread t = new Thread(() -> { + try { + while 
(indexedDocs.getAndIncrement() < docCount) { + writerWithMergePolicy.addDocument(lineFileDocs.nextDoc()); + if (rarely()) { + writerWithMergePolicy.commit(); + } + } + } catch (IOException e) { + e.printStackTrace(); + fail(); + } + }); + t.start(); + indexingThreads.add(t); + } + startingGun.countDown(); + for (Thread t : indexingThreads) { + t.join(); + } + for (int i = 0; i < 50; i++) { + // Wait for pending merges to finish + synchronized (writerWithMergePolicy) { + if (writerWithMergePolicy.getMergingSegments().isEmpty()) { + break; + } + } + Thread.sleep(100); + } + abandonedMerges.set(0); + writerWithMergePolicy.commit(); + if (abandonedMerges.get() == 0) { + assertEquals(1, writerWithMergePolicy.listOfSegmentCommitInfos().size()); + } else { + assertNotEquals(1, writerWithMergePolicy.listOfSegmentCommitInfos().size()); + } + + try (IndexReader reader = writerWithMergePolicy.getReader()) { + IndexSearcher searcher = new IndexSearcher(reader); + assertEquals(docCount + 6, reader.numDocs()); + assertEquals(docCount + 6, searcher.count(new MatchAllDocsQuery())); + } + + writerWithMergePolicy.close(); + + dir.close(); + } + private void assertSetters(MergePolicy lmp) { lmp.setMaxCFSSegmentSizeMB(2.0); assertEquals(2.0, lmp.getMaxCFSSegmentSizeMB(), EPSILON); diff --git a/lucene/test-framework/src/java/org/apache/lucene/index/MockRandomMergePolicy.java b/lucene/test-framework/src/java/org/apache/lucene/index/MockRandomMergePolicy.java index beb4dad0357..92ffc732a29 100644 --- a/lucene/test-framework/src/java/org/apache/lucene/index/MockRandomMergePolicy.java +++ b/lucene/test-framework/src/java/org/apache/lucene/index/MockRandomMergePolicy.java @@ -128,6 +128,38 @@ public class MockRandomMergePolicy extends MergePolicy { return findMerges(null, segmentInfos, mergeContext); } + @Override + public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException { + MergeSpecification 
mergeSpecification = findMerges(null, segmentInfos, mergeContext); + if (mergeSpecification == null) { + return null; + } + // Do not return any merges involving already-merging segments. + MergeSpecification filteredMergeSpecification = new MergeSpecification(); + for (OneMerge oneMerge : mergeSpecification.merges) { + boolean filtered = false; + List nonMergingSegments = new ArrayList<>(); + for (SegmentCommitInfo sci : oneMerge.segments) { + if (mergeContext.getMergingSegments().contains(sci) == false) { + nonMergingSegments.add(sci); + } else { + filtered = true; + } + } + if (filtered == true) { + if (nonMergingSegments.size() > 0) { + filteredMergeSpecification.add(new OneMerge(nonMergingSegments)); + } + } else { + filteredMergeSpecification.add(oneMerge); + } + } + if (filteredMergeSpecification.merges.size() > 0) { + return filteredMergeSpecification; + } + return null; + } + @Override public boolean useCompoundFile(SegmentInfos infos, SegmentCommitInfo mergedInfo, MergeContext mergeContext) throws IOException { // 80% of the time we create CFS: diff --git a/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java b/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java index 476c4c013c1..c2846f65320 100644 --- a/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java +++ b/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java @@ -1004,6 +1004,7 @@ public abstract class LuceneTestCase extends Assert { if (rarely(r)) { c.setCheckPendingFlushUpdate(false); } + c.setMaxCommitMergeWaitSeconds(atLeast(r, 1)); return c; }