LUCENE-7700: Move throughput control and merge aborting out of IndexWriter's core.

Dawid Weiss 2017-03-10 10:23:29 +01:00
parent d2bf30d58f
commit 9540bc3758
8 changed files with 358 additions and 201 deletions

CHANGES.txt

@@ -117,6 +117,12 @@ API Changes
instead of once all shard responses are present. (Simon Willnauer,
Mike McCandless)
* LUCENE-7700: A cleanup of merge throughput control logic. Refactored all the
code previously scattered throughout the IndexWriter and
ConcurrentMergeScheduler into a more accessible set of public methods (see
MergePolicy.OneMergeProgress, MergeScheduler.wrapForMerge and
OneMerge.mergeInit). (Dawid Weiss, Mike McCandless).
* LUCENE-7734: FieldType's copy constructor was widened to accept any IndexableFieldType.
(David Smiley)
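
The pieces named in the LUCENE-7700 entry compose as follows: a scheduler binds a MergeRateLimiter to a merge's OneMergeProgress and hands the merge a throttled Directory from wrapForMerge. A minimal sketch under assumptions (FixedRateMergeScheduler is hypothetical, not part of this commit; it subclasses SerialMergeScheduler only to inherit a trivial merge() implementation):

import java.io.IOException;
import org.apache.lucene.index.MergePolicy.OneMerge;
import org.apache.lucene.index.MergeRateLimiter;
import org.apache.lucene.index.SerialMergeScheduler;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.RateLimitedIndexOutput;

// Hypothetical scheduler: throttles every merge to one fixed write rate.
public class FixedRateMergeScheduler extends SerialMergeScheduler {
  private final double mbPerSec;

  public FixedRateMergeScheduler(double mbPerSec) {
    this.mbPerSec = mbPerSec;
  }

  @Override
  public Directory wrapForMerge(OneMerge merge, Directory in) {
    // Bind a rate limiter to this merge's progress tracker, the same way
    // ConcurrentMergeScheduler binds one per merge thread below.
    MergeRateLimiter rateLimiter = new MergeRateLimiter(merge.getMergeProgress());
    rateLimiter.setMBPerSec(mbPerSec);
    return new FilterDirectory(in) {
      @Override
      public IndexOutput createOutput(String name, IOContext context) throws IOException {
        // Every file this merge writes goes through a rate-limited output.
        return new RateLimitedIndexOutput(rateLimiter, in.createOutput(name, context));
      }
    };
  }
}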

ConcurrentMergeScheduler.java

@@ -25,6 +25,11 @@ import java.util.Locale;
import org.apache.lucene.index.MergePolicy.OneMerge;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.RateLimitedIndexOutput;
import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.CollectionUtil;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.ThreadInterruptedException;
@@ -255,6 +260,36 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
assert false: "merge thread " + currentThread + " was not found";
}
@Override
public Directory wrapForMerge(OneMerge merge, Directory in) {
Thread mergeThread = Thread.currentThread();
if (!MergeThread.class.isInstance(mergeThread)) {
throw new AssertionError("wrapForMerge should be called from MergeThread. Current thread: "
+ mergeThread);
}
// Return a wrapped Directory which has rate-limited output.
RateLimiter rateLimiter = ((MergeThread) mergeThread).rateLimiter;
return new FilterDirectory(in) {
@Override
public IndexOutput createOutput(String name, IOContext context) throws IOException {
ensureOpen();
// This Directory is only supposed to be used during merging,
// so all writes should have MERGE context, else there is a bug
// somewhere that is failing to pass down the right IOContext:
assert context.context == IOContext.Context.MERGE: "got context=" + context.context;
// Because rateLimiter is bound to a particular merge thread, this method should
// always be called from that context. Verify this.
assert mergeThread == Thread.currentThread() : "Not the same merge thread, current="
+ Thread.currentThread() + ", expected=" + mergeThread;
return new RateLimitedIndexOutput(rateLimiter, in.createOutput(name, context));
}
};
}
/**
* Called whenever the running merges have changed, to set merge IO limits.
* This method sorts the merge threads by their merge size in
@@ -327,8 +362,9 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
newMBPerSec = targetMBPerSec;
}
double curMBPerSec = merge.rateLimiter.getMBPerSec();
MergeRateLimiter rateLimiter = mergeThread.rateLimiter;
double curMBPerSec = rateLimiter.getMBPerSec();
if (verbose()) {
long mergeStartNS = merge.mergeStartNS;
if (mergeStartNS == -1) {
@@ -339,11 +375,11 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
message.append(String.format(Locale.ROOT, "merge thread %s estSize=%.1f MB (written=%.1f MB) runTime=%.1fs (stopped=%.1fs, paused=%.1fs) rate=%s\n",
mergeThread.getName(),
bytesToMB(merge.estimatedMergeBytes),
bytesToMB(merge.rateLimiter.totalBytesWritten),
bytesToMB(rateLimiter.getTotalBytesWritten()),
nsToSec(now - mergeStartNS),
nsToSec(merge.rateLimiter.getTotalStoppedNS()),
nsToSec(merge.rateLimiter.getTotalPausedNS()),
rateToString(merge.rateLimiter.getMBPerSec())));
nsToSec(rateLimiter.getTotalStoppedNS()),
nsToSec(rateLimiter.getTotalPausedNS()),
rateToString(rateLimiter.getMBPerSec())));
if (newMBPerSec != curMBPerSec) {
if (newMBPerSec == 0.0) {
@@ -364,7 +400,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
}
}
merge.rateLimiter.setMBPerSec(newMBPerSec);
rateLimiter.setMBPerSec(newMBPerSec);
}
if (verbose()) {
message(message.toString());
@@ -449,7 +485,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
Thread currentThread = Thread.currentThread();
int count = 0;
for (MergeThread mergeThread : mergeThreads) {
if (currentThread != mergeThread && mergeThread.isAlive() && mergeThread.merge.rateLimiter.getAbort() == false) {
if (currentThread != mergeThread && mergeThread.isAlive() && mergeThread.merge.isAborted() == false) {
count++;
}
}
@@ -497,8 +533,6 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
return;
}
updateIOThrottle(merge);
boolean success = false;
try {
if (verbose()) {
@@ -507,14 +541,16 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
// OK to spawn a new merge thread to handle this
// merge:
final MergeThread merger = getMergeThread(writer, merge);
mergeThreads.add(merger);
final MergeThread newMergeThread = getMergeThread(writer, merge);
mergeThreads.add(newMergeThread);
updateIOThrottle(newMergeThread.merge, newMergeThread.rateLimiter);
if (verbose()) {
message(" launch new thread [" + merger.getName() + "]");
message(" launch new thread [" + newMergeThread.getName() + "]");
}
merger.start();
newMergeThread.start();
updateMergeThreads();
success = true;
@@ -598,16 +634,17 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
/** Runs a merge thread to execute a single merge, then exits. */
protected class MergeThread extends Thread implements Comparable<MergeThread> {
final IndexWriter writer;
final OneMerge merge;
final MergeRateLimiter rateLimiter;
/** Sole constructor. */
public MergeThread(IndexWriter writer, OneMerge merge) {
this.writer = writer;
this.merge = merge;
this.rateLimiter = new MergeRateLimiter(merge.getMergeProgress());
}
@Override
public int compareTo(MergeThread other) {
// Larger merges sort first:
@@ -616,9 +653,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
@Override
public void run() {
try {
if (verbose()) {
message(" merge thread: start");
}
@@ -715,7 +750,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
}
/** Tunes IO throttle when a new merge starts. */
private synchronized void updateIOThrottle(OneMerge newMerge) throws IOException {
private synchronized void updateIOThrottle(OneMerge newMerge, MergeRateLimiter rateLimiter) throws IOException {
if (doAutoIOThrottle == false) {
return;
}
@@ -794,7 +829,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
} else {
rate = targetMBPerSec;
}
newMerge.rateLimiter.setMBPerSec(rate);
rateLimiter.setMBPerSec(rate);
targetMBPerSecChanged();
}

IndexWriter.java

@@ -36,6 +36,7 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.codecs.Codec;
@@ -51,22 +52,18 @@ import org.apache.lucene.search.Sort;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.FlushInfo;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.LockValidatingDirectoryWrapper;
import org.apache.lucene.store.MMapDirectory;
import org.apache.lucene.store.MergeInfo;
import org.apache.lucene.store.RateLimitedIndexOutput;
import org.apache.lucene.store.TrackingDirectoryWrapper;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CloseableThreadLocal;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.InfoStream;
@@ -277,7 +274,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
private final Directory directoryOrig; // original user directory
private final Directory directory; // wrapped with additional checks
private final Directory mergeDirectory; // wrapped with throttling: used for merging
private final Analyzer analyzer; // how to analyze text
private final AtomicLong changeCount = new AtomicLong(); // increments every time a change is completed
@@ -353,8 +349,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* card to make sure they can later charge you when you check out. */
final AtomicLong pendingNumDocs = new AtomicLong();
final CloseableThreadLocal<MergeRateLimiter> rateLimiters = new CloseableThreadLocal<>();
DirectoryReader getReader() throws IOException {
return getReader(true, false);
}
@@ -809,10 +803,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
directoryOrig = d;
directory = new LockValidatingDirectoryWrapper(d, writeLock);
// Directory we use for merging, so we can abort running merges, and so
// merge schedulers can optionally rate-limit per-merge IO:
mergeDirectory = addMergeRateLimiters(directory);
analyzer = config.getAnalyzer();
mergeScheduler = config.getMergeScheduler();
mergeScheduler.setInfoStream(infoStream);
@@ -2212,8 +2202,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
try {
abortMerges();
rateLimiters.close();
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "rollback: done finish merges");
}
@@ -2418,7 +2406,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "now abort pending merge " + segString(merge.segments));
}
merge.rateLimiter.setAbort();
merge.setAborted();
mergeFinish(merge);
}
pendingMerges.clear();
@@ -2427,7 +2415,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "now abort running merge " + segString(merge.segments));
}
merge.rateLimiter.setAbort();
merge.setAborted();
}
// We wait here to make all merges stop. It should not
@@ -2775,13 +2763,17 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* index.
*
* <p>
* <b>NOTE:</b> this method merges all given {@link LeafReader}s in one
* <b>NOTE:</b> this merges all given {@link LeafReader}s in one
* merge. If you intend to merge a large number of readers, it may be better
* to call this method multiple times, each time with a small set of readers.
* In principle, if you use a merge policy with a {@code mergeFactor} or
* {@code maxMergeAtOnce} parameter, you should pass that many readers in one
* call.
*
* <p>
* <b>NOTE:</b> this method does not call or make use of the {@link MergeScheduler},
* so any custom bandwidth throttling is at the moment ignored.
*
* @return The <a href="#sequence_number">sequence number</a>
* for this operation
*
@@ -2832,8 +2824,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
SegmentMerger merger = new SegmentMerger(Arrays.asList(readers), info, infoStream, trackingDir,
globalFieldNumberMap,
context);
rateLimiters.set(new MergeRateLimiter(null));
if (!merger.shouldMerge()) {
return docWriter.deleteQueue.getNextSequenceNumber();
@@ -2864,7 +2854,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// Now create the compound file if needed
if (useCompoundFile) {
Collection<String> filesToDelete = infoPerCommit.files();
TrackingDirectoryWrapper trackingCFSDir = new TrackingDirectoryWrapper(mergeDirectory);
TrackingDirectoryWrapper trackingCFSDir = new TrackingDirectoryWrapper(directory);
// TODO: unlike merge, on exception we arent sniping any trash cfs files here?
// createCompoundFile tries to cleanup, but it might not always be able to...
try {
@@ -3745,7 +3735,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// deleter.refresh() call that will remove any index
// file that current segments does not reference), we
// abort this merge
if (merge.rateLimiter.getAbort()) {
if (merge.isAborted()) {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "commitMerge: skip: it was aborted");
}
@@ -3905,8 +3895,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
boolean success = false;
rateLimiters.set(merge.rateLimiter);
final long t0 = System.currentTimeMillis();
final MergePolicy mergePolicy = config.getMergePolicy();
@@ -3937,7 +3925,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "hit exception during merge");
}
} else if (merge.rateLimiter.getAbort() == false && (merge.maxNumSegments != UNBOUNDED_MAX_MERGE_SEGMENTS || (!closed && !closing))) {
} else if (!merge.isAborted() && (merge.maxNumSegments != UNBOUNDED_MAX_MERGE_SEGMENTS || (!closed && !closing))) {
// This merge (and, generally, any change to the
// segments) may now enable new merges, so we call
// merge policy & update pending merges.
@@ -3951,7 +3939,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
tragicEvent(t, "merge");
}
if (merge.info != null && merge.rateLimiter.getAbort() == false) {
if (merge.info != null && merge.isAborted() == false) {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "merge time " + (System.currentTimeMillis()-t0) + " msec for " + merge.info.info.maxDoc() + " docs");
}
@@ -3976,7 +3964,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
assert merge.segments.size() > 0;
if (stopMerges) {
merge.rateLimiter.setAbort();
merge.setAborted();
throw new MergePolicy.MergeAbortedException("merge is aborted: " + segString(merge.segments));
}
@@ -4087,7 +4075,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
return;
}
if (merge.rateLimiter.getAbort()) {
merge.mergeInit();
if (merge.isAborted()) {
return;
}
@@ -4239,9 +4229,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* but without holding synchronized lock on IndexWriter
* instance */
private int mergeMiddle(MergePolicy.OneMerge merge, MergePolicy mergePolicy) throws IOException {
merge.checkAborted();
merge.rateLimiter.checkAbort();
Directory mergeDirectory = config.getMergeScheduler().wrapForMerge(merge, directory);
List<SegmentCommitInfo> sourceSegments = merge.segments;
IOContext context = new IOContext(merge.getStoreMergeInfo());
@@ -4339,7 +4329,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
globalFieldNumberMap,
context);
merge.rateLimiter.checkAbort();
merge.checkAborted();
merge.mergeStartNS = System.nanoTime();
@@ -4354,11 +4344,20 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
if (infoStream.isEnabled("IW")) {
if (merger.shouldMerge()) {
String pauseInfo = merge.getMergeProgress().getPauseTimes().entrySet()
.stream()
.filter((e) -> e.getValue() > 0)
.map((e) -> String.format(Locale.ROOT, "%.1f sec %s",
e.getValue() / 1000000000.,
e.getKey().name().toLowerCase(Locale.ROOT)))
.collect(Collectors.joining(", "));
if (!pauseInfo.isEmpty()) {
pauseInfo = " (" + pauseInfo + ")";
}
long t1 = System.nanoTime();
double sec = (t1-merge.mergeStartNS)/1000000000.;
double segmentMB = (merge.info.sizeInBytes()/1024./1024.);
double stoppedSec = merge.rateLimiter.getTotalStoppedNS()/1000000000.;
double throttleSec = merge.rateLimiter.getTotalPausedNS()/1000000000.;
infoStream.message("IW", "merge codec=" + codec + " maxDoc=" + merge.info.info.maxDoc() + "; merged segment has " +
(mergeState.mergeFieldInfos.hasVectors() ? "vectors" : "no vectors") + "; " +
(mergeState.mergeFieldInfos.hasNorms() ? "norms" : "no norms") + "; " +
@@ -4367,10 +4366,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
(mergeState.mergeFieldInfos.hasProx() ? "freqs" : "no freqs") + "; " +
(mergeState.mergeFieldInfos.hasPointValues() ? "points" : "no points") + "; " +
String.format(Locale.ROOT,
"%.1f sec (%.1f sec stopped, %.1f sec paused) to merge segment [%.2f MB, %.2f MB/sec]",
"%.1f sec%s to merge segment [%.2f MB, %.2f MB/sec]",
sec,
stoppedSec,
throttleSec,
pauseInfo,
segmentMB,
segmentMB / sec));
} else {
@@ -4406,7 +4404,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
success = true;
} catch (Throwable t) {
synchronized(this) {
if (merge.rateLimiter.getAbort()) {
if (merge.isAborted()) {
// This can happen if rollback is called while we were building
// our CFS -- fall through to logic below to remove the non-CFS
// merged files:
@@ -4439,7 +4437,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// registered with IFD
deleteNewFiles(filesToRemove);
if (merge.rateLimiter.getAbort()) {
if (merge.isAborted()) {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "abort merge after building CFS");
}
@@ -5063,30 +5061,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
throw new IllegalArgumentException("number of documents in the index cannot exceed " + actualMaxDocs + " (current document count is " + pendingNumDocs.get() + "; added numDocs is " + addedNumDocs + ")");
}
/** Wraps the incoming {@link Directory} so that we assign a per-thread
* {@link MergeRateLimiter} to all created {@link IndexOutput}s. */
private Directory addMergeRateLimiters(Directory in) {
return new FilterDirectory(in) {
@Override
public IndexOutput createOutput(String name, IOContext context) throws IOException {
ensureOpen();
// Paranoia defense: if this trips we have a bug somewhere...
IndexWriter.this.ensureOpen(false);
// This Directory is only supposed to be used during merging,
// so all writes should have MERGE context, else there is a bug
// somewhere that is failing to pass down the right IOContext:
assert context.context == IOContext.Context.MERGE: "got context=" + context.context;
MergeRateLimiter rateLimiter = rateLimiters.get();
assert rateLimiter != null;
return new RateLimitedIndexOutput(rateLimiter, in.createOutput(name, context));
}
};
}
/** Returns the highest <a href="#sequence_number">sequence number</a> across
* all completed operations, or 0 if no operations have finished yet. Still
* in-flight operations (in other threads) are not counted until they finish.

MergePolicy.java

@@ -19,12 +19,19 @@ package org.apache.lucene.index;
import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MergeInfo;
import org.apache.lucene.store.RateLimiter;
/**
* <p>Expert: a MergePolicy determines the sequence of
@@ -55,6 +62,125 @@ import org.apache.lucene.store.RateLimiter;
* @lucene.experimental
*/
public abstract class MergePolicy {
/**
* Progress and state for an executing merge. This class
* encapsulates the logic to pause and resume the merge thread
* or to abort the merge entirely.
*
* @lucene.experimental */
public static class OneMergeProgress {
/** Reason for pausing the merge thread. */
public static enum PauseReason {
/** Stopped (because of throughput rate set to 0, typically). */
STOPPED,
/** Temporarily paused because of exceeded throughput rate. */
PAUSED,
/** Other reason. */
OTHER
};
private final ReentrantLock pauseLock = new ReentrantLock();
private final Condition pausing = pauseLock.newCondition();
/**
* Pause times (in nanoseconds) for each {@link PauseReason}.
*/
private final EnumMap<PauseReason, AtomicLong> pauseTimesNS;
private volatile boolean aborted;
/**
* This field is for sanity-check purposes only. Only the same thread that invoked
* {@link OneMerge#mergeInit()} is permitted to be calling
* {@link #pauseNanos}. This is always verified at runtime.
*/
private Thread owner;
/** Creates a new merge progress info. */
public OneMergeProgress() {
// Place all the pause reasons in there immediately so that we can simply update values.
pauseTimesNS = new EnumMap<PauseReason,AtomicLong>(PauseReason.class);
for (PauseReason p : PauseReason.values()) {
pauseTimesNS.put(p, new AtomicLong());
}
}
/**
* Abort the merge this progress tracks at the next
* possible moment.
*/
public void abort() {
aborted = true;
wakeup(); // wakeup any paused merge thread.
}
/**
* Return the aborted state of this merge.
*/
public boolean isAborted() {
return aborted;
}
/**
* Pauses the calling thread for at least <code>pauseNanos</code> nanoseconds
* unless the merge is aborted or the external condition returns <code>false</code>,
* in which case control returns immediately.
*
* The external condition is required so that other threads can terminate the pausing immediately,
* before <code>pauseNanos</code> expires. We can't rely on just {@link Condition#awaitNanos(long)} alone
* because it can return due to spurious wakeups too.
*
* @param condition The pause condition that should return false if immediate return from this
* method is needed. Other threads can wake up any sleeping thread by calling
{@link #wakeup}, but the thread will fall back to sleep for the remainder of the requested
time if this condition still returns true.
*/
public void pauseNanos(long pauseNanos, PauseReason reason, BooleanSupplier condition) throws InterruptedException {
if (Thread.currentThread() != owner) {
throw new RuntimeException("Only the merge owner thread can call pauseNanos(). This thread: "
+ Thread.currentThread().getName() + ", owner thread: "
+ owner);
}
long start = System.nanoTime();
AtomicLong timeUpdate = pauseTimesNS.get(reason);
pauseLock.lock();
try {
while (pauseNanos > 0 && !aborted && condition.getAsBoolean()) {
pauseNanos = pausing.awaitNanos(pauseNanos);
}
} finally {
pauseLock.unlock();
timeUpdate.addAndGet(System.nanoTime() - start);
}
}
/**
* Request a wakeup for any threads stalled in {@link #pauseNanos}.
*/
public void wakeup() {
pauseLock.lock();
try {
pausing.signalAll();
} finally {
pauseLock.unlock();
}
}
/** Returns pause reasons and associated times in nanoseconds. */
public Map<PauseReason,Long> getPauseTimes() {
Set<Entry<PauseReason,AtomicLong>> entries = pauseTimesNS.entrySet();
return entries.stream()
.collect(Collectors.toMap(
(e) -> e.getKey(),
(e) -> e.getValue().get()));
}
final void setMergeThread(Thread owner) {
assert this.owner == null;
this.owner = owner;
}
}
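
For illustration, a sketch of the pause/abort protocol above. The demo class is hypothetical, not commit code; it would have to live in the org.apache.lucene.index package because setMergeThread is package-private (production code reaches it via OneMerge.mergeInit()):

package org.apache.lucene.index; // setMergeThread below is package-private

import java.util.concurrent.TimeUnit;
import org.apache.lucene.index.MergePolicy.OneMergeProgress;
import org.apache.lucene.index.MergePolicy.OneMergeProgress.PauseReason;

public class OneMergeProgressDemo {
  public static void main(String[] args) throws Exception {
    OneMergeProgress progress = new OneMergeProgress();

    Thread merger = new Thread(() -> {
      try {
        // Ask for a 10 second pause; the condition stays true, so only
        // abort() (which sets the flag and signals the condition) or the
        // elapsed timeout ends the wait.
        progress.pauseNanos(TimeUnit.SECONDS.toNanos(10), PauseReason.PAUSED, () -> true);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    progress.setMergeThread(merger); // only the owner thread may call pauseNanos()
    merger.start();

    Thread.sleep(100);
    progress.abort(); // pauseNanos() returns long before the 10 seconds elapse
    merger.join();

    // The ~100 ms of sleep is accounted against PauseReason.PAUSED.
    System.out.println(progress.getPauseTimes());
  }
}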
/** OneMerge provides the information necessary to perform
* an individual primitive merge operation, resulting in
@@ -64,7 +190,6 @@ public abstract class MergePolicy {
*
* @lucene.experimental */
public static class OneMerge {
SegmentCommitInfo info; // used by IndexWriter
boolean registerDone; // used by IndexWriter
long mergeGen; // used by IndexWriter
@@ -82,8 +207,10 @@ public abstract class MergePolicy {
/** Segments to be merged. */
public final List<SegmentCommitInfo> segments;
/** A private {@link RateLimiter} for this merge, used to rate limit writes and abort. */
public final MergeRateLimiter rateLimiter;
/**
* Control used to pause/stop/resume the merge thread.
*/
private final OneMergeProgress mergeProgress;
volatile long mergeStartNS = -1;
@@ -106,9 +233,17 @@ public abstract class MergePolicy {
}
totalMaxDoc = count;
rateLimiter = new MergeRateLimiter(this);
mergeProgress = new OneMergeProgress();
}
/**
* Called by {@link IndexWriter} after the merge started and from the
* thread that will be executing the merge.
*/
public void mergeInit() throws IOException {
mergeProgress.setMergeThread(Thread.currentThread());
}
/** Called by {@link IndexWriter} after the merge is done and all readers have been closed. */
public void mergeFinished() throws IOException {
}
@@ -163,7 +298,7 @@ public abstract class MergePolicy {
if (maxNumSegments != -1) {
b.append(" [maxNumSegments=" + maxNumSegments + "]");
}
if (rateLimiter.getAbort()) {
if (isAborted()) {
b.append(" [ABORTED]");
}
return b.toString();
@@ -194,7 +329,32 @@ public abstract class MergePolicy {
/** Return {@link MergeInfo} describing this merge. */
public MergeInfo getStoreMergeInfo() {
return new MergeInfo(totalMaxDoc, estimatedMergeBytes, isExternal, maxNumSegments);
}
}
/** Returns true if this merge was or should be aborted. */
public boolean isAborted() {
return mergeProgress.isAborted();
}
/** Marks this merge as aborted. The merge thread should terminate at the soonest possible moment. */
public void setAborted() {
this.mergeProgress.abort();
}
/** Checks if merge has been aborted and throws a merge exception if so. */
public void checkAborted() throws MergeAbortedException {
if (isAborted()) {
throw new MergePolicy.MergeAbortedException("merge is aborted: " + segString());
}
}
/**
* Returns a {@link OneMergeProgress} instance for this merge, which provides
* statistics of the merge threads (run time vs. sleep time) if merging is throttled.
*/
public OneMergeProgress getMergeProgress() {
return mergeProgress;
}
}
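
The cooperative-abort pattern these methods enable, as a sketch (AbortAwareMergeDriver and copySegments are hypothetical, not commit code):

import java.io.IOException;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.SegmentCommitInfo;

class AbortAwareMergeDriver {
  void copySegments(MergePolicy.OneMerge merge) throws IOException {
    for (SegmentCommitInfo info : merge.segments) {
      // Throws MergePolicy.MergeAbortedException once another thread has
      // called setAborted(), e.g. from IndexWriter's abortMerges() path.
      merge.checkAborted();
      // ... per-segment merge work would go here ...
    }
  }
}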
/**
@@ -222,8 +382,7 @@ public abstract class MergePolicy {
merges.add(merge);
}
/** Returns a description of the merges in this
* specification. */
/** Returns a description of the merges in this specification. */
public String segString(Directory dir) {
StringBuilder b = new StringBuilder();
b.append("MergeSpec:\n");
@@ -235,8 +394,7 @@ public abstract class MergePolicy {
}
}
/** Exception thrown if there are any problems while
* executing a merge. */
/** Exception thrown if there are any problems while executing a merge. */
public static class MergeException extends RuntimeException {
private Directory dir;
@@ -259,9 +417,9 @@ public abstract class MergePolicy {
}
}
/** Thrown when a merge was explicity aborted because
/** Thrown when a merge was explicitly aborted because
* {@link IndexWriter#abortMerges} was called. Normally
* this exception is privately caught and suppresed by
* this exception is privately caught and suppressed by
* {@link IndexWriter}. */
public static class MergeAbortedException extends IOException {
/** Create a {@link MergeAbortedException}. */

MergeRateLimiter.java

@@ -20,118 +20,107 @@ package org.apache.lucene.index;
import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.ThreadInterruptedException;
import static org.apache.lucene.store.RateLimiter.SimpleRateLimiter;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.index.MergePolicy.OneMergeProgress;
import org.apache.lucene.index.MergePolicy.OneMergeProgress.PauseReason;
/** This is the {@link RateLimiter} that {@link IndexWriter} assigns to each running merge, to
give {@link MergeScheduler}s ionice-like control.
*
* This is similar to {@link SimpleRateLimiter}, except it's merge-private,
* it will wake up if its rate changes while it's paused, it tracks how
* much time it spent stopped and paused, and it supports aborting.
*
* @lucene.internal */
public class MergeRateLimiter extends RateLimiter {
private final static int MIN_PAUSE_CHECK_MSEC = 25;
volatile long totalBytesWritten;
private final static long MIN_PAUSE_NS = TimeUnit.MILLISECONDS.toNanos(2);
private final static long MAX_PAUSE_NS = TimeUnit.MILLISECONDS.toNanos(250);
private volatile double mbPerSec;
private volatile long minPauseCheckBytes;
double mbPerSec;
private long lastNS;
private long minPauseCheckBytes;
private boolean abort;
long totalPausedNS;
long totalStoppedNS;
final MergePolicy.OneMerge merge;
/** Returned by {@link #maybePause}. */
private static enum PauseResult {NO, STOPPED, PAUSED};
private AtomicLong totalBytesWritten = new AtomicLong();
private final OneMergeProgress mergeProgress;
/** Sole constructor. */
public MergeRateLimiter(MergePolicy.OneMerge merge) {
this.merge = merge;
public MergeRateLimiter(OneMergeProgress mergeProgress) {
// Initially no IO limit; use setter here so minPauseCheckBytes is set:
this.mergeProgress = mergeProgress;
setMBPerSec(Double.POSITIVE_INFINITY);
}
@Override
public synchronized void setMBPerSec(double mbPerSec) {
// 0.0 is allowed: it means the merge is paused
if (mbPerSec < 0.0) {
throw new IllegalArgumentException("mbPerSec must be positive; got: " + mbPerSec);
public void setMBPerSec(double mbPerSec) {
// Synchronized to make updates to mbPerSec and minPauseCheckBytes atomic.
synchronized (this) {
// 0.0 is allowed: it means the merge is paused
if (mbPerSec < 0.0) {
throw new IllegalArgumentException("mbPerSec must be positive; got: " + mbPerSec);
}
this.mbPerSec = mbPerSec;
// NOTE: Double.POSITIVE_INFINITY casts to Long.MAX_VALUE
this.minPauseCheckBytes = Math.min(1024*1024, (long) ((MIN_PAUSE_CHECK_MSEC / 1000.0) * mbPerSec * 1024 * 1024));
assert minPauseCheckBytes >= 0;
}
this.mbPerSec = mbPerSec;
// NOTE: Double.POSITIVE_INFINITY casts to Long.MAX_VALUE
minPauseCheckBytes = Math.min(1024*1024, (long) ((MIN_PAUSE_CHECK_MSEC / 1000.0) * mbPerSec * 1024 * 1024));
assert minPauseCheckBytes >= 0;
notify();
mergeProgress.wakeup();
}
@Override
public synchronized double getMBPerSec() {
public double getMBPerSec() {
return mbPerSec;
}
/** Returns total bytes written by this merge. */
public long getTotalBytesWritten() {
return totalBytesWritten;
return totalBytesWritten.get();
}
@Override
public long pause(long bytes) throws MergePolicy.MergeAbortedException {
totalBytesWritten.addAndGet(bytes);
totalBytesWritten += bytes;
long startNS = System.nanoTime();
long curNS = startNS;
// While loop because 1) Thread.wait doesn't always sleep long
// enough, and 2) we wake up and check again when our rate limit
// While loop because we may wake up and check again when our rate limit
// is changed while we were pausing:
long pausedNS = 0;
while (true) {
PauseResult result = maybePause(bytes, curNS);
if (result == PauseResult.NO) {
// Set to curNS, not targetNS, to enforce the instant rate, not
// the "averaaged over all history" rate:
lastNS = curNS;
break;
}
curNS = System.nanoTime();
long ns = curNS - startNS;
startNS = curNS;
// Separately track when merge was stopped vs rate limited:
if (result == PauseResult.STOPPED) {
totalStoppedNS += ns;
} else {
assert result == PauseResult.PAUSED;
totalPausedNS += ns;
}
pausedNS += ns;
long paused = 0;
long delta;
while ((delta = maybePause(bytes, System.nanoTime())) >= 0) {
// Keep waiting.
paused += delta;
}
return pausedNS;
return paused;
}
/** Total NS merge was stopped. */
public synchronized long getTotalStoppedNS() {
return totalStoppedNS;
public long getTotalStoppedNS() {
return mergeProgress.getPauseTimes().get(PauseReason.STOPPED);
}
/** Total NS merge was paused to rate limit IO. */
public synchronized long getTotalPausedNS() {
return totalPausedNS;
public long getTotalPausedNS() {
return mergeProgress.getPauseTimes().get(PauseReason.PAUSED);
}
/** Returns NO if no pause happened, STOPPED if pause because rate was 0.0 (merge is stopped), PAUSED if paused with a normal rate limit. */
private synchronized PauseResult maybePause(long bytes, long curNS) throws MergePolicy.MergeAbortedException {
/**
* Returns the number of nanoseconds spent in a paused state or <code>-1</code>
* if no pause was applied. If the thread needs pausing, this method delegates
* to the linked {@link OneMergeProgress}.
*/
private long maybePause(long bytes, long curNS) throws MergePolicy.MergeAbortedException {
// Now is a good time to abort the merge:
checkAbort();
if (mergeProgress.isAborted()) {
throw new MergePolicy.MergeAbortedException("Merge aborted.");
}
double secondsToPause = (bytes/1024./1024.) / mbPerSec;
double rate = mbPerSec; // read from volatile rate once.
double secondsToPause = (bytes/1024./1024.) / rate;
// Time we should sleep until; this is purely instantaneous
// rate (just adds seconds onto the last time we had paused to);
@@ -140,54 +129,30 @@ public class MergeRateLimiter extends RateLimiter {
long curPauseNS = targetNS - curNS;
// NOTE: except maybe on real-time JVMs, minimum realistic
// wait/sleep time is 1 msec; if you pass just 1 nsec the impl
// rounds up to 1 msec, so we don't bother unless it's > 2 msec:
if (curPauseNS <= 2000000) {
return PauseResult.NO;
// We don't bother with thread pausing if the pause is smaller than 2 msec.
if (curPauseNS <= MIN_PAUSE_NS) {
// Set to curNS, not targetNS, to enforce the instant rate, not
// the "averaged over all history" rate:
lastNS = curNS;
return -1;
}
// Defensive: sleep for at most 250 msec; the loop above will call us again if we should keep sleeping:
if (curPauseNS > 250L*1000000) {
curPauseNS = 250L*1000000;
// Defensive: don't sleep for too long; the loop above will call us again if
// we should keep sleeping and the rate may be adjusted in between.
if (curPauseNS > MAX_PAUSE_NS) {
curPauseNS = MAX_PAUSE_NS;
}
int sleepMS = (int) (curPauseNS / 1000000);
int sleepNS = (int) (curPauseNS % 1000000);
double rate = mbPerSec;
long start = System.nanoTime();
try {
// CMS can wake us up here if it changes our target rate:
wait(sleepMS, sleepNS);
mergeProgress.pauseNanos(
curPauseNS,
rate == 0.0 ? PauseReason.STOPPED : PauseReason.PAUSED,
() -> rate == mbPerSec);
} catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie);
}
if (rate == 0.0) {
return PauseResult.STOPPED;
} else {
return PauseResult.PAUSED;
}
}
/** Throws {@link MergePolicy.MergeAbortedException} if this merge was aborted. */
public synchronized void checkAbort() throws MergePolicy.MergeAbortedException {
if (abort) {
throw new MergePolicy.MergeAbortedException("merge is aborted: " + merge.segString());
}
}
/** Mark this merge aborted. */
public synchronized void setAbort() {
abort = true;
notify();
}
/** Returns true if this merge was aborted. */
public synchronized boolean getAbort() {
return abort;
return System.nanoTime() - start;
}
@Override

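To make the throttling arithmetic concrete, a worked example with assumed values (illustration only, not commit code):

class PauseMathExample {
  static void example() {
    double mbPerSec = 10.0;           // current rate limit
    long bytes = 1024 * 1024;         // a 1 MB write was just reported to pause()
    double secondsToPause = (bytes / 1024. / 1024.) / mbPerSec;
    System.out.println(secondsToPause); // 0.1, i.e. targetNS = lastNS + 100 ms
    // A remaining pause of at most 2 ms (MIN_PAUSE_NS) is skipped entirely;
    // anything above 250 ms (MAX_PAUSE_NS) is slept in 250 ms slices, so a
    // concurrent setMBPerSec() call, which wakes the sleeper, takes effect
    // between slices rather than after the full pause.
  }
}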
MergeScheduler.java

@@ -20,6 +20,9 @@ package org.apache.lucene.index;
import java.io.Closeable;
import java.io.IOException;
import org.apache.lucene.index.MergePolicy.OneMerge;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RateLimitedIndexOutput;
import org.apache.lucene.util.InfoStream;
/** <p>Expert: {@link IndexWriter} uses an instance
@@ -42,6 +45,15 @@ public abstract class MergeScheduler implements Closeable {
* */
public abstract void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException;
/**
* Wraps the incoming {@link Directory} so that we can merge-throttle it
* using {@link RateLimitedIndexOutput}.
*/
public Directory wrapForMerge(OneMerge merge, Directory in) {
// A no-op by default.
return in;
}
/** Close this MergeScheduler. */
@Override
public abstract void close() throws IOException;
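
For context, a sketch of how a scheduler (and with it the wrapForMerge hook) gets installed; the path and analyzer are arbitrary examples, not part of this commit:

import java.nio.file.Paths;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.FSDirectory;

public class WrapForMergeWiring {
  public static void main(String[] args) throws Exception {
    IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer());
    // ConcurrentMergeScheduler overrides wrapForMerge, so each merge writes
    // through a RateLimitedIndexOutput bound to that merge's rate limiter.
    iwc.setMergeScheduler(new ConcurrentMergeScheduler());
    try (IndexWriter writer = new IndexWriter(FSDirectory.open(Paths.get("/tmp/example-index")), iwc)) {
      writer.forceMerge(1); // any resulting merges run against the wrapped Directory
    }
  }
}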

NoMergeScheduler.java

@@ -16,6 +16,8 @@
*/
package org.apache.lucene.index;
import org.apache.lucene.index.MergePolicy.OneMerge;
import org.apache.lucene.store.Directory;
/**
* A {@link MergeScheduler} which never executes any merges. It is also a
@@ -41,6 +43,11 @@ public final class NoMergeScheduler extends MergeScheduler {
@Override
public void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) {}
@Override
public Directory wrapForMerge(OneMerge merge, Directory in) {
return in;
}
@Override
public MergeScheduler clone() {

TestMergeRateLimiter.java

@@ -27,8 +27,8 @@ public class TestMergeRateLimiter extends LuceneTestCase {
RandomIndexWriter w = new RandomIndexWriter(random(), dir);
w.addDocument(new Document());
w.close();
MergePolicy.OneMerge merge = new MergePolicy.OneMerge(SegmentInfos.readLatestCommit(dir).asList());
MergeRateLimiter rateLimiter = new MergeRateLimiter(merge);
MergeRateLimiter rateLimiter = new MergeRateLimiter(new MergePolicy.OneMergeProgress());
assertEquals(Double.POSITIVE_INFINITY, rateLimiter.getMBPerSec(), 0.0);
assertTrue(rateLimiter.getMinPauseCheckBytes() > 0);
dir.close();