From 7f352a96659b4d304ae3ef0c7a1b62784ce2a406 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Sat, 27 Jun 2020 22:25:45 +0200 Subject: [PATCH] LUCENE-8962: Merge small segments on commit (#1617) Add IndexWriter merge-on-commit feature to selectively merge small segments on commit, subject to a configurable timeout, to improve search performance by reducing the number of small segments for searching. Co-authored-by: Michael Froh Co-authored-by: Michael Sokolov Co-authored-by: Mike McCandless --- lucene/CHANGES.txt | 4 + .../lucene/index/FilterMergePolicy.java | 5 + .../org/apache/lucene/index/IndexWriter.java | 280 +++++++++++++----- .../lucene/index/IndexWriterConfig.java | 18 ++ .../lucene/index/LiveIndexWriterConfig.java | 13 + .../org/apache/lucene/index/MergePolicy.java | 97 +++++- .../org/apache/lucene/index/MergeTrigger.java | 7 +- .../apache/lucene/index/NoMergePolicy.java | 3 + .../index/OneMergeWrappingMergePolicy.java | 5 + .../lucene/index/ReadersAndUpdates.java | 14 +- .../java/org/apache/lucene/util/IOUtils.java | 10 + .../index/TestDemoParallelLeafReader.java | 5 +- .../apache/lucene/index/TestIndexWriter.java | 169 +++++++---- .../index/TestIndexWriterMergePolicy.java | 240 ++++++++++++++- .../apache/lucene/index/TestMergePolicy.java | 10 +- .../search/TestPhraseWildcardQuery.java | 7 +- .../lucene/index/MockRandomMergePolicy.java | 32 ++ .../apache/lucene/util/LuceneTestCase.java | 1 + 18 files changed, 749 insertions(+), 171 deletions(-) diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt index 3b15facde7e..df9704c3b3a 100644 --- a/lucene/CHANGES.txt +++ b/lucene/CHANGES.txt @@ -418,6 +418,10 @@ Improvements * LUCENE-9253: KoreanTokenizer now supports custom dictionaries(system, unknown). (Namgyu Kim) +* LUCENE-8962: Add IndexWriter merge-on-commit feature to selectively merge small segments on commit, + subject to a configurable timeout, to improve search performance by reducing the number of small + segments for searching (Michael Froh, Mike Sokolov, Mike Mccandless, Simon Willnauer) + * LUCENE-9171: QueryBuilder can now use BoostAttributes on input token streams to selectively boost particular terms or synonyms in parsed queries. (Alessandro Benedetti, Alan Woodward) diff --git a/lucene/core/src/java/org/apache/lucene/index/FilterMergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/FilterMergePolicy.java index eb634b48a6b..b4e33f8f6b4 100644 --- a/lucene/core/src/java/org/apache/lucene/index/FilterMergePolicy.java +++ b/lucene/core/src/java/org/apache/lucene/index/FilterMergePolicy.java @@ -57,6 +57,11 @@ public class FilterMergePolicy extends MergePolicy { return in.findForcedDeletesMerges(segmentInfos, mergeContext); } + @Override + public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException { + return in.findFullFlushMerges(mergeTrigger, segmentInfos, mergeContext); + } + @Override public boolean useCompoundFile(SegmentInfos infos, SegmentCommitInfo mergedInfo, MergeContext mergeContext) throws IOException { diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java index fd1722022a7..6c977940890 100644 --- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java @@ -29,11 +29,11 @@ import java.util.LinkedList; import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.Objects; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -2167,7 +2167,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable, } } } else { - spec = mergePolicy.findMerges(trigger, segmentInfos, this); + switch (trigger) { + case COMMIT: + spec = mergePolicy.findFullFlushMerges(trigger, segmentInfos, this); + break; + default: + spec = mergePolicy.findMerges(trigger, segmentInfos, this); + } } if (spec != null) { final int numMerges = spec.merges.size(); @@ -2462,15 +2468,15 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable, /** Aborts running merges. Be careful when using this * method: when you abort a long-running merge, you lose * a lot of work that must later be redone. */ - private synchronized void abortMerges() { + private synchronized void abortMerges() throws IOException { // Abort all pending & running merges: - for (final MergePolicy.OneMerge merge : pendingMerges) { + IOUtils.applyToAll(pendingMerges, merge -> { if (infoStream.isEnabled("IW")) { infoStream.message("IW", "now abort pending merge " + segString(merge.segments)); } - merge.setAborted(); + abortOneMerge(merge); mergeFinish(merge); - } + }); pendingMerges.clear(); for (final MergePolicy.OneMerge merge : runningMerges) { @@ -3173,7 +3179,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable, SegmentInfos toCommit = null; boolean anyChanges = false; long seqNo; - + MergePolicy.MergeSpecification onCommitMerges = null; + AtomicBoolean includeInCommit = new AtomicBoolean(true); + final long maxCommitMergeWaitMillis = config.getMaxCommitMergeWaitMillis(); // This is copied from doFlush, except it's modified to // clone & incRef the flushed SegmentInfos inside the // sync block: @@ -3226,16 +3234,18 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable, // corresponding add from an updateDocument) can // sneak into the commit point: toCommit = segmentInfos.clone(); - pendingCommitChangeCount = changeCount.get(); - // This protects the segmentInfos we are now going // to commit. This is important in case, eg, while // we are trying to sync all referenced files, a // merge completes which would otherwise have - // removed the files we are now syncing. - filesToCommit = toCommit.files(false); - deleter.incRef(filesToCommit); + // removed the files we are now syncing. + deleter.incRef(toCommit.files(false)); + if (anyChanges && maxCommitMergeWaitMillis > 0) { + // we can safely call prepareOnCommitMerge since writeReaderPool(true) above wrote all + // necessary files to disk and checkpointed them. + onCommitMerges = prepareOnCommitMerge(toCommit, includeInCommit); + } } success = true; } finally { @@ -3256,7 +3266,24 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable, } finally { maybeCloseOnTragicEvent(); } - + + if (onCommitMerges != null) { + if (infoStream.isEnabled("IW")) { + infoStream.message("IW", "now run merges during commit: " + onCommitMerges.segString(directory)); + } + mergeScheduler.merge(mergeSource, MergeTrigger.COMMIT); + onCommitMerges.await(maxCommitMergeWaitMillis, TimeUnit.MILLISECONDS); + if (infoStream.isEnabled("IW")) { + infoStream.message("IW", "done waiting for merges during commit"); + } + synchronized (this) { + // we need to call this under lock since mergeFinished above is also called under the IW lock + includeInCommit.set(false); + } + } + // do this after handling any onCommitMerges since the files will have changed if any merges + // did complete + filesToCommit = toCommit.files(false); try { if (anyChanges) { maybeMerge.set(true); @@ -3284,6 +3311,120 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable, } } + /** + * This optimization allows a commit to wait for merges on smallish segments to + * reduce the eventual number of tiny segments in the commit point. We wrap a {@code OneMerge} to + * update the {@code committingSegmentInfos} once the merge has finished. We replace the source segments + * in the SIS that we are going to commit with the freshly merged segment, but ignore all deletions and updates + * that are made to documents in the merged segment while it was merging. The updates that are made do not belong to + * the point-in-time commit point and should therefore not be included. See the clone call in {@code onMergeComplete} + * below. We also ensure that we pull the merge readers while holding {@code IndexWriter}'s lock. Otherwise + * we could see concurrent deletions/updates applied that do not belong to the segment. + */ + private MergePolicy.MergeSpecification prepareOnCommitMerge(SegmentInfos committingSegmentInfos, AtomicBoolean includeInCommit) throws IOException { + assert Thread.holdsLock(this); + MergePolicy.MergeSpecification onCommitMerges = updatePendingMerges(new OneMergeWrappingMergePolicy(config.getMergePolicy(), toWrap -> + new MergePolicy.OneMerge(toWrap.segments) { + SegmentCommitInfo origInfo; + AtomicBoolean onlyOnce = new AtomicBoolean(false); + + @Override + public void mergeFinished(boolean committed, boolean segmentDropped) throws IOException { + assert Thread.holdsLock(IndexWriter.this); + + // includedInCommit will be set (above, by our caller) to false if the allowed max wall clock + // time (IWC.getMaxCommitMergeWaitMillis()) has elapsed, which means we did not make the timeout + // and will not commit our merge to the to-be-commited SegmentInfos + + if (segmentDropped == false + && committed + && includeInCommit.get()) { + + if (infoStream.isEnabled("IW")) { + infoStream.message("IW", "now apply merge during commit: " + toWrap.segString()); + } + + // make sure onMergeComplete really was called: + assert origInfo != null; + + deleter.incRef(origInfo.files()); + Set mergedSegmentNames = new HashSet<>(); + for (SegmentCommitInfo sci : segments) { + mergedSegmentNames.add(sci.info.name); + } + List toCommitMergedAwaySegments = new ArrayList<>(); + for (SegmentCommitInfo sci : committingSegmentInfos) { + if (mergedSegmentNames.contains(sci.info.name)) { + toCommitMergedAwaySegments.add(sci); + deleter.decRef(sci.files()); + } + } + // Construct a OneMerge that applies to toCommit + MergePolicy.OneMerge applicableMerge = new MergePolicy.OneMerge(toCommitMergedAwaySegments); + applicableMerge.info = origInfo; + long segmentCounter = Long.parseLong(origInfo.info.name.substring(1), Character.MAX_RADIX); + committingSegmentInfos.counter = Math.max(committingSegmentInfos.counter, segmentCounter + 1); + committingSegmentInfos.applyMergeChanges(applicableMerge, false); + } else { + if (infoStream.isEnabled("IW")) { + infoStream.message("IW", "skip apply merge during commit: " + toWrap.segString()); + } + } + toWrap.mergeFinished(committed, false); + super.mergeFinished(committed, segmentDropped); + } + + @Override + void onMergeComplete() { + // clone the target info to make sure we have the original info without the updated del and update gens + origInfo = info.clone(); + } + + @Override + void initMergeReaders(IOUtils.IOFunction readerFactory) throws IOException { + if (onlyOnce.compareAndSet(false, true)) { + // we do this only once below to pull readers as point in time readers with respect to the commit point + // we try to update + super.initMergeReaders(readerFactory); + } + } + + @Override + public CodecReader wrapForMerge(CodecReader reader) throws IOException { + return toWrap.wrapForMerge(reader); // must delegate + } + } + ), MergeTrigger.COMMIT, UNBOUNDED_MAX_MERGE_SEGMENTS); + if (onCommitMerges != null) { + boolean closeReaders = true; + try { + for (MergePolicy.OneMerge merge : onCommitMerges.merges) { + IOContext context = new IOContext(merge.getStoreMergeInfo()); + merge.initMergeReaders( + sci -> { + final ReadersAndUpdates rld = getPooledInstance(sci, true); + // calling setIsMerging is important since it causes the RaU to record all DV updates + // in a separate map in order to be applied to the merged segment after it's done + rld.setIsMerging(); + return rld.getReaderForMerge(context); + }); + } + closeReaders = false; + } finally { + if (closeReaders) { + IOUtils.applyToAll(onCommitMerges.merges, merge -> { + // that merge is broken we need to clean up after it - it's fine we still have the IW lock to do this + boolean removed = pendingMerges.remove(merge); + assert removed: "merge should be pending but isn't: " + merge.segString(); + abortOneMerge(merge); + mergeFinish(merge); + }); + } + } + } + return onCommitMerges; + } + /** * Ensures that all changes in the reader-pool are written to disk. * @param writeDeletes if true if deletes should be written to disk too. @@ -3697,7 +3838,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable, MergeState.DocMap segDocMap = mergeState.docMaps[i]; MergeState.DocMap segLeafDocMap = mergeState.leafDocMaps[i]; - carryOverHardDeletes(mergedDeletesAndUpdates, maxDoc, mergeState.liveDocs[i], merge.hardLiveDocs.get(i), rld.getHardLiveDocs(), + carryOverHardDeletes(mergedDeletesAndUpdates, maxDoc, mergeState.liveDocs[i], merge.getMergeReader().get(i).hardLiveDocs, rld.getHardLiveDocs(), segDocMap, segLeafDocMap); // Now carry over all doc values updates that were resolved while we were merging, remapping the docIDs to the newly merged docIDs. @@ -3850,7 +3991,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable, @SuppressWarnings("try") private synchronized boolean commitMerge(MergePolicy.OneMerge merge, MergeState mergeState) throws IOException { - + merge.onMergeComplete(); testPoint("startCommitMerge"); if (tragedy.get() != null) { @@ -3964,7 +4105,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable, // Must close before checkpoint, otherwise IFD won't be // able to delete the held-open files from the merge // readers: - closeMergeReaders(merge, false); + closeMergeReaders(merge, false, dropSegment); } if (infoStream.isEnabled("IW")) { @@ -4026,11 +4167,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable, try { try { mergeInit(merge); - if (infoStream.isEnabled("IW")) { infoStream.message("IW", "now merge\n merge=" + segString(merge.segments) + "\n index=" + segString()); } - mergeMiddle(merge, mergePolicy); mergeSuccess(merge); success = true; @@ -4039,7 +4178,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable, } } finally { synchronized(this) { - + // Readers are already closed in commitMerge if we didn't hit + // an exc: + if (success == false) { + closeMergeReaders(merge, true, false); + } mergeFinish(merge); if (success == false) { @@ -4071,6 +4214,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable, /** Hook that's called when the specified merge is complete. */ protected void mergeSuccess(MergePolicy.OneMerge merge) {} + private void abortOneMerge(MergePolicy.OneMerge merge) throws IOException { + merge.setAborted(); + closeMergeReaders(merge, true, false); + } + /** Checks whether this merge involves any segments * already participating in a merge. If not, this merge * is "registered", meaning we record that its segments @@ -4085,7 +4233,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable, assert merge.segments.size() > 0; if (stopMerges) { - merge.setAborted(); + abortOneMerge(merge); throw new MergePolicy.MergeAbortedException("merge is aborted: " + segString(merge.segments)); } @@ -4286,30 +4434,28 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable, } @SuppressWarnings("try") - private synchronized void closeMergeReaders(MergePolicy.OneMerge merge, boolean suppressExceptions) throws IOException { + private synchronized void closeMergeReaders(MergePolicy.OneMerge merge, boolean suppressExceptions, boolean droppedSegment) throws IOException { if (merge.hasFinished() == false) { final boolean drop = suppressExceptions == false; - try (Closeable finalizer = () -> merge.mergeFinished(suppressExceptions == false)) { - IOUtils.applyToAll(merge.readers, sr -> { - final ReadersAndUpdates rld = getPooledInstance(sr.getOriginalSegmentInfo(), false); - // We still hold a ref so it should not have been removed: - assert rld != null; - if (drop) { - rld.dropChanges(); - } else { - rld.dropMergingUpdates(); - } - rld.release(sr); - release(rld); - if (drop) { - readerPool.drop(rld.info); - } - }); - } finally { - Collections.fill(merge.readers, null); - } + // first call mergeFinished before we potentially drop the reader and the last reference. + merge.close(suppressExceptions == false, droppedSegment, mr -> { + final SegmentReader sr = mr.reader; + final ReadersAndUpdates rld = getPooledInstance(sr.getOriginalSegmentInfo(), false); + // We still hold a ref so it should not have been removed: + assert rld != null; + if (drop) { + rld.dropChanges(); + } else { + rld.dropMergingUpdates(); + } + rld.release(sr); + release(rld); + if (drop) { + readerPool.drop(rld.info); + } + }); } else { - assert merge.readers.stream().filter(Objects::nonNull).count() == 0 : "we are done but still have readers: " + merge.readers; + assert merge.getMergeReader().isEmpty() : "we are done but still have readers: " + merge.getMergeReader(); assert suppressExceptions : "can't be done and not suppressing exceptions"; } } @@ -4352,8 +4498,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable, merge.checkAborted(); Directory mergeDirectory = mergeScheduler.wrapForMerge(merge, directory); - List sourceSegments = merge.segments; - IOContext context = new IOContext(merge.getStoreMergeInfo()); final TrackingDirectoryWrapper dirWrapper = new TrackingDirectoryWrapper(mergeDirectory); @@ -4362,45 +4506,25 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable, infoStream.message("IW", "merging " + segString(merge.segments)); } - merge.readers = new ArrayList<>(sourceSegments.size()); - merge.hardLiveDocs = new ArrayList<>(sourceSegments.size()); - // This is try/finally to make sure merger's readers are // closed: boolean success = false; try { - int segUpto = 0; - while(segUpto < sourceSegments.size()) { - - final SegmentCommitInfo info = sourceSegments.get(segUpto); - - // Hold onto the "live" reader; we will use this to - // commit merged deletes - final ReadersAndUpdates rld = getPooledInstance(info, true); + merge.initMergeReaders(sci -> { + final ReadersAndUpdates rld = getPooledInstance(sci, true); rld.setIsMerging(); - - ReadersAndUpdates.MergeReader mr = rld.getReaderForMerge(context); - SegmentReader reader = mr.reader; - - if (infoStream.isEnabled("IW")) { - infoStream.message("IW", "seg=" + segString(info) + " reader=" + reader); - } - - merge.hardLiveDocs.add(mr.hardLiveDocs); - merge.readers.add(reader); - segUpto++; - } - + return rld.getReaderForMerge(context); + }); // Let the merge wrap readers List mergeReaders = new ArrayList<>(); Counter softDeleteCount = Counter.newCounter(false); - for (int r = 0; r < merge.readers.size(); r++) { - SegmentReader reader = merge.readers.get(r); + for (MergePolicy.MergeReader mergeReader : merge.getMergeReader()) { + SegmentReader reader = mergeReader.reader; CodecReader wrappedReader = merge.wrapForMerge(reader); validateMergeReader(wrappedReader); if (softDeletesEnabled) { if (reader != wrappedReader) { // if we don't have a wrapped reader we won't preserve any soft-deletes - Bits hardLiveDocs = merge.hardLiveDocs.get(r); + Bits hardLiveDocs = mergeReader.hardLiveDocs; if (hardLiveDocs != null) { // we only need to do this accounting if we have mixed deletes Bits wrappedLiveDocs = wrappedReader.getLiveDocs(); Counter hardDeleteCounter = Counter.newCounter(false); @@ -4433,7 +4557,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable, } final SegmentMerger merger = new SegmentMerger(mergeReaders, merge.info.info, infoStream, dirWrapper, - globalFieldNumberMap, + globalFieldNumberMap, context); merge.info.setSoftDelCount(Math.toIntExact(softDeleteCount.get())); merge.checkAborted(); @@ -4454,8 +4578,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable, String pauseInfo = merge.getMergeProgress().getPauseTimes().entrySet() .stream() .filter((e) -> e.getValue() > 0) - .map((e) -> String.format(Locale.ROOT, "%.1f sec %s", - e.getValue() / 1000000000., + .map((e) -> String.format(Locale.ROOT, "%.1f sec %s", + e.getValue() / 1000000000., e.getKey().name().toLowerCase(Locale.ROOT))) .collect(Collectors.joining(", ")); if (!pauseInfo.isEmpty()) { @@ -4467,9 +4591,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable, double segmentMB = (merge.info.sizeInBytes()/1024./1024.); infoStream.message("IW", "merge codec=" + codec + " maxDoc=" + merge.info.info.maxDoc() + "; merged segment has " + (mergeState.mergeFieldInfos.hasVectors() ? "vectors" : "no vectors") + "; " + - (mergeState.mergeFieldInfos.hasNorms() ? "norms" : "no norms") + "; " + - (mergeState.mergeFieldInfos.hasDocValues() ? "docValues" : "no docValues") + "; " + - (mergeState.mergeFieldInfos.hasProx() ? "prox" : "no prox") + "; " + + (mergeState.mergeFieldInfos.hasNorms() ? "norms" : "no norms") + "; " + + (mergeState.mergeFieldInfos.hasDocValues() ? "docValues" : "no docValues") + "; " + + (mergeState.mergeFieldInfos.hasProx() ? "prox" : "no prox") + "; " + (mergeState.mergeFieldInfos.hasProx() ? "freqs" : "no freqs") + "; " + (mergeState.mergeFieldInfos.hasPointValues() ? "points" : "no points") + "; " + String.format(Locale.ROOT, @@ -4579,7 +4703,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable, // TODO: ideally we would freeze merge.info here!! // because any changes after writing the .si will be - // lost... + // lost... if (infoStream.isEnabled("IW")) { infoStream.message("IW", String.format(Locale.ROOT, "merged segment size=%.3f MB vs estimate=%.3f MB", merge.info.sizeInBytes()/1024./1024., merge.estimatedMergeBytes/1024/1024.)); @@ -4611,7 +4735,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable, // Readers are already closed in commitMerge if we didn't hit // an exc: if (success == false) { - closeMergeReaders(merge, true); + closeMergeReaders(merge, true, false); } } diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java index 26e7e3dbb6a..6dcdf839b19 100644 --- a/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java +++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java @@ -109,6 +109,9 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig { /** Default value for whether calls to {@link IndexWriter#close()} include a commit. */ public final static boolean DEFAULT_COMMIT_ON_CLOSE = true; + + /** Default value for time to wait for merges on commit (when using a {@link MergePolicy} that implements {@link MergePolicy#findFullFlushMerges}). */ + public static final long DEFAULT_MAX_COMMIT_MERGE_WAIT_MILLIS = 0; // indicates whether this config instance is already attached to a writer. // not final so that it can be cloned properly. @@ -459,6 +462,21 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig { return this; } + /** + * Expert: sets the amount of time to wait for merges (during {@link IndexWriter#commit}) returned by + * MergePolicy.findFullFlushMerges(...). + * If this time is reached, we proceed with the commit based on segments merged up to that point. + * The merges are not cancelled, and will still run to completion independent of the commit, + * like natural segment merges. The default is {@value IndexWriterConfig#DEFAULT_MAX_COMMIT_MERGE_WAIT_MILLIS}. + * + * Note: This settings has no effect unless {@link MergePolicy#findFullFlushMerges(MergeTrigger, SegmentInfos, MergePolicy.MergeContext)} + * has an implementation that actually returns merges which by default doesn't return any merges. + */ + public IndexWriterConfig setMaxCommitMergeWaitMillis(long maxCommitMergeWaitMillis) { + this.maxCommitMergeWaitMillis = maxCommitMergeWaitMillis; + return this; + } + /** * Set the {@link Sort} order to use for all (flushed and merged) segments. */ diff --git a/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java b/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java index 1f48acc8d5f..1450331829c 100644 --- a/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java +++ b/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java @@ -109,6 +109,8 @@ public class LiveIndexWriterConfig { /** soft deletes field */ protected String softDeletesField = null; + /** Amount of time to wait for merges returned by MergePolicy.findFullFlushMerges(...) */ + protected volatile long maxCommitMergeWaitMillis; // used by IndexWriterConfig LiveIndexWriterConfig(Analyzer analyzer) { @@ -132,6 +134,7 @@ public class LiveIndexWriterConfig { flushPolicy = new FlushByRamOrCountsPolicy(); readerPooling = IndexWriterConfig.DEFAULT_READER_POOLING; perThreadHardLimitMB = IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB; + maxCommitMergeWaitMillis = IndexWriterConfig.DEFAULT_MAX_COMMIT_MERGE_WAIT_MILLIS; } /** Returns the default analyzer to use for indexing documents. */ @@ -461,6 +464,15 @@ public class LiveIndexWriterConfig { return softDeletesField; } + /** + * Expert: return the amount of time to wait for merges returned by by MergePolicy.findFullFlushMerges(...). + * If this time is reached, we proceed with the commit based on segments merged up to that point. + * The merges are not cancelled, and may still run to completion independent of the commit. + */ + public long getMaxCommitMergeWaitMillis() { + return maxCommitMergeWaitMillis; + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); @@ -484,6 +496,7 @@ public class LiveIndexWriterConfig { sb.append("indexSort=").append(getIndexSort()).append("\n"); sb.append("checkPendingFlushOnUpdate=").append(isCheckPendingFlushOnUpdate()).append("\n"); sb.append("softDeletesField=").append(getSoftDeletesField()).append("\n"); + sb.append("maxCommitMergeWaitMillis=").append(getMaxCommitMergeWaitMillis()).append("\n"); return sb.toString(); } } diff --git a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java index 91d2302ff93..796d3b8b480 100644 --- a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java +++ b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java @@ -41,6 +41,7 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.MergeInfo; import org.apache.lucene.util.Bits; import org.apache.lucene.util.IOSupplier; +import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.InfoStream; import org.apache.lucene.util.ThreadInterruptedException; @@ -215,8 +216,7 @@ public abstract class MergePolicy { // Sum of sizeInBytes of all SegmentInfos; set by IW.mergeInit volatile long totalMergeBytes; - List readers; // used by IndexWriter - List hardLiveDocs; // used by IndexWriter + private List mergeReaders; // used by IndexWriter /** Segments to be merged. */ public final List segments; @@ -243,6 +243,7 @@ public abstract class MergePolicy { this.segments = List.copyOf(segments); totalMaxDoc = segments.stream().mapToInt(i -> i.info.maxDoc()).sum(); mergeProgress = new OneMergeProgress(); + mergeReaders = List.of(); } /** @@ -254,11 +255,27 @@ public abstract class MergePolicy { } /** Called by {@link IndexWriter} after the merge is done and all readers have been closed. - * @param success true iff the merge finished successfully ie. was committed */ - public void mergeFinished(boolean success) throws IOException { + * @param success true iff the merge finished successfully ie. was committed + * @param segmentDropped true iff the merged segment was dropped since it was fully deleted + */ + public void mergeFinished(boolean success, boolean segmentDropped) throws IOException { + } + + /** + * Closes this merge and releases all merge readers + */ + final void close(boolean success, boolean segmentDropped, IOUtils.IOConsumer readerConsumer) throws IOException { + // this method is final to ensure we never miss a super call to cleanup and finish the merge if (mergeCompleted.complete(success) == false) { throw new IllegalStateException("merge has already finished"); } + try { + mergeFinished(success, segmentDropped); + } finally { + final List readers = mergeReaders; + mergeReaders = List.of(); + IOUtils.applyToAll(readers, readerConsumer); + } } /** Wrap the reader in order to add/remove information to the merged segment. */ @@ -399,6 +416,40 @@ public abstract class MergePolicy { Optional hasCompletedSuccessfully() { return Optional.ofNullable(mergeCompleted.getNow(null)); } + + + /** + * Called just before the merge is applied to IndexWriter's SegmentInfos + */ + void onMergeComplete() { + } + + /** + * Sets the merge readers for this merge. + */ + void initMergeReaders(IOUtils.IOFunction readerFactory) throws IOException { + assert mergeReaders.isEmpty() : "merge readers must be empty"; + assert mergeCompleted.isDone() == false : "merge is already done"; + final ArrayList readers = new ArrayList<>(segments.size()); + try { + for (final SegmentCommitInfo info : segments) { + // Hold onto the "live" reader; we will use this to + // commit merged deletes + readers.add(readerFactory.apply(info)); + } + } finally { + // ensure we assign this to close them in the case of an exception + this.mergeReaders = List.copyOf(readers); // we do a copy here to ensure that mergeReaders are an immutable list + } + } + + /** + * Returns the merge readers or an empty list if the readers were not initialized yet. + */ + List getMergeReader() { + return mergeReaders; + } + } /** @@ -553,7 +604,7 @@ public abstract class MergePolicy { * an original segment present in the * to-be-merged index; else, it was a segment * produced by a cascaded merge. - * @param mergeContext the IndexWriter to find the merges on + * @param mergeContext the MergeContext to find the merges on */ public abstract MergeSpecification findForcedMerges( SegmentInfos segmentInfos, int maxSegmentCount, Map segmentsToMerge, MergeContext mergeContext) @@ -564,11 +615,35 @@ public abstract class MergePolicy { * deletes from the index. * @param segmentInfos * the total set of segments in the index - * @param mergeContext the IndexWriter to find the merges on + * @param mergeContext the MergeContext to find the merges on */ public abstract MergeSpecification findForcedDeletesMerges( SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException; + /** + * Identifies merges that we want to execute (synchronously) on commit. By default, this will do no merging on commit. + * If you implement this method in your {@code MergePolicy} you must also set a non-zero timeout using + * {@link IndexWriterConfig#setMaxCommitMergeWaitMillis}. + * + * Any merges returned here will make {@link IndexWriter#commit()} or {@link IndexWriter#prepareCommit()} block until + * the merges complete or until {@link IndexWriterConfig#getMaxCommitMergeWaitMillis()} has elapsed. This may be + * used to merge small segments that have just been flushed as part of the commit, reducing the number of segments in + * the commit. If a merge does not complete in the allotted time, it will continue to execute, and eventually finish and + * apply to future commits, but will not be reflected in the current commit. + * + * If a {@link OneMerge} in the returned {@link MergeSpecification} includes a segment already included in a registered + * merge, then {@link IndexWriter#commit()} or {@link IndexWriter#prepareCommit()} will throw a {@link IllegalStateException}. + * Use {@link MergeContext#getMergingSegments()} to determine which segments are currently registered to merge. + * + * @param mergeTrigger the event that triggered the merge (COMMIT or FULL_FLUSH). + * @param segmentInfos the total set of segments in the index (while preparing the commit) + * @param mergeContext the MergeContext to find the merges on, which should be used to determine which segments are + * already in a registered merge (see {@link MergeContext#getMergingSegments()}). + */ + public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException { + return null; + } + /** * Returns true if a new segment (regardless of its origin) should use the * compound file format. The default implementation returns true @@ -745,4 +820,14 @@ public abstract class MergePolicy { */ Set getMergingSegments(); } + + final static class MergeReader { + final SegmentReader reader; + final Bits hardLiveDocs; + + MergeReader(SegmentReader reader, Bits hardLiveDocs) { + this.reader = reader; + this.hardLiveDocs = hardLiveDocs; + } + } } diff --git a/lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java b/lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java index d165a27008f..01a6b15a035 100644 --- a/lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java +++ b/lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java @@ -47,5 +47,10 @@ public enum MergeTrigger { /** * Merge was triggered by a closing IndexWriter. */ - CLOSING + CLOSING, + + /** + * Merge was triggered on commit. + */ + COMMIT, } diff --git a/lucene/core/src/java/org/apache/lucene/index/NoMergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/NoMergePolicy.java index 1480ce458fe..b209e8aedcf 100644 --- a/lucene/core/src/java/org/apache/lucene/index/NoMergePolicy.java +++ b/lucene/core/src/java/org/apache/lucene/index/NoMergePolicy.java @@ -45,6 +45,9 @@ public final class NoMergePolicy extends MergePolicy { @Override public MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos, MergeContext mergeContext) { return null; } + @Override + public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) { return null; } + @Override public boolean useCompoundFile(SegmentInfos segments, SegmentCommitInfo newSegment, MergeContext mergeContext) { return newSegment.info.getUseCompoundFile(); diff --git a/lucene/core/src/java/org/apache/lucene/index/OneMergeWrappingMergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/OneMergeWrappingMergePolicy.java index d08711eb061..a5fd66a7c0a 100644 --- a/lucene/core/src/java/org/apache/lucene/index/OneMergeWrappingMergePolicy.java +++ b/lucene/core/src/java/org/apache/lucene/index/OneMergeWrappingMergePolicy.java @@ -59,6 +59,11 @@ public class OneMergeWrappingMergePolicy extends FilterMergePolicy { return wrapSpec(in.findForcedDeletesMerges(segmentInfos, mergeContext)); } + @Override + public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException { + return wrapSpec(in.findFullFlushMerges(mergeTrigger, segmentInfos, mergeContext)); + } + private MergeSpecification wrapSpec(MergeSpecification spec) { MergeSpecification wrapped = spec == null ? null : new MergeSpecification(); if (wrapped != null) { diff --git a/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java b/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java index b0ee8d684f0..505f08fb911 100644 --- a/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java +++ b/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java @@ -695,18 +695,8 @@ final class ReadersAndUpdates { return isMerging; } - final static class MergeReader { - final SegmentReader reader; - final Bits hardLiveDocs; - - MergeReader(SegmentReader reader, Bits hardLiveDocs) { - this.reader = reader; - this.hardLiveDocs = hardLiveDocs; - } - } - /** Returns a reader for merge, with the latest doc values updates and deletions. */ - synchronized MergeReader getReaderForMerge(IOContext context) throws IOException { + synchronized MergePolicy.MergeReader getReaderForMerge(IOContext context) throws IOException { // We must carry over any still-pending DV updates because they were not // successfully written, e.g. because there was a hole in the delGens, @@ -728,7 +718,7 @@ final class ReadersAndUpdates { reader = createNewReaderWithLatestLiveDocs(reader); } assert pendingDeletes.verifyDocCounts(reader); - return new MergeReader(reader, pendingDeletes.getHardLiveDocs()); + return new MergePolicy.MergeReader(reader, pendingDeletes.getHardLiveDocs()); } /** diff --git a/lucene/core/src/java/org/apache/lucene/util/IOUtils.java b/lucene/core/src/java/org/apache/lucene/util/IOUtils.java index 80182bf79cd..32c9c74bc8f 100644 --- a/lucene/core/src/java/org/apache/lucene/util/IOUtils.java +++ b/lucene/core/src/java/org/apache/lucene/util/IOUtils.java @@ -656,4 +656,14 @@ public final class IOUtils { */ void accept(T input) throws IOException; } + + /** + * A Function that may throw an IOException + * @see java.util.function.Function + */ + @FunctionalInterface + public interface IOFunction { + R apply(T t) throws IOException; + } + } diff --git a/lucene/core/src/test/org/apache/lucene/index/TestDemoParallelLeafReader.java b/lucene/core/src/test/org/apache/lucene/index/TestDemoParallelLeafReader.java index 7fdad3ba2de..a222bb756e9 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestDemoParallelLeafReader.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestDemoParallelLeafReader.java @@ -530,7 +530,7 @@ public class TestDemoParallelLeafReader extends LuceneTestCase { @Override public CodecReader wrapForMerge(CodecReader reader) throws IOException { - LeafReader wrapped = getCurrentReader((SegmentReader)reader, schemaGen); + LeafReader wrapped = getCurrentReader(reader, schemaGen); if (wrapped instanceof ParallelLeafReader) { parallelReaders.add((ParallelLeafReader) wrapped); } @@ -538,7 +538,8 @@ public class TestDemoParallelLeafReader extends LuceneTestCase { } @Override - public void mergeFinished(boolean success) throws IOException { + public void mergeFinished(boolean success, boolean segmentDropped) throws IOException { + super.mergeFinished(success, segmentDropped); Throwable th = null; for (ParallelLeafReader r : parallelReaders) { try { diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java index f8dcae7995c..2c9bc23dca3 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java @@ -344,7 +344,7 @@ public class TestIndexWriter extends LuceneTestCase { // Make sure it's OK to change RAM buffer size and // maxBufferedDocs in a write session public void testChangingRAMBuffer() throws IOException { - Directory dir = newDirectory(); + Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random()))); writer.getConfig().setMaxBufferedDocs(10); writer.getConfig().setRAMBufferSizeMB(IndexWriterConfig.DISABLE_AUTO_FLUSH); @@ -607,7 +607,7 @@ public class TestIndexWriter extends LuceneTestCase { doc.add(newField("content4", contents, customType)); type = customType; } else - type = TextField.TYPE_NOT_STORED; + type = TextField.TYPE_NOT_STORED; doc.add(newTextField("content1", contents, Field.Store.NO)); doc.add(newField("content3", "", customType)); doc.add(newField("content5", "", type)); @@ -663,13 +663,13 @@ public class TestIndexWriter extends LuceneTestCase { writer.close(); dir.close(); } - + public void testEmptyFieldNameTerms() throws IOException { Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random()))); Document doc = new Document(); doc.add(newTextField("", "a b c", Field.Store.NO)); - writer.addDocument(doc); + writer.addDocument(doc); writer.close(); DirectoryReader reader = DirectoryReader.open(dir); LeafReader subreader = getOnlyLeafReader(reader); @@ -681,7 +681,7 @@ public class TestIndexWriter extends LuceneTestCase { reader.close(); dir.close(); } - + public void testEmptyFieldNameWithEmptyTerm() throws IOException { Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random()))); @@ -690,7 +690,7 @@ public class TestIndexWriter extends LuceneTestCase { doc.add(newStringField("", "a", Field.Store.NO)); doc.add(newStringField("", "b", Field.Store.NO)); doc.add(newStringField("", "c", Field.Store.NO)); - writer.addDocument(doc); + writer.addDocument(doc); writer.close(); DirectoryReader reader = DirectoryReader.open(dir); LeafReader subreader = getOnlyLeafReader(reader); @@ -834,7 +834,7 @@ public class TestIndexWriter extends LuceneTestCase { customType.setStoreTermVectors(true); customType.setStoreTermVectorPositions(true); customType.setStoreTermVectorOffsets(true); - + doc.add(newField("content", "aaa bbb ccc ddd eee fff ggg hhh iii", customType)); writer.addDocument(doc); writer.addDocument(doc); @@ -922,7 +922,7 @@ public class TestIndexWriter extends LuceneTestCase { // open/close slowly sometimes dir.setUseSlowOpenClosers(true); - + // throttle a little dir.setThrottling(MockDirectoryWrapper.Throttling.SOMETIMES); @@ -1148,7 +1148,7 @@ public class TestIndexWriter extends LuceneTestCase { FieldType customType = new FieldType(StoredField.TYPE); customType.setTokenized(true); - + Field f = new Field("binary", b, 10, 17, customType); // TODO: this is evil, changing the type after creating the field: customType.setIndexOptions(IndexOptions.DOCS); @@ -1157,7 +1157,7 @@ public class TestIndexWriter extends LuceneTestCase { f.setTokenStream(doc1field1); FieldType customType2 = new FieldType(TextField.TYPE_STORED); - + Field f2 = newField("string", "value", customType2); final MockTokenizer doc1field2 = new MockTokenizer(MockTokenizer.WHITESPACE, false); doc1field2.setReader(new StringReader("doc1field2")); @@ -1233,7 +1233,7 @@ public class TestIndexWriter extends LuceneTestCase { public void testDeleteUnusedFiles() throws Exception { assumeFalse("test relies on exact filenames", Codec.getDefault() instanceof SimpleTextCodec); assumeWorkingMMapOnWindows(); - + for(int iter=0;iter<2;iter++) { // relies on windows semantics Path path = createTempDir(); @@ -1250,7 +1250,7 @@ public class TestIndexWriter extends LuceneTestCase { } MergePolicy mergePolicy = newLogMergePolicy(true); - + // This test expects all of its segments to be in CFS mergePolicy.setNoCFSRatio(1.0); mergePolicy.setMaxCFSSegmentSizeMB(Double.POSITIVE_INFINITY); @@ -1338,7 +1338,7 @@ public class TestIndexWriter extends LuceneTestCase { customType.setStoreTermVectors(true); customType.setStoreTermVectorPositions(true); customType.setStoreTermVectorOffsets(true); - + doc.add(newField("c", "val", customType)); writer.addDocument(doc); writer.commit(); @@ -1379,7 +1379,7 @@ public class TestIndexWriter extends LuceneTestCase { // indexed, flushed (but not committed) and then IW rolls back, then no // files are left in the Directory. Directory dir = newDirectory(); - + String[] origFiles = dir.listAll(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random())) .setMaxBufferedDocs(2) @@ -1409,8 +1409,8 @@ public class TestIndexWriter extends LuceneTestCase { // Adding just one document does not call flush yet. int computedExtraFileCount = 0; for (String file : dir.listAll()) { - if (IndexWriter.WRITE_LOCK_NAME.equals(file) || - file.startsWith(IndexFileNames.SEGMENTS) || + if (IndexWriter.WRITE_LOCK_NAME.equals(file) || + file.startsWith(IndexFileNames.SEGMENTS) || IndexFileNames.CODEC_FILE_PATTERN.matcher(file).matches()) { if (file.lastIndexOf('.') < 0 // don't count stored fields and term vectors in, or any temporary files they might @@ -1458,7 +1458,7 @@ public class TestIndexWriter extends LuceneTestCase { FieldType customType3 = new FieldType(TextField.TYPE_STORED); customType3.setTokenized(false); customType3.setOmitNorms(true); - + for (int i=0; i<2; i++) { Document doc = new Document(); doc.add(new Field("id", Integer.toString(i)+BIG, customType3)); @@ -1478,7 +1478,7 @@ public class TestIndexWriter extends LuceneTestCase { SegmentReader sr = (SegmentReader) ctx.reader(); assertFalse(sr.getFieldInfos().hasVectors()); } - + r0.close(); dir.close(); } @@ -1501,7 +1501,7 @@ public class TestIndexWriter extends LuceneTestCase { @Override public final boolean incrementToken() { - clearAttributes(); + clearAttributes(); if (upto < tokens.length) { termAtt.setEmpty(); termAtt.append(tokens[upto]); @@ -1724,7 +1724,7 @@ public class TestIndexWriter extends LuceneTestCase { r.close(); dir.close(); } - + public void testDontInvokeAnalyzerForUnAnalyzedFields() throws Exception { Analyzer analyzer = new Analyzer() { @Override @@ -1759,13 +1759,13 @@ public class TestIndexWriter extends LuceneTestCase { w.close(); dir.close(); } - + //LUCENE-1468 -- make sure opening an IndexWriter with // create=true does not remove non-index files - + public void testOtherFiles() throws Throwable { Directory dir = newDirectory(); - IndexWriter iw = new IndexWriter(dir, + IndexWriter iw = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random()))); iw.addDocument(new Document()); iw.close(); @@ -1774,15 +1774,15 @@ public class TestIndexWriter extends LuceneTestCase { IndexOutput out = dir.createOutput("myrandomfile", newIOContext(random())); out.writeByte((byte) 42); out.close(); - + new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random()))).close(); - + assertTrue(slowFileExists(dir, "myrandomfile")); } finally { dir.close(); } } - + // LUCENE-3849 public void testStopwordsPosIncHole() throws Exception { Directory dir = newDirectory(); @@ -1811,7 +1811,7 @@ public class TestIndexWriter extends LuceneTestCase { ir.close(); dir.close(); } - + // LUCENE-3849 public void testStopwordsPosIncHole2() throws Exception { // use two stopfilters for testing here @@ -1843,23 +1843,23 @@ public class TestIndexWriter extends LuceneTestCase { ir.close(); dir.close(); } - + // LUCENE-4575 public void testCommitWithUserDataOnly() throws Exception { Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(null)); writer.commit(); // first commit to complete IW create transaction. - + // this should store the commit data, even though no other changes were made writer.setLiveCommitData(new HashMap() {{ put("key", "value"); }}.entrySet()); writer.commit(); - + DirectoryReader r = DirectoryReader.open(dir); assertEquals("value", r.getIndexCommit().getUserData().get("key")); r.close(); - + // now check setCommitData and prepareCommit/commit sequence writer.setLiveCommitData(new HashMap() {{ put("key", "value1"); @@ -1873,7 +1873,7 @@ public class TestIndexWriter extends LuceneTestCase { r = DirectoryReader.open(dir); assertEquals("value1", r.getIndexCommit().getUserData().get("key")); r.close(); - + // now should commit the second commitData - there was a bug where // IndexWriter.finishCommit overrode the second commitData writer.commit(); @@ -1881,7 +1881,7 @@ public class TestIndexWriter extends LuceneTestCase { assertEquals("IndexWriter.finishCommit may have overridden the second commitData", "value2", r.getIndexCommit().getUserData().get("key")); r.close(); - + writer.close(); dir.close(); } @@ -1896,7 +1896,7 @@ public class TestIndexWriter extends LuceneTestCase { } return data; } - + @Test public void testGetCommitData() throws Exception { Directory dir = newDirectory(); @@ -1906,16 +1906,16 @@ public class TestIndexWriter extends LuceneTestCase { }}.entrySet()); assertEquals("value", getLiveCommitData(writer).get("key")); writer.close(); - + // validate that it's also visible when opening a new IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(null) .setOpenMode(OpenMode.APPEND)); assertEquals("value", getLiveCommitData(writer).get("key")); writer.close(); - + dir.close(); } - + public void testNullAnalyzer() throws IOException { Directory dir = newDirectory(); IndexWriterConfig iwConf = newIndexWriterConfig(null); @@ -1942,7 +1942,7 @@ public class TestIndexWriter extends LuceneTestCase { iw.close(); dir.close(); } - + public void testNullDocument() throws IOException { Directory dir = newDirectory(); RandomIndexWriter iw = new RandomIndexWriter(random(), dir); @@ -1967,7 +1967,7 @@ public class TestIndexWriter extends LuceneTestCase { iw.close(); dir.close(); } - + public void testNullDocuments() throws IOException { Directory dir = newDirectory(); RandomIndexWriter iw = new RandomIndexWriter(random(), dir); @@ -1992,7 +1992,7 @@ public class TestIndexWriter extends LuceneTestCase { iw.close(); dir.close(); } - + public void testIterableFieldThrowsException() throws IOException { Directory dir = newDirectory(); IndexWriter w = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random()))); @@ -2000,7 +2000,7 @@ public class TestIndexWriter extends LuceneTestCase { int docCount = 0; int docId = 0; Set liveIds = new HashSet<>(); - for (int i = 0; i < iters; i++) { + for (int i = 0; i < iters; i++) { int numDocs = atLeast(4); for (int j = 0; j < numDocs; j++) { String id = Integer.toString(docId++); @@ -2008,7 +2008,7 @@ public class TestIndexWriter extends LuceneTestCase { fields.add(new StringField("id", id, Field.Store.YES)); fields.add(new StringField("foo", TestUtil.randomSimpleString(random()), Field.Store.NO)); docId++; - + boolean success = false; try { w.addDocument(new RandomFailingIterable(fields, random())); @@ -2040,7 +2040,7 @@ public class TestIndexWriter extends LuceneTestCase { w.close(); IOUtils.close(reader, dir); } - + public void testIterableThrowsException() throws IOException { Directory dir = newDirectory(); IndexWriter w = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random()))); @@ -2088,7 +2088,7 @@ public class TestIndexWriter extends LuceneTestCase { w.close(); IOUtils.close(reader, dir); } - + public void testIterableThrowsException2() throws IOException { Directory dir = newDirectory(); IndexWriter w = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random()))); @@ -2128,7 +2128,7 @@ public class TestIndexWriter extends LuceneTestCase { this.list = list; this.failOn = random.nextInt(5); } - + @Override public Iterator iterator() { final Iterator docIter = list.iterator(); @@ -2254,7 +2254,7 @@ public class TestIndexWriter extends LuceneTestCase { writer.close(); dir.close(); } - + public void testMergeAllDeleted() throws IOException { Directory dir = newDirectory(); IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random())); @@ -2477,12 +2477,12 @@ public class TestIndexWriter extends LuceneTestCase { IndexWriter w = new IndexWriter(d, newIndexWriterConfig(new MockAnalyzer(random()))); w.addDocument(new Document()); w.close(); - + SegmentInfos sis = SegmentInfos.readLatestCommit(d); byte[] id1 = sis.getId(); assertNotNull(id1); assertEquals(StringHelper.ID_LENGTH, id1.length); - + byte[] id2 = sis.info(0).info.getId(); byte[] sciId2 = sis.info(0).getId(); assertNotNull(id2); @@ -2514,7 +2514,7 @@ public class TestIndexWriter extends LuceneTestCase { ids.add(id); } } - + public void testEmptyNorm() throws Exception { Directory d = newDirectory(); IndexWriter w = new IndexWriter(d, newIndexWriterConfig(new MockAnalyzer(random()))); @@ -2579,7 +2579,7 @@ public class TestIndexWriter extends LuceneTestCase { assertEquals(1, r2.getIndexCommit().getGeneration()); assertEquals("segments_1", r2.getIndexCommit().getSegmentsFileName()); r2.close(); - + // make a change and another commit w.addDocument(new Document()); w.commit(); @@ -2866,7 +2866,7 @@ public class TestIndexWriter extends LuceneTestCase { IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random())); IndexWriter w = new IndexWriter(dir, iwc); w.close(); - + IndexOutput out = dir.createTempOutput("_0", "bkd", IOContext.DEFAULT); String tempName = out.getName(); out.close(); @@ -3151,7 +3151,7 @@ public class TestIndexWriter extends LuceneTestCase { expectThrows(IllegalArgumentException.class, () -> { writer.softUpdateDocument(null, new Document(), new NumericDocValuesField("soft_delete", 1)); }); - + expectThrows(IllegalArgumentException.class, () -> { writer.softUpdateDocument(new Term("id", "1"), new Document()); }); @@ -4181,24 +4181,63 @@ public class TestIndexWriter extends LuceneTestCase { SetOnce onlyFinishOnce = new SetOnce<>(); return new MergePolicy.OneMerge(merge.segments) { @Override - public void mergeFinished(boolean success) { + public void mergeFinished(boolean success, boolean segmentDropped) throws IOException { + super.mergeFinished(success, segmentDropped); onlyFinishOnce.set(true); } }; })))) { - Document doc = new Document(); - doc.add(new StringField("id", "1", Field.Store.NO)); - writer.addDocument(doc); - writer.flush(); - writer.addDocument(doc); - writer.flush(); - writer.deleteDocuments(new Term("id", "1")); - writer.flush(); - assertEquals(2, writer.getSegmentCount()); - assertEquals(0, writer.getDocStats().numDocs); - assertEquals(2, writer.getDocStats().maxDoc); - writer.forceMerge(1); + Document doc = new Document(); + doc.add(new StringField("id", "1", Field.Store.NO)); + writer.addDocument(doc); + writer.flush(); + writer.addDocument(doc); + writer.flush(); + writer.deleteDocuments(new Term("id", "1")); + writer.flush(); + assertEquals(2, writer.getSegmentCount()); + assertEquals(0, writer.getDocStats().numDocs); + assertEquals(2, writer.getDocStats().maxDoc); + writer.forceMerge(1); } } } + + public void testMergeOnCommitKeepFullyDeletedSegments() throws Exception { + Directory dir = newDirectory(); + IndexWriterConfig iwc = newIndexWriterConfig(); + iwc.setMaxCommitMergeWaitMillis(30 * 1000); + iwc.mergePolicy = new FilterMergePolicy(newMergePolicy()) { + @Override + public boolean keepFullyDeletedSegment(IOSupplier readerIOSupplier) { + return true; + } + + @Override + public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, + SegmentInfos segmentInfos, + MergeContext mergeContext) { + List fullyDeletedSegments = segmentInfos.asList().stream() + .filter(s -> s.info.maxDoc() - s.getDelCount() == 0) + .collect(Collectors.toList()); + if (fullyDeletedSegments.isEmpty()) { + return null; + } + MergeSpecification spec = new MergeSpecification(); + spec.add(new OneMerge(fullyDeletedSegments)); + return spec; + } + }; + IndexWriter w = new IndexWriter(dir, iwc); + Document d = new Document(); + d.add(new StringField("id", "1", Field.Store.YES)); + w.addDocument(d); + w.commit(); + w.updateDocument(new Term("id", "1"), d); + w.commit(); + try (DirectoryReader reader = w.getReader()) { + assertEquals(1, reader.numDocs()); + } + IOUtils.close(w, dir); + } } diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java index ce591a280c6..12eeb9dc030 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java @@ -18,17 +18,47 @@ package org.apache.lucene.index; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.lucene.analysis.MockAnalyzer; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.document.StringField; import org.apache.lucene.index.IndexWriterConfig.OpenMode; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.store.Directory; import org.apache.lucene.util.LuceneTestCase; public class TestIndexWriterMergePolicy extends LuceneTestCase { - + + private static final MergePolicy MERGE_ON_COMMIT_POLICY = new LogDocMergePolicy() { + @Override + public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) { + // Optimize down to a single segment on commit + if (mergeTrigger == MergeTrigger.COMMIT && segmentInfos.size() > 1) { + List nonMergingSegments = new ArrayList<>(); + for (SegmentCommitInfo sci : segmentInfos) { + if (mergeContext.getMergingSegments().contains(sci) == false) { + nonMergingSegments.add(sci); + } + } + if (nonMergingSegments.size() > 1) { + MergeSpecification mergeSpecification = new MergeSpecification(); + mergeSpecification.add(new OneMerge(nonMergingSegments)); + return mergeSpecification; + } + } + return null; + } + }; + // Test the normal case public void testNormalCase() throws IOException { Directory dir = newDirectory(); @@ -278,6 +308,50 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase { assertSetters(new LogDocMergePolicy()); } + // Test basic semantics of merge on commit + public void testMergeOnCommit() throws IOException { + Directory dir = newDirectory(); + + IndexWriter firstWriter = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random())) + .setMergePolicy(NoMergePolicy.INSTANCE)); + for (int i = 0; i < 5; i++) { + TestIndexWriter.addDoc(firstWriter); + firstWriter.flush(); + } + DirectoryReader firstReader = DirectoryReader.open(firstWriter); + assertEquals(5, firstReader.leaves().size()); + firstReader.close(); + firstWriter.close(); // When this writer closes, it does not merge on commit. + + IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random())) + .setMergePolicy(MERGE_ON_COMMIT_POLICY).setMaxCommitMergeWaitMillis(Integer.MAX_VALUE); + + + IndexWriter writerWithMergePolicy = new IndexWriter(dir, iwc); + writerWithMergePolicy.commit(); // No changes. Commit doesn't trigger a merge. + + DirectoryReader unmergedReader = DirectoryReader.open(writerWithMergePolicy); + assertEquals(5, unmergedReader.leaves().size()); + unmergedReader.close(); + + TestIndexWriter.addDoc(writerWithMergePolicy); + writerWithMergePolicy.commit(); // Doc added, do merge on commit. + assertEquals(1, writerWithMergePolicy.getSegmentCount()); // + + DirectoryReader mergedReader = DirectoryReader.open(writerWithMergePolicy); + assertEquals(1, mergedReader.leaves().size()); + mergedReader.close(); + + try (IndexReader reader = writerWithMergePolicy.getReader()) { + IndexSearcher searcher = new IndexSearcher(reader); + assertEquals(6, reader.numDocs()); + assertEquals(6, searcher.count(new MatchAllDocsQuery())); + } + + writerWithMergePolicy.close(); + dir.close(); + } + private void assertSetters(MergePolicy lmp) { lmp.setMaxCFSSegmentSizeMB(2.0); assertEquals(2.0, lmp.getMaxCFSSegmentSizeMB(), EPSILON); @@ -294,4 +368,168 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase { // TODO: Add more checks for other non-double setters! } + + public void testCarryOverNewDeletes() throws IOException, InterruptedException { + try (Directory directory = newDirectory()) { + boolean useSoftDeletes = random().nextBoolean(); + CountDownLatch waitForMerge = new CountDownLatch(1); + CountDownLatch waitForUpdate = new CountDownLatch(1); + try (IndexWriter writer = new IndexWriter(directory, newIndexWriterConfig() + .setMergePolicy(MERGE_ON_COMMIT_POLICY).setMaxCommitMergeWaitMillis(30 * 1000) + .setSoftDeletesField("soft_delete") + .setMergeScheduler(new ConcurrentMergeScheduler())) { + @Override + protected void merge(MergePolicy.OneMerge merge) throws IOException { + waitForMerge.countDown(); + try { + waitForUpdate.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + super.merge(merge); + } + }) { + + Document d1 = new Document(); + d1.add(new StringField("id", "1", Field.Store.NO)); + Document d2 = new Document(); + d2.add(new StringField("id", "2", Field.Store.NO)); + Document d3 = new Document(); + d3.add(new StringField("id", "3", Field.Store.NO)); + writer.addDocument(d1); + writer.flush(); + writer.addDocument(d2); + boolean addThreeDocs = random().nextBoolean(); + int expectedNumDocs = 2; + if (addThreeDocs) { // sometimes add another doc to ensure we don't have a fully deleted segment + expectedNumDocs = 3; + writer.addDocument(d3); + } + Thread t = new Thread(() -> { + try { + waitForMerge.await(); + if (useSoftDeletes) { + writer.softUpdateDocument(new Term("id", "2"), d2, new NumericDocValuesField("soft_delete", 1)); + } else { + writer.updateDocument(new Term("id", "2"), d2); + } + writer.flush(); + } catch (Exception e) { + throw new AssertionError(e); + } finally { + waitForUpdate.countDown(); + } + + }); + t.start(); + writer.commit(); + t.join(); + try (DirectoryReader open = new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(directory), "soft_delete")) { + assertEquals(expectedNumDocs, open.numDocs()); + assertEquals("we should not have any deletes", expectedNumDocs, open.maxDoc()); + } + + try (DirectoryReader open = DirectoryReader.open(writer)) { + assertEquals(expectedNumDocs, open.numDocs()); + assertEquals("we should not have one delete", expectedNumDocs+1, open.maxDoc()); + } + } + } + } + + /** + * This test makes sure we release the merge readers on abort. MDW will fail if it + * can't close all files + */ + public void testAbortCommitMerge() throws IOException, InterruptedException { + try (Directory directory = newDirectory()) { + CountDownLatch waitForMerge = new CountDownLatch(1); + CountDownLatch waitForDeleteAll = new CountDownLatch(1); + try (IndexWriter writer = new IndexWriter(directory, newIndexWriterConfig() + .setMergePolicy(MERGE_ON_COMMIT_POLICY).setMaxCommitMergeWaitMillis(30 * 1000) + .setMergeScheduler(new SerialMergeScheduler() { + @Override + public synchronized void merge(MergeSource mergeSource, MergeTrigger trigger) throws IOException { + waitForMerge.countDown(); + try { + waitForDeleteAll.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + super.merge(mergeSource, trigger); + } + }))) { + + Document d1 = new Document(); + d1.add(new StringField("id", "1", Field.Store.NO)); + Document d2 = new Document(); + d2.add(new StringField("id", "2", Field.Store.NO)); + Document d3 = new Document(); + d3.add(new StringField("id", "3", Field.Store.NO)); + writer.addDocument(d1); + writer.flush(); + writer.addDocument(d2); + Thread t = new Thread(() -> { + try { + writer.commit(); + } catch (IOException e) { + throw new AssertionError(e); + } + }); + t.start(); + waitForMerge.await(); + writer.deleteAll(); + waitForDeleteAll.countDown(); + t.join(); + } + } + } + + public void testStressUpdateSameDocumentWithMergeOnCommit() throws IOException, InterruptedException { + try (Directory directory = newDirectory()) { + try (RandomIndexWriter writer = new RandomIndexWriter(random(), directory, newIndexWriterConfig() + .setMergePolicy(MERGE_ON_COMMIT_POLICY).setMaxCommitMergeWaitMillis(10 + random().nextInt(2000)) + .setSoftDeletesField("soft_delete") + .setMergeScheduler(new ConcurrentMergeScheduler()))) { + Document d1 = new Document(); + d1.add(new StringField("id", "1", Field.Store.NO)); + writer.updateDocument(new Term("id", "1"), d1); + writer.commit(); + + AtomicInteger iters = new AtomicInteger(100 + random().nextInt(TEST_NIGHTLY ? 5000 : 1000)); + AtomicBoolean done = new AtomicBoolean(false); + Thread[] threads = new Thread[1 + random().nextInt(4)]; + for (int i = 0; i < threads.length; i++) { + Thread t = new Thread(() -> { + try { + while (iters.decrementAndGet() > 0) { + writer.updateDocument(new Term("id", "1"), d1); + } + } catch (Exception e) { + throw new AssertionError(e); + } finally { + done.set(true); + } + + }); + t.start(); + threads[i] = t; + } + try { + while (done.get() == false) { + if (random().nextBoolean()) { + writer.commit(); + } + try (DirectoryReader open = new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(directory), "___soft_deletes")) { + assertEquals(1, open.numDocs()); + } + } + } finally { + for (Thread t : threads) { + t.join(); + } + } + } + } + } } diff --git a/lucene/core/src/test/org/apache/lucene/index/TestMergePolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestMergePolicy.java index c252da019d6..93a9b17e0bd 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestMergePolicy.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestMergePolicy.java @@ -43,7 +43,7 @@ public class TestMergePolicy extends LuceneTestCase { Thread t = new Thread(() -> { try { for (MergePolicy.OneMerge m : ms.merges) { - m.mergeFinished(true); + m.close(true, false, mr -> {}); } } catch (IOException e) { throw new AssertionError(e); @@ -66,7 +66,7 @@ public class TestMergePolicy extends LuceneTestCase { } Thread t = new Thread(() -> { try { - ms.merges.get(0).mergeFinished(true); + ms.merges.get(0).close(true, false, mr -> {}); } catch (IOException e) { throw new AssertionError(e); } @@ -89,7 +89,7 @@ public class TestMergePolicy extends LuceneTestCase { Thread t = new Thread(() -> { while (stop.get() == false) { try { - ms.merges.get(i.getAndIncrement()).mergeFinished(true); + ms.merges.get(i.getAndIncrement()).close(true, false, mr -> {}); Thread.sleep(1); } catch (IOException | InterruptedException e) { throw new AssertionError(e); @@ -114,8 +114,8 @@ public class TestMergePolicy extends LuceneTestCase { try (Directory dir = newDirectory()) { MergePolicy.MergeSpecification spec = createRandomMergeSpecification(dir, 1); MergePolicy.OneMerge oneMerge = spec.merges.get(0); - oneMerge.mergeFinished(true); - expectThrows(IllegalStateException.class, () -> oneMerge.mergeFinished(false)); + oneMerge.close(true, false, mr -> {}); + expectThrows(IllegalStateException.class, () -> oneMerge.close(false, false, mr -> {})); } } diff --git a/lucene/sandbox/src/test/org/apache/lucene/search/TestPhraseWildcardQuery.java b/lucene/sandbox/src/test/org/apache/lucene/search/TestPhraseWildcardQuery.java index c8d9d51fc4e..f3cc088f095 100644 --- a/lucene/sandbox/src/test/org/apache/lucene/search/TestPhraseWildcardQuery.java +++ b/lucene/sandbox/src/test/org/apache/lucene/search/TestPhraseWildcardQuery.java @@ -27,6 +27,7 @@ import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.RandomIndexWriter; import org.apache.lucene.index.Term; import org.apache.lucene.index.Terms; @@ -66,12 +67,16 @@ public class TestPhraseWildcardQuery extends LuceneTestCase { public void setUp() throws Exception { super.setUp(); directory = newDirectory(); - RandomIndexWriter iw = new RandomIndexWriter(random(), directory); + RandomIndexWriter iw = new RandomIndexWriter(random(), directory, + newIndexWriterConfig().setMergePolicy(NoMergePolicy.INSTANCE)); // do not accidentally merge + // the two segments we create + // here iw.setDoRandomForceMerge(false); // Keep the segments separated. addSegments(iw); reader = iw.getReader(); iw.close(); searcher = newSearcher(reader); + assertEquals("test test relies on 2 segments", 2, searcher.getIndexReader().leaves().size()); } @Override diff --git a/lucene/test-framework/src/java/org/apache/lucene/index/MockRandomMergePolicy.java b/lucene/test-framework/src/java/org/apache/lucene/index/MockRandomMergePolicy.java index beb4dad0357..92ffc732a29 100644 --- a/lucene/test-framework/src/java/org/apache/lucene/index/MockRandomMergePolicy.java +++ b/lucene/test-framework/src/java/org/apache/lucene/index/MockRandomMergePolicy.java @@ -128,6 +128,38 @@ public class MockRandomMergePolicy extends MergePolicy { return findMerges(null, segmentInfos, mergeContext); } + @Override + public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException { + MergeSpecification mergeSpecification = findMerges(null, segmentInfos, mergeContext); + if (mergeSpecification == null) { + return null; + } + // Do not return any merges involving already-merging segments. + MergeSpecification filteredMergeSpecification = new MergeSpecification(); + for (OneMerge oneMerge : mergeSpecification.merges) { + boolean filtered = false; + List nonMergingSegments = new ArrayList<>(); + for (SegmentCommitInfo sci : oneMerge.segments) { + if (mergeContext.getMergingSegments().contains(sci) == false) { + nonMergingSegments.add(sci); + } else { + filtered = true; + } + } + if (filtered == true) { + if (nonMergingSegments.size() > 0) { + filteredMergeSpecification.add(new OneMerge(nonMergingSegments)); + } + } else { + filteredMergeSpecification.add(oneMerge); + } + } + if (filteredMergeSpecification.merges.size() > 0) { + return filteredMergeSpecification; + } + return null; + } + @Override public boolean useCompoundFile(SegmentInfos infos, SegmentCommitInfo mergedInfo, MergeContext mergeContext) throws IOException { // 80% of the time we create CFS: diff --git a/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java b/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java index 9f2cd27c8c7..2adf31802a6 100644 --- a/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java +++ b/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java @@ -1003,6 +1003,7 @@ public abstract class LuceneTestCase extends Assert { if (rarely(r)) { c.setCheckPendingFlushUpdate(false); } + c.setMaxCommitMergeWaitMillis(rarely() ? atLeast(r, 1000) : atLeast(r, 200)); return c; }