diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt index 40409457994..b6ee4b8a3a8 100644 --- a/lucene/CHANGES.txt +++ b/lucene/CHANGES.txt @@ -117,6 +117,12 @@ API Changes instead of once all shard responses are present. (Simon Willnauer, Mike McCandless) +* LUCENE-7700: A cleanup of merge throughput control logic. Refactored all the + code previously scattered throughout the IndexWriter and + ConcurrentMergeScheduler into a more accessible set of public methods (see + MergePolicy.OneMergeProgress, MergeScheduler.wrapForMerge and + OneMerge.mergeInit). (Dawid Weiss, Mike McCandless). + * LUCENE-7734: FieldType's copy constructor was widened to accept any IndexableFieldType. (David Smiley) diff --git a/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java b/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java index 0dd0a4d8514..6e930c48a20 100644 --- a/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java +++ b/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java @@ -25,6 +25,11 @@ import java.util.Locale; import org.apache.lucene.index.MergePolicy.OneMerge; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.RateLimitedIndexOutput; +import org.apache.lucene.store.RateLimiter; import org.apache.lucene.util.CollectionUtil; import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.ThreadInterruptedException; @@ -255,6 +260,36 @@ public class ConcurrentMergeScheduler extends MergeScheduler { assert false: "merge thread " + currentThread + " was not found"; } + @Override + public Directory wrapForMerge(OneMerge merge, Directory in) { + Thread mergeThread = Thread.currentThread(); + if (!MergeThread.class.isInstance(mergeThread)) { + throw new 
AssertionError("wrapForMerge should be called from MergeThread. Current thread: " + + mergeThread); + } + + // Return a wrapped Directory which has rate-limited output. + RateLimiter rateLimiter = ((MergeThread) mergeThread).rateLimiter; + return new FilterDirectory(in) { + @Override + public IndexOutput createOutput(String name, IOContext context) throws IOException { + ensureOpen(); + + // This Directory is only supposed to be used during merging, + // so all writes should have MERGE context, else there is a bug + // somewhere that is failing to pass down the right IOContext: + assert context.context == IOContext.Context.MERGE: "got context=" + context.context; + + // Because rateLimiter is bound to a particular merge thread, this method should + // always be called from that context. Verify this. + assert mergeThread == Thread.currentThread() : "Not the same merge thread, current=" + + Thread.currentThread() + ", expected=" + mergeThread; + + return new RateLimitedIndexOutput(rateLimiter, in.createOutput(name, context)); + } + }; + } + /** * Called whenever the running merges have changed, to set merge IO limits. 
* This method sorts the merge threads by their merge size in @@ -327,8 +362,9 @@ public class ConcurrentMergeScheduler extends MergeScheduler { newMBPerSec = targetMBPerSec; } - double curMBPerSec = merge.rateLimiter.getMBPerSec(); - + MergeRateLimiter rateLimiter = mergeThread.rateLimiter; + double curMBPerSec = rateLimiter.getMBPerSec(); + if (verbose()) { long mergeStartNS = merge.mergeStartNS; if (mergeStartNS == -1) { @@ -339,11 +375,11 @@ public class ConcurrentMergeScheduler extends MergeScheduler { message.append(String.format(Locale.ROOT, "merge thread %s estSize=%.1f MB (written=%.1f MB) runTime=%.1fs (stopped=%.1fs, paused=%.1fs) rate=%s\n", mergeThread.getName(), bytesToMB(merge.estimatedMergeBytes), - bytesToMB(merge.rateLimiter.totalBytesWritten), + bytesToMB(rateLimiter.getTotalBytesWritten()), nsToSec(now - mergeStartNS), - nsToSec(merge.rateLimiter.getTotalStoppedNS()), - nsToSec(merge.rateLimiter.getTotalPausedNS()), - rateToString(merge.rateLimiter.getMBPerSec()))); + nsToSec(rateLimiter.getTotalStoppedNS()), + nsToSec(rateLimiter.getTotalPausedNS()), + rateToString(rateLimiter.getMBPerSec()))); if (newMBPerSec != curMBPerSec) { if (newMBPerSec == 0.0) { @@ -364,7 +400,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler { } } - merge.rateLimiter.setMBPerSec(newMBPerSec); + rateLimiter.setMBPerSec(newMBPerSec); } if (verbose()) { message(message.toString()); @@ -449,7 +485,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler { Thread currentThread = Thread.currentThread(); int count = 0; for (MergeThread mergeThread : mergeThreads) { - if (currentThread != mergeThread && mergeThread.isAlive() && mergeThread.merge.rateLimiter.getAbort() == false) { + if (currentThread != mergeThread && mergeThread.isAlive() && mergeThread.merge.isAborted() == false) { count++; } } @@ -497,8 +533,6 @@ public class ConcurrentMergeScheduler extends MergeScheduler { return; } - updateIOThrottle(merge); - boolean success = false; try { if 
(verbose()) { @@ -507,14 +541,16 @@ public class ConcurrentMergeScheduler extends MergeScheduler { // OK to spawn a new merge thread to handle this // merge: - final MergeThread merger = getMergeThread(writer, merge); - mergeThreads.add(merger); + final MergeThread newMergeThread = getMergeThread(writer, merge); + mergeThreads.add(newMergeThread); + + updateIOThrottle(newMergeThread.merge, newMergeThread.rateLimiter); if (verbose()) { - message(" launch new thread [" + merger.getName() + "]"); + message(" launch new thread [" + newMergeThread.getName() + "]"); } - merger.start(); + newMergeThread.start(); updateMergeThreads(); success = true; @@ -598,16 +634,17 @@ public class ConcurrentMergeScheduler extends MergeScheduler { /** Runs a merge thread to execute a single merge, then exits. */ protected class MergeThread extends Thread implements Comparable { - final IndexWriter writer; final OneMerge merge; + final MergeRateLimiter rateLimiter; /** Sole constructor. */ public MergeThread(IndexWriter writer, OneMerge merge) { this.writer = writer; this.merge = merge; + this.rateLimiter = new MergeRateLimiter(merge.getMergeProgress()); } - + @Override public int compareTo(MergeThread other) { // Larger merges sort first: @@ -616,9 +653,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler { @Override public void run() { - try { - if (verbose()) { message(" merge thread: start"); } @@ -715,7 +750,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler { } /** Tunes IO throttle when a new merge starts. 
*/ - private synchronized void updateIOThrottle(OneMerge newMerge) throws IOException { + private synchronized void updateIOThrottle(OneMerge newMerge, MergeRateLimiter rateLimiter) throws IOException { if (doAutoIOThrottle == false) { return; } @@ -794,7 +829,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler { } else { rate = targetMBPerSec; } - newMerge.rateLimiter.setMBPerSec(rate); + rateLimiter.setMBPerSec(rate); targetMBPerSecChanged(); } diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java index da030caa0e6..aa28d9992a0 100644 --- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java @@ -36,6 +36,7 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.codecs.Codec; @@ -51,22 +52,18 @@ import org.apache.lucene.search.Sort; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; -import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.store.FlushInfo; import org.apache.lucene.store.IOContext; -import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.Lock; import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.store.LockValidatingDirectoryWrapper; import org.apache.lucene.store.MMapDirectory; import org.apache.lucene.store.MergeInfo; -import org.apache.lucene.store.RateLimitedIndexOutput; import org.apache.lucene.store.TrackingDirectoryWrapper; import org.apache.lucene.util.Accountable; import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; -import 
org.apache.lucene.util.CloseableThreadLocal; import org.apache.lucene.util.Constants; import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.InfoStream; @@ -277,7 +274,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { private final Directory directoryOrig; // original user directory private final Directory directory; // wrapped with additional checks - private final Directory mergeDirectory; // wrapped with throttling: used for merging private final Analyzer analyzer; // how to analyze text private final AtomicLong changeCount = new AtomicLong(); // increments every time a change is completed @@ -353,8 +349,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { * card to make sure they can later charge you when you check out. */ final AtomicLong pendingNumDocs = new AtomicLong(); - final CloseableThreadLocal rateLimiters = new CloseableThreadLocal<>(); - DirectoryReader getReader() throws IOException { return getReader(true, false); } @@ -809,10 +803,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { directoryOrig = d; directory = new LockValidatingDirectoryWrapper(d, writeLock); - // Directory we use for merging, so we can abort running merges, and so - // merge schedulers can optionally rate-limit per-merge IO: - mergeDirectory = addMergeRateLimiters(directory); - analyzer = config.getAnalyzer(); mergeScheduler = config.getMergeScheduler(); mergeScheduler.setInfoStream(infoStream); @@ -2212,8 +2202,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { try { abortMerges(); - rateLimiters.close(); - if (infoStream.isEnabled("IW")) { infoStream.message("IW", "rollback: done finish merges"); } @@ -2418,7 +2406,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { if (infoStream.isEnabled("IW")) { infoStream.message("IW", "now abort pending merge " + segString(merge.segments)); } - merge.rateLimiter.setAbort(); 
+ merge.setAborted(); mergeFinish(merge); } pendingMerges.clear(); @@ -2427,7 +2415,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { if (infoStream.isEnabled("IW")) { infoStream.message("IW", "now abort running merge " + segString(merge.segments)); } - merge.rateLimiter.setAbort(); + merge.setAborted(); } // We wait here to make all merges stop. It should not @@ -2775,13 +2763,17 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { * index. * *

- * NOTE: this method merges all given {@link LeafReader}s in one + * NOTE: this merges all given {@link LeafReader}s in one * merge. If you intend to merge a large number of readers, it may be better * to call this method multiple times, each time with a small set of readers. * In principle, if you use a merge policy with a {@code mergeFactor} or * {@code maxMergeAtOnce} parameter, you should pass that many readers in one * call. * + *

+ * NOTE: this method does not call or make use of the {@link MergeScheduler}, + * so any custom bandwidth throttling is at the moment ignored. + * * @return The sequence number * for this operation * @@ -2832,8 +2824,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { SegmentMerger merger = new SegmentMerger(Arrays.asList(readers), info, infoStream, trackingDir, globalFieldNumberMap, context); - - rateLimiters.set(new MergeRateLimiter(null)); if (!merger.shouldMerge()) { return docWriter.deleteQueue.getNextSequenceNumber(); @@ -2864,7 +2854,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { // Now create the compound file if needed if (useCompoundFile) { Collection filesToDelete = infoPerCommit.files(); - TrackingDirectoryWrapper trackingCFSDir = new TrackingDirectoryWrapper(mergeDirectory); + TrackingDirectoryWrapper trackingCFSDir = new TrackingDirectoryWrapper(directory); // TODO: unlike merge, on exception we arent sniping any trash cfs files here? // createCompoundFile tries to cleanup, but it might not always be able to... 
try { @@ -3745,7 +3735,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { // deleter.refresh() call that will remove any index // file that current segments does not reference), we // abort this merge - if (merge.rateLimiter.getAbort()) { + if (merge.isAborted()) { if (infoStream.isEnabled("IW")) { infoStream.message("IW", "commitMerge: skip: it was aborted"); } @@ -3905,8 +3895,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { boolean success = false; - rateLimiters.set(merge.rateLimiter); - final long t0 = System.currentTimeMillis(); final MergePolicy mergePolicy = config.getMergePolicy(); @@ -3937,7 +3925,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { if (infoStream.isEnabled("IW")) { infoStream.message("IW", "hit exception during merge"); } - } else if (merge.rateLimiter.getAbort() == false && (merge.maxNumSegments != UNBOUNDED_MAX_MERGE_SEGMENTS || (!closed && !closing))) { + } else if (!merge.isAborted() && (merge.maxNumSegments != UNBOUNDED_MAX_MERGE_SEGMENTS || (!closed && !closing))) { // This merge (and, generally, any change to the // segments) may now enable new merges, so we call // merge policy & update pending merges. 
@@ -3951,7 +3939,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { tragicEvent(t, "merge"); } - if (merge.info != null && merge.rateLimiter.getAbort() == false) { + if (merge.info != null && merge.isAborted() == false) { if (infoStream.isEnabled("IW")) { infoStream.message("IW", "merge time " + (System.currentTimeMillis()-t0) + " msec for " + merge.info.info.maxDoc() + " docs"); } @@ -3976,7 +3964,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { assert merge.segments.size() > 0; if (stopMerges) { - merge.rateLimiter.setAbort(); + merge.setAborted(); throw new MergePolicy.MergeAbortedException("merge is aborted: " + segString(merge.segments)); } @@ -4087,7 +4075,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { return; } - if (merge.rateLimiter.getAbort()) { + merge.mergeInit(); + + if (merge.isAborted()) { return; } @@ -4239,9 +4229,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { * but without holding synchronized lock on IndexWriter * instance */ private int mergeMiddle(MergePolicy.OneMerge merge, MergePolicy mergePolicy) throws IOException { + merge.checkAborted(); - merge.rateLimiter.checkAbort(); - + Directory mergeDirectory = config.getMergeScheduler().wrapForMerge(merge, directory); List sourceSegments = merge.segments; IOContext context = new IOContext(merge.getStoreMergeInfo()); @@ -4339,7 +4329,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { globalFieldNumberMap, context); - merge.rateLimiter.checkAbort(); + merge.checkAborted(); merge.mergeStartNS = System.nanoTime(); @@ -4354,11 +4344,20 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { if (infoStream.isEnabled("IW")) { if (merger.shouldMerge()) { + String pauseInfo = merge.getMergeProgress().getPauseTimes().entrySet() + .stream() + .filter((e) -> e.getValue() > 0) + .map((e) -> String.format(Locale.ROOT, "%.1f 
sec %s", + e.getValue() / 1000000000., + e.getKey().name().toLowerCase(Locale.ROOT))) + .collect(Collectors.joining(", ")); + if (!pauseInfo.isEmpty()) { + pauseInfo = " (" + pauseInfo + ")"; + } + long t1 = System.nanoTime(); double sec = (t1-merge.mergeStartNS)/1000000000.; double segmentMB = (merge.info.sizeInBytes()/1024./1024.); - double stoppedSec = merge.rateLimiter.getTotalStoppedNS()/1000000000.; - double throttleSec = merge.rateLimiter.getTotalPausedNS()/1000000000.; infoStream.message("IW", "merge codec=" + codec + " maxDoc=" + merge.info.info.maxDoc() + "; merged segment has " + (mergeState.mergeFieldInfos.hasVectors() ? "vectors" : "no vectors") + "; " + (mergeState.mergeFieldInfos.hasNorms() ? "norms" : "no norms") + "; " + @@ -4367,10 +4366,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { (mergeState.mergeFieldInfos.hasProx() ? "freqs" : "no freqs") + "; " + (mergeState.mergeFieldInfos.hasPointValues() ? "points" : "no points") + "; " + String.format(Locale.ROOT, - "%.1f sec (%.1f sec stopped, %.1f sec paused) to merge segment [%.2f MB, %.2f MB/sec]", + "%.1f sec%s to merge segment [%.2f MB, %.2f MB/sec]", sec, - stoppedSec, - throttleSec, + pauseInfo, segmentMB, segmentMB / sec)); } else { @@ -4406,7 +4404,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { success = true; } catch (Throwable t) { synchronized(this) { - if (merge.rateLimiter.getAbort()) { + if (merge.isAborted()) { // This can happen if rollback is called while we were building // our CFS -- fall through to logic below to remove the non-CFS // merged files: @@ -4439,7 +4437,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { // registered with IFD deleteNewFiles(filesToRemove); - if (merge.rateLimiter.getAbort()) { + if (merge.isAborted()) { if (infoStream.isEnabled("IW")) { infoStream.message("IW", "abort merge after building CFS"); } @@ -5063,30 +5061,6 @@ public class IndexWriter 
implements Closeable, TwoPhaseCommit, Accountable { throw new IllegalArgumentException("number of documents in the index cannot exceed " + actualMaxDocs + " (current document count is " + pendingNumDocs.get() + "; added numDocs is " + addedNumDocs + ")"); } - /** Wraps the incoming {@link Directory} so that we assign a per-thread - * {@link MergeRateLimiter} to all created {@link IndexOutput}s. */ - private Directory addMergeRateLimiters(Directory in) { - return new FilterDirectory(in) { - @Override - public IndexOutput createOutput(String name, IOContext context) throws IOException { - ensureOpen(); - - // Paranoia defense: if this trips we have a bug somewhere... - IndexWriter.this.ensureOpen(false); - - // This Directory is only supposed to be used during merging, - // so all writes should have MERGE context, else there is a bug - // somewhere that is failing to pass down the right IOContext: - assert context.context == IOContext.Context.MERGE: "got context=" + context.context; - - MergeRateLimiter rateLimiter = rateLimiters.get(); - assert rateLimiter != null; - - return new RateLimitedIndexOutput(rateLimiter, in.createOutput(name, context)); - } - }; - } - /** Returns the highest sequence number across * all completed operations, or 0 if no operations have finished yet. Still * in-flight operations (in other threads) are not counted until they finish. 
diff --git a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java index dbf37dfb81d..d9a0ab83ee8 100644 --- a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java +++ b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java @@ -19,12 +19,19 @@ package org.apache.lucene.index; import java.io.IOException; import java.util.ArrayList; +import java.util.EnumMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.BooleanSupplier; +import java.util.stream.Collectors; import org.apache.lucene.store.Directory; import org.apache.lucene.store.MergeInfo; -import org.apache.lucene.store.RateLimiter; /** *

Expert: a MergePolicy determines the sequence of @@ -55,6 +62,125 @@ import org.apache.lucene.store.RateLimiter; * @lucene.experimental */ public abstract class MergePolicy { + /** + * Progress and state for an executing merge. This class + * encapsulates the logic to pause and resume the merge thread + * or to abort the merge entirely. + * + * @lucene.experimental */ + public static class OneMergeProgress { + /** Reason for pausing the merge thread. */ + public static enum PauseReason { + /** Stopped (because of throughput rate set to 0, typically). */ + STOPPED, + /** Temporarily paused because of exceeded throughput rate. */ + PAUSED, + /** Other reason. */ + OTHER + }; + + private final ReentrantLock pauseLock = new ReentrantLock(); + private final Condition pausing = pauseLock.newCondition(); + + /** + * Pause times (in nanoseconds) for each {@link PauseReason}. + */ + private final EnumMap pauseTimesNS; + + private volatile boolean aborted; + + /** + * This field is for sanity-check purposes only. Only the same thread that invoked + * {@link OneMerge#mergeInit()} is permitted to be calling + * {@link #pauseNanos}. This is always verified at runtime. + */ + private Thread owner; + + /** Creates a new merge progress info. */ + public OneMergeProgress() { + // Place all the pause reasons in there immediately so that we can simply update values. + pauseTimesNS = new EnumMap(PauseReason.class); + for (PauseReason p : PauseReason.values()) { + pauseTimesNS.put(p, new AtomicLong()); + } + } + + /** + * Abort the merge this progress tracks at the next + * possible moment. + */ + public void abort() { + aborted = true; + wakeup(); // wakeup any paused merge thread. + } + + /** + * Return the aborted state of this merge. + */ + public boolean isAborted() { + return aborted; + } + + /** + * Pauses the calling thread for at least pauseNanos nanoseconds + * unless the merge is aborted or the external condition returns false, + * in which case control returns immediately. 
+ * + * The external condition is required so that other threads can terminate the pausing immediately, + * before pauseNanos expires. We can't rely on just {@link Condition#awaitNanos(long)} alone + * because it can return due to spurious wakeups too. + * + * @param condition The pause condition that should return false if immediate return from this + * method is needed. Other threads can wake up any sleeping thread by calling + * {@link #wakeup}, but it'd fall to sleep for the remainder of the requested time if this + * condition still returns true. + */ + public void pauseNanos(long pauseNanos, PauseReason reason, BooleanSupplier condition) throws InterruptedException { + if (Thread.currentThread() != owner) { + throw new RuntimeException("Only the merge owner thread can call pauseNanos(). This thread: " + + Thread.currentThread().getName() + ", owner thread: " + + owner); + } + + long start = System.nanoTime(); + AtomicLong timeUpdate = pauseTimesNS.get(reason); + pauseLock.lock(); + try { + while (pauseNanos > 0 && !aborted && condition.getAsBoolean()) { + pauseNanos = pausing.awaitNanos(pauseNanos); + } + } finally { + pauseLock.unlock(); + timeUpdate.addAndGet(System.nanoTime() - start); + } + } + + /** + * Request a wakeup for any threads stalled in {@link #pauseNanos}. + */ + public void wakeup() { + pauseLock.lock(); + try { + pausing.signalAll(); + } finally { + pauseLock.unlock(); + } + } + + /** Returns pause reasons and associated times in nanoseconds. 
*/ + public Map<PauseReason,Long> getPauseTimes() { + Set<Entry<PauseReason,AtomicLong>> entries = pauseTimesNS.entrySet(); + return entries.stream() + .collect(Collectors.toMap( + (e) -> e.getKey(), + (e) -> e.getValue().get())); + } + + final void setMergeThread(Thread owner) { + assert this.owner == null; + this.owner = owner; + } + } /** OneMerge provides the information necessary to perform * an individual primitive merge operation, resulting in @@ -64,7 +190,6 @@ public abstract class MergePolicy { * * @lucene.experimental */ public static class OneMerge { - SegmentCommitInfo info; // used by IndexWriter boolean registerDone; // used by IndexWriter long mergeGen; // used by IndexWriter @@ -82,8 +207,10 @@ public abstract class MergePolicy { /** Segments to be merged. */ public final List segments; - /** A private {@link RateLimiter} for this merge, used to rate limit writes and abort. */ - public final MergeRateLimiter rateLimiter; + /** + * Control used to pause/stop/resume the merge thread. + */ + private final OneMergeProgress mergeProgress; volatile long mergeStartNS = -1; @@ -106,9 +233,17 @@ public abstract class MergePolicy { } totalMaxDoc = count; - rateLimiter = new MergeRateLimiter(this); + mergeProgress = new OneMergeProgress(); } + /** + * Called by {@link IndexWriter} after the merge started and from the + * thread that will be executing the merge. + */ + public void mergeInit() throws IOException { + mergeProgress.setMergeThread(Thread.currentThread()); + } + /** Called by {@link IndexWriter} after the merge is done and all readers have been closed. */ public void mergeFinished() throws IOException { } @@ -163,7 +298,7 @@ public abstract class MergePolicy { if (maxNumSegments != -1) { b.append(" [maxNumSegments=" + maxNumSegments + "]"); } - if (rateLimiter.getAbort()) { + if (isAborted()) { b.append(" [ABORTED]"); } return b.toString(); @@ -194,7 +329,32 @@ public abstract class MergePolicy { /** Return {@link MergeInfo} describing this merge. 
*/ public MergeInfo getStoreMergeInfo() { return new MergeInfo(totalMaxDoc, estimatedMergeBytes, isExternal, maxNumSegments); - } + } + + /** Returns true if this merge was or should be aborted. */ + public boolean isAborted() { + return mergeProgress.isAborted(); + } + + /** Marks this merge as aborted. The merge thread should terminate at the soonest possible moment. */ + public void setAborted() { + this.mergeProgress.abort(); + } + + /** Checks if merge has been aborted and throws a merge exception if so. */ + public void checkAborted() throws MergeAbortedException { + if (isAborted()) { + throw new MergePolicy.MergeAbortedException("merge is aborted: " + segString()); + } + } + + /** + * Returns a {@link OneMergeProgress} instance for this merge, which provides + * statistics of the merge threads (run time vs. sleep time) if merging is throttled. + */ + public OneMergeProgress getMergeProgress() { + return mergeProgress; + } } /** @@ -222,8 +382,7 @@ public abstract class MergePolicy { merges.add(merge); } - /** Returns a description of the merges in this - * specification. */ + /** Returns a description of the merges in this specification. */ public String segString(Directory dir) { StringBuilder b = new StringBuilder(); b.append("MergeSpec:\n"); @@ -235,8 +394,7 @@ public abstract class MergePolicy { } } - /** Exception thrown if there are any problems while - * executing a merge. */ + /** Exception thrown if there are any problems while executing a merge. */ public static class MergeException extends RuntimeException { private Directory dir; @@ -259,9 +417,9 @@ public abstract class MergePolicy { } } - /** Thrown when a merge was explicity aborted because + /** Thrown when a merge was explicitly aborted because * {@link IndexWriter#abortMerges} was called. Normally - * this exception is privately caught and suppresed by + * this exception is privately caught and suppressed by * {@link IndexWriter}. 
*/ public static class MergeAbortedException extends IOException { /** Create a {@link MergeAbortedException}. */ diff --git a/lucene/core/src/java/org/apache/lucene/index/MergeRateLimiter.java b/lucene/core/src/java/org/apache/lucene/index/MergeRateLimiter.java index d04c2d2299f..e5361d52c5b 100644 --- a/lucene/core/src/java/org/apache/lucene/index/MergeRateLimiter.java +++ b/lucene/core/src/java/org/apache/lucene/index/MergeRateLimiter.java @@ -20,118 +20,107 @@ package org.apache.lucene.index; import org.apache.lucene.store.RateLimiter; import org.apache.lucene.util.ThreadInterruptedException; -import static org.apache.lucene.store.RateLimiter.SimpleRateLimiter; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.lucene.index.MergePolicy.OneMergeProgress; +import org.apache.lucene.index.MergePolicy.OneMergeProgress.PauseReason; /** This is the {@link RateLimiter} that {@link IndexWriter} assigns to each running merge, to * give {@link MergeScheduler}s ionice like control. * - * This is similar to {@link SimpleRateLimiter}, except it's merge-private, - * it will wake up if its rate changes while it's paused, it tracks how - * much time it spent stopped and paused, and it supports aborting. - * * @lucene.internal */ public class MergeRateLimiter extends RateLimiter { private final static int MIN_PAUSE_CHECK_MSEC = 25; - volatile long totalBytesWritten; + + private final static long MIN_PAUSE_NS = TimeUnit.MILLISECONDS.toNanos(2); + private final static long MAX_PAUSE_NS = TimeUnit.MILLISECONDS.toNanos(250); + + private volatile double mbPerSec; + private volatile long minPauseCheckBytes; - double mbPerSec; private long lastNS; - private long minPauseCheckBytes; - private boolean abort; - long totalPausedNS; - long totalStoppedNS; - final MergePolicy.OneMerge merge; - /** Returned by {@link #maybePause}. 
*/ - private static enum PauseResult {NO, STOPPED, PAUSED}; + private AtomicLong totalBytesWritten = new AtomicLong(); + + private final OneMergeProgress mergeProgress; /** Sole constructor. */ - public MergeRateLimiter(MergePolicy.OneMerge merge) { - this.merge = merge; - + public MergeRateLimiter(OneMergeProgress mergeProgress) { // Initially no IO limit; use setter here so minPauseCheckBytes is set: + this.mergeProgress = mergeProgress; setMBPerSec(Double.POSITIVE_INFINITY); } @Override - public synchronized void setMBPerSec(double mbPerSec) { - // 0.0 is allowed: it means the merge is paused - if (mbPerSec < 0.0) { - throw new IllegalArgumentException("mbPerSec must be positive; got: " + mbPerSec); + public void setMBPerSec(double mbPerSec) { + // Synchronized to make updates to mbPerSec and minPauseCheckBytes atomic. + synchronized (this) { + // 0.0 is allowed: it means the merge is paused + if (mbPerSec < 0.0) { + throw new IllegalArgumentException("mbPerSec must be positive; got: " + mbPerSec); + } + this.mbPerSec = mbPerSec; + + // NOTE: Double.POSITIVE_INFINITY casts to Long.MAX_VALUE + this.minPauseCheckBytes = Math.min(1024*1024, (long) ((MIN_PAUSE_CHECK_MSEC / 1000.0) * mbPerSec * 1024 * 1024)); + assert minPauseCheckBytes >= 0; } - this.mbPerSec = mbPerSec; - // NOTE: Double.POSITIVE_INFINITY casts to Long.MAX_VALUE - minPauseCheckBytes = Math.min(1024*1024, (long) ((MIN_PAUSE_CHECK_MSEC / 1000.0) * mbPerSec * 1024 * 1024)); - assert minPauseCheckBytes >= 0; - notify(); + + mergeProgress.wakeup(); } @Override - public synchronized double getMBPerSec() { + public double getMBPerSec() { return mbPerSec; } /** Returns total bytes written by this merge. 
*/ public long getTotalBytesWritten() { - return totalBytesWritten; + return totalBytesWritten.get(); } @Override public long pause(long bytes) throws MergePolicy.MergeAbortedException { + totalBytesWritten.addAndGet(bytes); - totalBytesWritten += bytes; - - long startNS = System.nanoTime(); - long curNS = startNS; - - // While loop because 1) Thread.wait doesn't always sleep long - // enough, and 2) we wake up and check again when our rate limit + // While loop because we may wake up and check again when our rate limit // is changed while we were pausing: - long pausedNS = 0; - while (true) { - PauseResult result = maybePause(bytes, curNS); - if (result == PauseResult.NO) { - // Set to curNS, not targetNS, to enforce the instant rate, not - // the "averaaged over all history" rate: - lastNS = curNS; - break; - } - curNS = System.nanoTime(); - long ns = curNS - startNS; - startNS = curNS; - - // Separately track when merge was stopped vs rate limited: - if (result == PauseResult.STOPPED) { - totalStoppedNS += ns; - } else { - assert result == PauseResult.PAUSED; - totalPausedNS += ns; - } - pausedNS += ns; + long paused = 0; + long delta; + while ((delta = maybePause(bytes, System.nanoTime())) >= 0) { + // Keep waiting. + paused += delta; } - return pausedNS; + return paused; } /** Total NS merge was stopped. */ - public synchronized long getTotalStoppedNS() { - return totalStoppedNS; + public long getTotalStoppedNS() { + return mergeProgress.getPauseTimes().get(PauseReason.STOPPED); } /** Total NS merge was paused to rate limit IO. */ - public synchronized long getTotalPausedNS() { - return totalPausedNS; + public long getTotalPausedNS() { + return mergeProgress.getPauseTimes().get(PauseReason.PAUSED); } - /** Returns NO if no pause happened, STOPPED if pause because rate was 0.0 (merge is stopped), PAUSED if paused with a normal rate limit. 
*/ - private synchronized PauseResult maybePause(long bytes, long curNS) throws MergePolicy.MergeAbortedException { - + /** + * Returns the number of nanoseconds spent in a paused state or -1 + * if no pause was applied. If the thread needs pausing, this method delegates + * to the linked {@link OneMergeProgress}. + */ + private long maybePause(long bytes, long curNS) throws MergePolicy.MergeAbortedException { // Now is a good time to abort the merge: - checkAbort(); + if (mergeProgress.isAborted()) { + throw new MergePolicy.MergeAbortedException("Merge aborted."); + } - double secondsToPause = (bytes/1024./1024.) / mbPerSec; + double rate = mbPerSec; // read from volatile rate once. + double secondsToPause = (bytes/1024./1024.) / rate; // Time we should sleep until; this is purely instantaneous // rate (just adds seconds onto the last time we had paused to); @@ -140,54 +129,30 @@ public class MergeRateLimiter extends RateLimiter { long curPauseNS = targetNS - curNS; - // NOTE: except maybe on real-time JVMs, minimum realistic - // wait/sleep time is 1 msec; if you pass just 1 nsec the impl - // rounds up to 1 msec, so we don't bother unless it's > 2 msec: - - if (curPauseNS <= 2000000) { - return PauseResult.NO; + // We don't bother with thread pausing if the pause is smaller than 2 msec. + if (curPauseNS <= MIN_PAUSE_NS) { + // Set to curNS, not targetNS, to enforce the instant rate, not + // the "averaged over all history" rate: + lastNS = curNS; + return -1; } - // Defensive: sleep for at most 250 msec; the loop above will call us again if we should keep sleeping: - if (curPauseNS > 250L*1000000) { - curPauseNS = 250L*1000000; + // Defensive: don't sleep for too long; the loop above will call us again if + // we should keep sleeping and the rate may be adjusted in between. 
+ if (curPauseNS > MAX_PAUSE_NS) { + curPauseNS = MAX_PAUSE_NS; } - int sleepMS = (int) (curPauseNS / 1000000); - int sleepNS = (int) (curPauseNS % 1000000); - - double rate = mbPerSec; - + long start = System.nanoTime(); try { - // CMS can wake us up here if it changes our target rate: - wait(sleepMS, sleepNS); + mergeProgress.pauseNanos( + curPauseNS, + rate == 0.0 ? PauseReason.STOPPED : PauseReason.PAUSED, + () -> rate == mbPerSec); } catch (InterruptedException ie) { throw new ThreadInterruptedException(ie); } - - if (rate == 0.0) { - return PauseResult.STOPPED; - } else { - return PauseResult.PAUSED; - } - } - - /** Throws {@link MergePolicy.MergeAbortedException} if this merge was aborted. */ - public synchronized void checkAbort() throws MergePolicy.MergeAbortedException { - if (abort) { - throw new MergePolicy.MergeAbortedException("merge is aborted: " + merge.segString()); - } - } - - /** Mark this merge aborted. */ - public synchronized void setAbort() { - abort = true; - notify(); - } - - /** Returns true if this merge was aborted. */ - public synchronized boolean getAbort() { - return abort; + return System.nanoTime() - start; } @Override diff --git a/lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java b/lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java index 65af45b6625..66d08706080 100644 --- a/lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java +++ b/lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java @@ -20,6 +20,9 @@ package org.apache.lucene.index; import java.io.Closeable; import java.io.IOException; +import org.apache.lucene.index.MergePolicy.OneMerge; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.RateLimitedIndexOutput; import org.apache.lucene.util.InfoStream; /**

Expert: {@link IndexWriter} uses an instance @@ -42,6 +45,15 @@ public abstract class MergeScheduler implements Closeable { * */ public abstract void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException; + /** + * Wraps the incoming {@link Directory} so that we can merge-throttle it + * using {@link RateLimitedIndexOutput}. + */ + public Directory wrapForMerge(OneMerge merge, Directory in) { + // A no-op by default. + return in; + } + /** Close this MergeScheduler. */ @Override public abstract void close() throws IOException; diff --git a/lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java b/lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java index 16306535afc..e4c01366031 100644 --- a/lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java +++ b/lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java @@ -16,6 +16,8 @@ */ package org.apache.lucene.index; +import org.apache.lucene.index.MergePolicy.OneMerge; +import org.apache.lucene.store.Directory; /** * A {@link MergeScheduler} which never executes any merges. 
It is also a @@ -41,6 +43,11 @@ public final class NoMergeScheduler extends MergeScheduler { @Override public void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) {} + + @Override + public Directory wrapForMerge(OneMerge merge, Directory in) { + return in; + } @Override public MergeScheduler clone() { diff --git a/lucene/core/src/test/org/apache/lucene/index/TestMergeRateLimiter.java b/lucene/core/src/test/org/apache/lucene/index/TestMergeRateLimiter.java index ef922bb2f8b..723cfbc2007 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestMergeRateLimiter.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestMergeRateLimiter.java @@ -27,8 +27,8 @@ public class TestMergeRateLimiter extends LuceneTestCase { RandomIndexWriter w = new RandomIndexWriter(random(), dir); w.addDocument(new Document()); w.close(); - MergePolicy.OneMerge merge = new MergePolicy.OneMerge(SegmentInfos.readLatestCommit(dir).asList()); - MergeRateLimiter rateLimiter = new MergeRateLimiter(merge); + + MergeRateLimiter rateLimiter = new MergeRateLimiter(new MergePolicy.OneMergeProgress()); assertEquals(Double.POSITIVE_INFINITY, rateLimiter.getMBPerSec(), 0.0); assertTrue(rateLimiter.getMinPauseCheckBytes() > 0); dir.close();