LUCENE-6119: CMS dynamically rate limits IO writes of each merge depending on incoming merge rate

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1649532 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael McCandless 2015-01-05 14:28:28 +00:00
parent 749d0ccff0
commit 41fcc722ff
48 changed files with 995 additions and 961 deletions

View File

@ -130,6 +130,10 @@ New Features
* LUCENE-5914: Add an option to Lucene50Codec to support either BEST_SPEED * LUCENE-5914: Add an option to Lucene50Codec to support either BEST_SPEED
or BEST_COMPRESSION for stored fields. (Adrien Grand, Robert Muir) or BEST_COMPRESSION for stored fields. (Adrien Grand, Robert Muir)
* LUCENE-6119: Add auto-IO-throttling to ConcurrentMergeScheduler, to
rate limit IO writes for each merge depending on incoming merge
rate. (Mike McCandless)
Optimizations Optimizations
* LUCENE-5960: Use a more efficient bitset, not a Set<Integer>, to * LUCENE-5960: Use a more efficient bitset, not a Set<Integer>, to

View File

@ -29,7 +29,6 @@ import java.util.Locale;
import org.apache.lucene.codecs.CompoundFormat; import org.apache.lucene.codecs.CompoundFormat;
import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.MergeState.CheckAbort;
import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.SegmentInfo; import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
@ -157,7 +156,7 @@ public class SimpleTextCompoundFormat extends CompoundFormat {
} }
@Override @Override
public void write(Directory dir, SegmentInfo si, Collection<String> files, CheckAbort checkAbort, IOContext context) throws IOException { public void write(Directory dir, SegmentInfo si, Collection<String> files, IOContext context) throws IOException {
String dataFile = IndexFileNames.segmentFileName(si.name, "", DATA_EXTENSION); String dataFile = IndexFileNames.segmentFileName(si.name, "", DATA_EXTENSION);
int numFiles = files.size(); int numFiles = files.size();
@ -181,8 +180,6 @@ public class SimpleTextCompoundFormat extends CompoundFormat {
out.copyBytes(in, in.length()); out.copyBytes(in, in.length());
} }
endOffsets[i] = out.getFilePointer(); endOffsets[i] = out.getFilePointer();
checkAbort.work(endOffsets[i] - startOffsets[i]);
} }
long tocPos = out.getFilePointer(); long tocPos = out.getFilePointer();

View File

@ -20,7 +20,6 @@ package org.apache.lucene.codecs;
import java.io.IOException; import java.io.IOException;
import java.util.Collection; import java.util.Collection;
import org.apache.lucene.index.MergeState.CheckAbort;
import org.apache.lucene.index.SegmentInfo; import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IOContext;
@ -47,8 +46,7 @@ public abstract class CompoundFormat {
/** /**
* Packs the provided files into a compound format. * Packs the provided files into a compound format.
*/ */
// TODO: get checkAbort out of here, and everywhere, and have iw do it at a higher level public abstract void write(Directory dir, SegmentInfo si, Collection<String> files, IOContext context) throws IOException;
public abstract void write(Directory dir, SegmentInfo si, Collection<String> files, CheckAbort checkAbort, IOContext context) throws IOException;
/** /**
* Returns the compound file names used by this segment. * Returns the compound file names used by this segment.

View File

@ -96,7 +96,6 @@ public abstract class StoredFieldsWriter implements Closeable {
storedFieldsReader.visitDocument(docID, visitor); storedFieldsReader.visitDocument(docID, visitor);
finishDocument(); finishDocument();
docCount++; docCount++;
mergeState.checkAbort.work(300);
} }
} }
finish(mergeState.mergeFieldInfos, docCount); finish(mergeState.mergeFieldInfos, docCount);

View File

@ -196,7 +196,6 @@ public abstract class TermVectorsWriter implements Closeable {
} }
addAllDocVectors(vectors, mergeState); addAllDocVectors(vectors, mergeState);
docCount++; docCount++;
mergeState.checkAbort.work(300);
} }
} }
finish(mergeState.mergeFieldInfos, docCount); finish(mergeState.mergeFieldInfos, docCount);

View File

@ -506,7 +506,6 @@ public final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
storedFieldsReader.visitDocument(docID, visitor); storedFieldsReader.visitDocument(docID, visitor);
finishDocument(); finishDocument();
++docCount; ++docCount;
mergeState.checkAbort.work(300);
} }
} else { } else {
// optimized merge, we copy serialized (but decompressed) bytes directly // optimized merge, we copy serialized (but decompressed) bytes directly
@ -522,7 +521,6 @@ public final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
numStoredFieldsInDoc = doc.numStoredFields; numStoredFieldsInDoc = doc.numStoredFields;
finishDocument(); finishDocument();
++docCount; ++docCount;
mergeState.checkAbort.work(300);
} }
} }
} }

View File

@ -751,7 +751,6 @@ public final class CompressingTermVectorsWriter extends TermVectorsWriter {
} }
addAllDocVectors(vectors, mergeState); addAllDocVectors(vectors, mergeState);
++docCount; ++docCount;
mergeState.checkAbort.work(300);
} }
} else { } else {
final CompressingStoredFieldsIndexReader index = matchingVectorsReader.getIndex(); final CompressingStoredFieldsIndexReader index = matchingVectorsReader.getIndex();
@ -781,7 +780,6 @@ public final class CompressingTermVectorsWriter extends TermVectorsWriter {
this.vectorsStream.copyBytes(vectorsStream, chunkLength); this.vectorsStream.copyBytes(vectorsStream, chunkLength);
docCount += chunkDocs; docCount += chunkDocs;
this.numDocs += chunkDocs; this.numDocs += chunkDocs;
mergeState.checkAbort.work(300 * chunkDocs);
i = nextLiveDoc(docBase + chunkDocs, liveDocs, maxDoc); i = nextLiveDoc(docBase + chunkDocs, liveDocs, maxDoc);
} else { } else {
for (; i < docBase + chunkDocs; i = nextLiveDoc(i + 1, liveDocs, maxDoc)) { for (; i < docBase + chunkDocs; i = nextLiveDoc(i + 1, liveDocs, maxDoc)) {
@ -793,7 +791,6 @@ public final class CompressingTermVectorsWriter extends TermVectorsWriter {
} }
addAllDocVectors(vectors, mergeState); addAllDocVectors(vectors, mergeState);
++docCount; ++docCount;
mergeState.checkAbort.work(300);
} }
} }
} else { } else {
@ -805,7 +802,6 @@ public final class CompressingTermVectorsWriter extends TermVectorsWriter {
} }
addAllDocVectors(vectors, mergeState); addAllDocVectors(vectors, mergeState);
++docCount; ++docCount;
mergeState.checkAbort.work(300);
i = nextLiveDoc(i + 1, liveDocs, maxDoc); i = nextLiveDoc(i + 1, liveDocs, maxDoc);
} }
} }

View File

@ -23,7 +23,6 @@ import java.util.Collection;
import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.codecs.CompoundFormat; import org.apache.lucene.codecs.CompoundFormat;
import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.MergeState.CheckAbort;
import org.apache.lucene.index.SegmentInfo; import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.store.DataOutput; import org.apache.lucene.store.DataOutput;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
@ -73,7 +72,7 @@ public final class Lucene50CompoundFormat extends CompoundFormat {
} }
@Override @Override
public void write(Directory dir, SegmentInfo si, Collection<String> files, CheckAbort checkAbort, IOContext context) throws IOException { public void write(Directory dir, SegmentInfo si, Collection<String> files, IOContext context) throws IOException {
String dataFile = IndexFileNames.segmentFileName(si.name, "", DATA_EXTENSION); String dataFile = IndexFileNames.segmentFileName(si.name, "", DATA_EXTENSION);
String entriesFile = IndexFileNames.segmentFileName(si.name, "", ENTRIES_EXTENSION); String entriesFile = IndexFileNames.segmentFileName(si.name, "", ENTRIES_EXTENSION);
@ -99,8 +98,6 @@ public final class Lucene50CompoundFormat extends CompoundFormat {
entries.writeString(IndexFileNames.stripSegmentName(file)); entries.writeString(IndexFileNames.stripSegmentName(file));
entries.writeLong(startOffset); entries.writeLong(startOffset);
entries.writeLong(length); entries.writeLong(length);
checkAbort.work(length);
} }
CodecUtil.writeFooter(data); CodecUtil.writeFooter(data);

View File

@ -23,6 +23,7 @@ import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -173,16 +174,19 @@ class BufferedUpdatesStream implements Accountable {
Collections.sort(infos2, sortSegInfoByDelGen); Collections.sort(infos2, sortSegInfoByDelGen);
CoalescedUpdates coalescedDeletes = null; CoalescedUpdates coalescedDeletes = null;
boolean anyNewDeletes = false;
int infosIDX = infos2.size()-1; int infosIDX = infos2.size()-1;
int delIDX = updates.size()-1; int delIDX = updates.size()-1;
long totDelCount = 0;
long totTermVisitedCount = 0;
List<SegmentCommitInfo> allDeleted = null; List<SegmentCommitInfo> allDeleted = null;
while (infosIDX >= 0) { while (infosIDX >= 0) {
//System.out.println("BD: cycle delIDX=" + delIDX + " infoIDX=" + infosIDX); //System.out.println("BD: cycle delIDX=" + delIDX + " infoIDX=" + infosIDX);
final long segStartNS = System.nanoTime();
final FrozenBufferedUpdates packet = delIDX >= 0 ? updates.get(delIDX) : null; final FrozenBufferedUpdates packet = delIDX >= 0 ? updates.get(delIDX) : null;
final SegmentCommitInfo info = infos2.get(infosIDX); final SegmentCommitInfo info = infos2.get(infosIDX);
final long segGen = info.getBufferedDeletesGen(); final long segGen = info.getBufferedDeletesGen();
@ -213,12 +217,14 @@ class BufferedUpdatesStream implements Accountable {
final ReadersAndUpdates rld = readerPool.get(info, true); final ReadersAndUpdates rld = readerPool.get(info, true);
final SegmentReader reader = rld.getReader(IOContext.READ); final SegmentReader reader = rld.getReader(IOContext.READ);
int delCount = 0; int delCount = 0;
long termVisitedCount = 0;
final boolean segAllDeletes; final boolean segAllDeletes;
try { try {
final DocValuesFieldUpdates.Container dvUpdates = new DocValuesFieldUpdates.Container(); final DocValuesFieldUpdates.Container dvUpdates = new DocValuesFieldUpdates.Container();
if (coalescedDeletes != null) { if (coalescedDeletes != null) {
//System.out.println(" del coalesced"); TermDeleteCounts counts = applyTermDeletes(coalescedDeletes.termsIterable(), rld, reader);
delCount += applyTermDeletes(coalescedDeletes.termsIterable(), rld, reader); delCount += counts.delCount;
termVisitedCount += counts.termVisitedCount;
delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), rld, reader); delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), rld, reader);
applyDocValuesUpdates(coalescedDeletes.numericDVUpdates, rld, reader, dvUpdates); applyDocValuesUpdates(coalescedDeletes.numericDVUpdates, rld, reader, dvUpdates);
applyDocValuesUpdates(coalescedDeletes.binaryDVUpdates, rld, reader, dvUpdates); applyDocValuesUpdates(coalescedDeletes.binaryDVUpdates, rld, reader, dvUpdates);
@ -239,7 +245,8 @@ class BufferedUpdatesStream implements Accountable {
rld.release(reader); rld.release(reader);
readerPool.release(rld); readerPool.release(rld);
} }
anyNewDeletes |= delCount > 0; totDelCount += delCount;
totTermVisitedCount += termVisitedCount;
if (segAllDeletes) { if (segAllDeletes) {
if (allDeleted == null) { if (allDeleted == null) {
@ -249,7 +256,7 @@ class BufferedUpdatesStream implements Accountable {
} }
if (infoStream.isEnabled("BD")) { if (infoStream.isEnabled("BD")) {
infoStream.message("BD", "seg=" + info + " segGen=" + segGen + " segDeletes=[" + packet + "]; coalesced deletes=[" + (coalescedDeletes == null ? "null" : coalescedDeletes) + "] newDelCount=" + delCount + (segAllDeletes ? " 100% deleted" : "")); infoStream.message("BD", String.format(Locale.ROOT, "%.3fs", ((System.nanoTime() - segStartNS)/1000000000.0)) + " seg=" + info + " segGen=" + segGen + " segDeletes=[" + packet + "]; coalesced deletes=[" + (coalescedDeletes == null ? "null" : coalescedDeletes) + "] newDelCount=" + delCount + " termVisitedCount=" + termVisitedCount + (segAllDeletes ? " 100% deleted" : ""));
} }
if (coalescedDeletes == null) { if (coalescedDeletes == null) {
@ -274,9 +281,12 @@ class BufferedUpdatesStream implements Accountable {
final ReadersAndUpdates rld = readerPool.get(info, true); final ReadersAndUpdates rld = readerPool.get(info, true);
final SegmentReader reader = rld.getReader(IOContext.READ); final SegmentReader reader = rld.getReader(IOContext.READ);
int delCount = 0; int delCount = 0;
long termVisitedCount = 0;
final boolean segAllDeletes; final boolean segAllDeletes;
try { try {
delCount += applyTermDeletes(coalescedDeletes.termsIterable(), rld, reader); TermDeleteCounts counts = applyTermDeletes(coalescedDeletes.termsIterable(), rld, reader);
delCount += counts.delCount;
termVisitedCount += counts.termVisitedCount;
delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), rld, reader); delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), rld, reader);
DocValuesFieldUpdates.Container dvUpdates = new DocValuesFieldUpdates.Container(); DocValuesFieldUpdates.Container dvUpdates = new DocValuesFieldUpdates.Container();
applyDocValuesUpdates(coalescedDeletes.numericDVUpdates, rld, reader, dvUpdates); applyDocValuesUpdates(coalescedDeletes.numericDVUpdates, rld, reader, dvUpdates);
@ -291,7 +301,9 @@ class BufferedUpdatesStream implements Accountable {
rld.release(reader); rld.release(reader);
readerPool.release(rld); readerPool.release(rld);
} }
anyNewDeletes |= delCount > 0;
totDelCount += delCount;
totTermVisitedCount += termVisitedCount;
if (segAllDeletes) { if (segAllDeletes) {
if (allDeleted == null) { if (allDeleted == null) {
@ -301,7 +313,7 @@ class BufferedUpdatesStream implements Accountable {
} }
if (infoStream.isEnabled("BD")) { if (infoStream.isEnabled("BD")) {
infoStream.message("BD", "seg=" + info + " segGen=" + segGen + " coalesced deletes=[" + coalescedDeletes + "] newDelCount=" + delCount + (segAllDeletes ? " 100% deleted" : "")); infoStream.message("BD", String.format(Locale.ROOT, "%.3fs", ((System.nanoTime() - segStartNS)/1000000000.0)) + " seg=" + info + " segGen=" + segGen + " coalesced deletes=[" + coalescedDeletes + "] newDelCount=" + delCount + " termVisitedCount=" + termVisitedCount + (segAllDeletes ? " 100% deleted" : ""));
} }
} }
info.setBufferedDeletesGen(gen); info.setBufferedDeletesGen(gen);
@ -312,11 +324,11 @@ class BufferedUpdatesStream implements Accountable {
assert checkDeleteStats(); assert checkDeleteStats();
if (infoStream.isEnabled("BD")) { if (infoStream.isEnabled("BD")) {
infoStream.message("BD", "applyDeletes took " + (System.currentTimeMillis()-t0) + " msec"); infoStream.message("BD", "applyDeletes took " + (System.currentTimeMillis()-t0) + " msec for " + infos.size() + " segments, " + totDelCount + " deleted docs, " + totTermVisitedCount + " visited terms");
} }
// assert infos != segmentInfos || !any() : "infos=" + infos + " segmentInfos=" + segmentInfos + " any=" + any; // assert infos != segmentInfos || !any() : "infos=" + infos + " segmentInfos=" + segmentInfos + " any=" + any;
return new ApplyDeletesResult(anyNewDeletes, gen, allDeleted); return new ApplyDeletesResult(totDelCount > 0, gen, allDeleted);
} }
synchronized long getNextGen() { synchronized long getNextGen() {
@ -374,9 +386,23 @@ class BufferedUpdatesStream implements Accountable {
} }
} }
private static class TermDeleteCounts {
/** How many documents were actually deleted. */
public final int delCount;
/** How many terms we checked. */
public final long termVisitedCount;
public TermDeleteCounts(int delCount, long termVisitedCount) {
this.delCount = delCount;
this.termVisitedCount = termVisitedCount;
}
}
// Delete by Term // Delete by Term
private synchronized long applyTermDeletes(Iterable<Term> termsIter, ReadersAndUpdates rld, SegmentReader reader) throws IOException { private synchronized TermDeleteCounts applyTermDeletes(Iterable<Term> termsIter, ReadersAndUpdates rld, SegmentReader reader) throws IOException {
long delCount = 0; int delCount = 0;
long termVisitedCount = 0;
Fields fields = reader.fields(); Fields fields = reader.fields();
TermsEnum termsEnum = null; TermsEnum termsEnum = null;
@ -388,8 +414,10 @@ class BufferedUpdatesStream implements Accountable {
boolean any = false; boolean any = false;
//System.out.println(Thread.currentThread().getName() + " del terms reader=" + reader); long ns = System.nanoTime();
for (Term term : termsIter) { for (Term term : termsIter) {
termVisitedCount++;
// Since we visit terms sorted, we gain performance // Since we visit terms sorted, we gain performance
// by re-using the same TermsEnum and seeking only // by re-using the same TermsEnum and seeking only
// forwards // forwards
@ -440,7 +468,7 @@ class BufferedUpdatesStream implements Accountable {
} }
} }
return delCount; return new TermDeleteCounts(delCount, termVisitedCount);
} }
// DocValues updates // DocValues updates

View File

@ -18,15 +18,15 @@ package org.apache.lucene.index;
*/ */
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.lucene.search.Query;
import org.apache.lucene.index.BufferedUpdatesStream.QueryAndLimit; import org.apache.lucene.index.BufferedUpdatesStream.QueryAndLimit;
import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate; import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate;
import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate; import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.MergedIterator; import org.apache.lucene.util.MergedIterator;
@ -35,16 +35,19 @@ class CoalescedUpdates {
final List<Iterable<Term>> iterables = new ArrayList<>(); final List<Iterable<Term>> iterables = new ArrayList<>();
final List<NumericDocValuesUpdate> numericDVUpdates = new ArrayList<>(); final List<NumericDocValuesUpdate> numericDVUpdates = new ArrayList<>();
final List<BinaryDocValuesUpdate> binaryDVUpdates = new ArrayList<>(); final List<BinaryDocValuesUpdate> binaryDVUpdates = new ArrayList<>();
int totalTermCount;
@Override @Override
public String toString() { public String toString() {
// note: we could add/collect more debugging information // note: we could add/collect more debugging information
return "CoalescedUpdates(termSets=" + iterables.size() + ",queries=" return "CoalescedUpdates(termSets=" + iterables.size()
+ queries.size() + ",numericDVUpdates=" + numericDVUpdates.size() + ",totalTermCount=" + totalTermCount
+ ",queries=" + queries.size() + ",numericDVUpdates=" + numericDVUpdates.size()
+ ",binaryDVUpdates=" + binaryDVUpdates.size() + ")"; + ",binaryDVUpdates=" + binaryDVUpdates.size() + ")";
} }
void update(FrozenBufferedUpdates in) { void update(FrozenBufferedUpdates in) {
totalTermCount += in.termCount;
iterables.add(in.termsIterable()); iterables.add(in.termsIterable());
for (int queryIdx = 0; queryIdx < in.queries.length; queryIdx++) { for (int queryIdx = 0; queryIdx < in.queries.length; queryIdx++) {

View File

@ -19,9 +19,11 @@ package org.apache.lucene.index;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Locale;
import org.apache.lucene.index.MergePolicy.OneMerge;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.util.CollectionUtil; import org.apache.lucene.util.CollectionUtil;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
@ -54,18 +56,17 @@ import org.apache.lucene.util.ThreadInterruptedException;
* settings for spinning or solid state disks for such * settings for spinning or solid state disks for such
* operating systems, use {@link #setDefaultMaxMergesAndThreads(boolean)}. * operating systems, use {@link #setDefaultMaxMergesAndThreads(boolean)}.
*/ */
public class ConcurrentMergeScheduler extends MergeScheduler { public class ConcurrentMergeScheduler extends MergeScheduler {
/** Dynamic default for {@code maxThreadCount} and {@code maxMergeCount}, /** Dynamic default for {@code maxThreadCount} and {@code maxMergeCount},
* used to detect whether the index is backed by an SSD or rotational disk and * used to detect whether the index is backed by an SSD or rotational disk and
* set {@code maxThreadCount} accordingly. If it's an SSD, * set {@code maxThreadCount} accordingly. If it's an SSD,
* {@code maxThreadCount} is set to {@code max(1, min(3, cpuCoreCount/2))}, * {@code maxThreadCount} is set to {@code max(1, min(4, cpuCoreCount/2))},
* otherwise 1. Note that detection only currently works on * otherwise 1. Note that detection only currently works on
* Linux; other platforms will assume the index is not on an SSD. */ * Linux; other platforms will assume the index is not on an SSD. */
public static final int AUTO_DETECT_MERGES_AND_THREADS = -1; public static final int AUTO_DETECT_MERGES_AND_THREADS = -1;
private int mergeThreadPriority = -1;
/** List of currently active {@link MergeThread}s. */ /** List of currently active {@link MergeThread}s. */
protected final List<MergeThread> mergeThreads = new ArrayList<>(); protected final List<MergeThread> mergeThreads = new ArrayList<>();
@ -81,16 +82,27 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
// throttling the incoming threads // throttling the incoming threads
private int maxMergeCount = AUTO_DETECT_MERGES_AND_THREADS; private int maxMergeCount = AUTO_DETECT_MERGES_AND_THREADS;
/** {@link Directory} that holds the index. */
protected Directory dir;
/** {@link IndexWriter} that owns this instance. */
protected IndexWriter writer;
/** How many {@link MergeThread}s have kicked off (this is use /** How many {@link MergeThread}s have kicked off (this is use
* to name them). */ * to name them). */
protected int mergeThreadCount; protected int mergeThreadCount;
/** Floor for IO write rate limit (we will never go any lower than this) */
private static final double MIN_MERGE_MB_PER_SEC = 5.0;
/** Initial value for IO write rate limit when doAutoIOThrottle is true */
private static final double START_MB_PER_SEC = 20.0;
/** Merges below this size are not counted in the maxThreadCount, i.e. they can freely run in their own thread (up until maxMergeCount). */
private static final double MIN_BIG_MERGE_MB = 50.0;
/** Current IO writes throttle rate */
protected double targetMBPerSec = START_MB_PER_SEC;
/** true if we should rate-limit writes for each merge */
private boolean doAutoIOThrottle = true;
private double forceMergeMBPerSec = Double.POSITIVE_INFINITY;
/** Sole constructor, with all settings set to default /** Sole constructor, with all settings set to default
* values. */ * values. */
public ConcurrentMergeScheduler() { public ConcurrentMergeScheduler() {
@ -142,10 +154,48 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
public synchronized void setDefaultMaxMergesAndThreads(boolean spins) { public synchronized void setDefaultMaxMergesAndThreads(boolean spins) {
if (spins) { if (spins) {
maxThreadCount = 1; maxThreadCount = 1;
maxMergeCount = 2; maxMergeCount = 6;
} else { } else {
maxThreadCount = Math.max(1, Math.min(3, Runtime.getRuntime().availableProcessors()/2)); maxThreadCount = Math.max(1, Math.min(4, Runtime.getRuntime().availableProcessors()/2));
maxMergeCount = maxThreadCount+2; maxMergeCount = maxThreadCount+5;
}
}
/** Set the per-merge IO throttle rate for forced merges (default: {@code Double.POSITIVE_INFINITY}). */
public synchronized void setForceMergeMBPerSec(double v) {
forceMergeMBPerSec = v;
updateMergeThreads();
}
/** Get the per-merge IO throttle rate for forced merges. */
public synchronized double getForceMergeMBPerSec() {
return forceMergeMBPerSec;
}
/** Turn on dynamic IO throttling, to adaptively rate limit writes
* bytes/sec to the minimal rate necessary so merges do not fall behind.
* By default this is enabled. */
public synchronized void enableAutoIOThrottle() {
doAutoIOThrottle = true;
targetMBPerSec = START_MB_PER_SEC;
updateMergeThreads();
}
/** Turn off auto IO throttling.
*
* @see #enableAutoIOThrottle */
public synchronized void disableAutoIOThrottle() {
doAutoIOThrottle = false;
updateMergeThreads();
}
/** Returns the currently set per-merge IO writes rate limit, if {@link #enableAutoIOThrottle}
* was called, else {@code Double.POSITIVE_INFINITY}. */
public synchronized double getIORateLimitMBPerSec() {
if (doAutoIOThrottle) {
return targetMBPerSec;
} else {
return Double.POSITIVE_INFINITY;
} }
} }
@ -161,48 +211,18 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
return maxMergeCount; return maxMergeCount;
} }
/** Return the priority that merge threads run at. By synchronized void removeMergeThread(MergeThread thread) {
* default the priority is 1 plus the priority of (ie, boolean result = mergeThreads.remove(thread);
* slightly higher priority than) the first thread that assert result;
* calls merge. */
public synchronized int getMergeThreadPriority() {
initMergeThreadPriority();
return mergeThreadPriority;
} }
/** Set the base priority that merge threads run at.
* Note that CMS may increase priority of some merge
* threads beyond this base priority. It's best not to
* set this any higher than
* Thread.MAX_PRIORITY-maxThreadCount, so that CMS has
* room to set relative priority among threads. */
public synchronized void setMergeThreadPriority(int pri) {
if (pri > Thread.MAX_PRIORITY || pri < Thread.MIN_PRIORITY)
throw new IllegalArgumentException("priority must be in range " + Thread.MIN_PRIORITY + " .. " + Thread.MAX_PRIORITY + " inclusive");
mergeThreadPriority = pri;
updateMergeThreads();
}
/** Sorts {@link MergeThread}s; larger merges come first. */
protected static final Comparator<MergeThread> compareByMergeDocCount = new Comparator<MergeThread>() {
@Override
public int compare(MergeThread t1, MergeThread t2) {
final MergePolicy.OneMerge m1 = t1.getCurrentMerge();
final MergePolicy.OneMerge m2 = t2.getCurrentMerge();
final int c1 = m1 == null ? Integer.MAX_VALUE : m1.totalDocCount;
final int c2 = m2 == null ? Integer.MAX_VALUE : m2.totalDocCount;
return c2 - c1;
}
};
/** /**
* Called whenever the running merges have changed, to pause and unpause * Called whenever the running merges have changed, to set merge IO limits.
* threads. This method sorts the merge threads by their merge size in * This method sorts the merge threads by their merge size in
* descending order and then pauses/unpauses threads from first to last -- * descending order and then pauses/unpauses threads from first to last --
* that way, smaller merges are guaranteed to run before larger ones. * that way, smaller merges are guaranteed to run before larger ones.
*/ */
protected synchronized void updateMergeThreads() { protected synchronized void updateMergeThreads() {
// Only look at threads that are alive & not in the // Only look at threads that are alive & not in the
@ -217,93 +237,121 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
mergeThreads.remove(threadIdx); mergeThreads.remove(threadIdx);
continue; continue;
} }
if (mergeThread.getCurrentMerge() != null) {
activeMerges.add(mergeThread); activeMerges.add(mergeThread);
}
threadIdx++; threadIdx++;
} }
// Sort the merge threads in descending order. // Sort the merge threads, largest first:
CollectionUtil.timSort(activeMerges, compareByMergeDocCount); CollectionUtil.timSort(activeMerges);
int pri = mergeThreadPriority;
final int activeMergeCount = activeMerges.size(); final int activeMergeCount = activeMerges.size();
for (threadIdx=0;threadIdx<activeMergeCount;threadIdx++) {
final MergeThread mergeThread = activeMerges.get(threadIdx); int bigMergeCount = 0;
final MergePolicy.OneMerge merge = mergeThread.getCurrentMerge();
if (merge == null) { for (threadIdx=activeMergeCount-1;threadIdx>=0;threadIdx--) {
continue; MergeThread mergeThread = activeMerges.get(threadIdx);
if (mergeThread.merge.estimatedMergeBytes > MIN_BIG_MERGE_MB*1024*1024) {
bigMergeCount = 1+threadIdx;
break;
} }
}
long now = System.nanoTime();
StringBuilder message;
if (verbose()) {
message = new StringBuilder();
message.append(String.format(Locale.ROOT, "updateMergeThreads ioThrottle=%s targetMBPerSec=%.1f MB/sec", doAutoIOThrottle, targetMBPerSec));
} else {
message = null;
}
for (threadIdx=0;threadIdx<activeMergeCount;threadIdx++) {
MergeThread mergeThread = activeMerges.get(threadIdx);
OneMerge merge = mergeThread.merge;
// pause the thread if maxThreadCount is smaller than the number of merge threads. // pause the thread if maxThreadCount is smaller than the number of merge threads.
final boolean doPause = threadIdx < activeMergeCount - maxThreadCount; final boolean doPause = threadIdx < bigMergeCount - maxThreadCount;
if (verbose()) { double newMBPerSec;
if (doPause != merge.getPause()) {
if (doPause) { if (doPause) {
message("pause thread " + mergeThread.getName()); newMBPerSec = 0.0;
} else if (merge.maxNumSegments != -1) {
newMBPerSec = forceMergeMBPerSec;
} else if (doAutoIOThrottle == false) {
newMBPerSec = Double.POSITIVE_INFINITY;
} else if (merge.estimatedMergeBytes < MIN_BIG_MERGE_MB*1024*1024) {
// Don't rate limit small merges:
newMBPerSec = Double.POSITIVE_INFINITY;
} else { } else {
message("unpause thread " + mergeThread.getName()); newMBPerSec = targetMBPerSec;
}
}
}
if (doPause != merge.getPause()) {
merge.setPause(doPause);
} }
if (!doPause) { double curMBPerSec = merge.rateLimiter.getMBPerSec();
if (verbose()) { if (verbose()) {
message("set priority of merge thread " + mergeThread.getName() + " to " + pri); long mergeStartNS = merge.mergeStartNS;
if (mergeStartNS == -1) {
// IndexWriter didn't start the merge yet:
mergeStartNS = now;
} }
mergeThread.setThreadPriority(pri); message.append('\n');
pri = Math.min(Thread.MAX_PRIORITY, 1+pri); message.append(String.format(Locale.ROOT, "merge thread %s estSize=%.1f MB (written=%.1f MB) runTime=%.1fs (stopped=%.1fs, paused=%.1fs) rate=%s\n",
mergeThread.getName(),
bytesToMB(merge.estimatedMergeBytes),
bytesToMB(merge.rateLimiter.totalBytesWritten),
nsToSec(now - mergeStartNS),
nsToSec(merge.rateLimiter.getTotalStoppedNS()),
nsToSec(merge.rateLimiter.getTotalPausedNS()),
rateToString(merge.rateLimiter.getMBPerSec())));
if (newMBPerSec != curMBPerSec) {
if (newMBPerSec == 0.0) {
message.append(" now stop");
} else if (curMBPerSec == 0.0) {
if (newMBPerSec == Double.POSITIVE_INFINITY) {
message.append(" now resume");
} else {
message.append(String.format(Locale.ROOT, " now resume to %.1f MB/sec", newMBPerSec));
} }
} else {
message.append(String.format(Locale.ROOT, " now change from %.1f MB/sec to %.1f MB/sec", curMBPerSec, newMBPerSec));
}
} else if (curMBPerSec == 0.0) {
message.append(" leave stopped");
} else {
message.append(String.format(Locale.ROOT, " leave running at %.1f MB/sec", curMBPerSec));
} }
} }
/** merge.rateLimiter.setMBPerSec(newMBPerSec);
* Returns true if verbosing is enabled. This method is usually used in
* conjunction with {@link #message(String)}, like that:
*
* <pre class="prettyprint">
* if (verbose()) {
* message(&quot;your message&quot;);
* }
* </pre>
*/
protected boolean verbose() {
return writer != null && writer.infoStream.isEnabled("CMS");
} }
if (verbose()) {
/** message(message.toString());
* Outputs the given message - this method assumes {@link #verbose()} was
* called and returned true.
*/
protected void message(String message) {
writer.infoStream.message("CMS", message);
}
private synchronized void initMergeThreadPriority() {
if (mergeThreadPriority == -1) {
// Default to slightly higher priority than our
// calling thread
mergeThreadPriority = 1+Thread.currentThread().getPriority();
if (mergeThreadPriority > Thread.MAX_PRIORITY)
mergeThreadPriority = Thread.MAX_PRIORITY;
} }
} }
private synchronized void initMaxMergesAndThreads() throws IOException { private synchronized void initDynamicDefaults(IndexWriter writer) throws IOException {
if (maxThreadCount == AUTO_DETECT_MERGES_AND_THREADS) { if (maxThreadCount == AUTO_DETECT_MERGES_AND_THREADS) {
assert writer != null;
boolean spins = IOUtils.spins(writer.getDirectory()); boolean spins = IOUtils.spins(writer.getDirectory());
setDefaultMaxMergesAndThreads(spins); setDefaultMaxMergesAndThreads(spins);
if (verbose()) { if (verbose()) {
message("initMaxMergesAndThreads spins=" + spins + " maxThreadCount=" + maxThreadCount + " maxMergeCount=" + maxMergeCount); message("initDynamicDefaults spins=" + spins + " maxThreadCount=" + maxThreadCount + " maxMergeCount=" + maxMergeCount);
} }
} }
} }
private static String rateToString(double mbPerSec) {
if (mbPerSec == 0.0) {
return "stopped";
} else if (mbPerSec == Double.POSITIVE_INFINITY) {
return "unlimited";
} else {
return String.format(Locale.ROOT, "%.1f MB/sec", mbPerSec);
}
}
@Override @Override
public void close() { public void close() {
sync(); sync();
@ -346,14 +394,11 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
*/ */
protected synchronized int mergeThreadCount() { protected synchronized int mergeThreadCount() {
int count = 0; int count = 0;
for (MergeThread mt : mergeThreads) { for (MergeThread mergeThread : mergeThreads) {
if (mt.isAlive()) { if (mergeThread.isAlive() && mergeThread.merge.rateLimiter.getAbort() == false) {
MergePolicy.OneMerge merge = mt.getCurrentMerge();
if (merge != null && merge.isAborted() == false) {
count++; count++;
} }
} }
}
return count; return count;
} }
@ -362,12 +407,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
assert !Thread.holdsLock(writer); assert !Thread.holdsLock(writer);
this.writer = writer; initDynamicDefaults(writer);
initMergeThreadPriority();
initMaxMergesAndThreads();
dir = writer.getDirectory();
// First, quickly run through the newly proposed merges // First, quickly run through the newly proposed merges
// and add any orthogonal merges (ie a merge not // and add any orthogonal merges (ie a merge not
@ -385,9 +425,9 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
// pending merges, until it's empty: // pending merges, until it's empty:
while (true) { while (true) {
maybeStall(); maybeStall(writer);
MergePolicy.OneMerge merge = writer.getNextMerge(); OneMerge merge = writer.getNextMerge();
if (merge == null) { if (merge == null) {
if (verbose()) { if (verbose()) {
message(" no more merges pending; now return"); message(" no more merges pending; now return");
@ -395,6 +435,8 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
return; return;
} }
updateIOThrottle(merge);
boolean success = false; boolean success = false;
try { try {
if (verbose()) { if (verbose()) {
@ -405,17 +447,13 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
// merge: // merge:
final MergeThread merger = getMergeThread(writer, merge); final MergeThread merger = getMergeThread(writer, merge);
mergeThreads.add(merger); mergeThreads.add(merger);
if (verbose()) { if (verbose()) {
message(" launch new thread [" + merger.getName() + "]"); message(" launch new thread [" + merger.getName() + "]");
} }
merger.start(); merger.start();
// Must call this after starting the thread else
// the new thread is removed from mergeThreads
// (since it's not alive yet):
updateMergeThreads();
success = true; success = true;
} finally { } finally {
if (!success) { if (!success) {
@ -433,7 +471,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
* as limiting how many threads are allowed to index, can do nothing * as limiting how many threads are allowed to index, can do nothing
* here and throttle elsewhere. */ * here and throttle elsewhere. */
protected synchronized void maybeStall() { protected synchronized void maybeStall(IndexWriter writer) {
long startStallTime = 0; long startStallTime = 0;
while (writer.hasPendingMerges() && mergeThreadCount() >= maxMergeCount) { while (writer.hasPendingMerges() && mergeThreadCount() >= maxMergeCount) {
// This means merging has fallen too far behind: we // This means merging has fallen too far behind: we
@ -465,127 +503,78 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
} }
/** Does the actual merge, by calling {@link IndexWriter#merge} */ /** Does the actual merge, by calling {@link IndexWriter#merge} */
protected void doMerge(MergePolicy.OneMerge merge) throws IOException { protected void doMerge(IndexWriter writer, OneMerge merge) throws IOException {
writer.merge(merge); writer.merge(merge);
} }
/** Create and return a new MergeThread */ /** Create and return a new MergeThread */
protected synchronized MergeThread getMergeThread(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException { protected synchronized MergeThread getMergeThread(IndexWriter writer, OneMerge merge) throws IOException {
final MergeThread thread = new MergeThread(writer, merge); final MergeThread thread = new MergeThread(writer, merge);
thread.setThreadPriority(mergeThreadPriority);
thread.setDaemon(true); thread.setDaemon(true);
thread.setName("Lucene Merge Thread #" + mergeThreadCount++); thread.setName("Lucene Merge Thread #" + mergeThreadCount++);
return thread; return thread;
} }
/** Runs a merge thread, which may run one or more merges /** Runs a merge thread to execute a single merge, then exits. */
* in sequence. */ protected class MergeThread extends Thread implements Comparable<MergeThread> {
protected class MergeThread extends Thread {
IndexWriter tWriter; final IndexWriter writer;
MergePolicy.OneMerge startMerge; final OneMerge merge;
MergePolicy.OneMerge runningMerge;
private volatile boolean done;
/** Sole constructor. */ /** Sole constructor. */
public MergeThread(IndexWriter writer, MergePolicy.OneMerge startMerge) { public MergeThread(IndexWriter writer, OneMerge merge) {
this.tWriter = writer; this.writer = writer;
this.startMerge = startMerge; this.merge = merge;
} }
/** Record the currently running merge. */ @Override
public synchronized void setRunningMerge(MergePolicy.OneMerge merge) { public int compareTo(MergeThread other) {
runningMerge = merge; // Larger merges sort first:
} return Long.compare(other.merge.estimatedMergeBytes, merge.estimatedMergeBytes);
/** Return the currently running merge. */
public synchronized MergePolicy.OneMerge getRunningMerge() {
return runningMerge;
}
/** Return the current merge, or null if this {@code
* MergeThread} is done. */
public synchronized MergePolicy.OneMerge getCurrentMerge() {
if (done) {
return null;
} else if (runningMerge != null) {
return runningMerge;
} else {
return startMerge;
}
}
/** Set the priority of this thread. */
public void setThreadPriority(int pri) {
try {
setPriority(pri);
} catch (NullPointerException npe) {
// Strangely, Sun's JDK 1.5 on Linux sometimes
// throws NPE out of here...
} catch (SecurityException se) {
// Ignore this because we will still run fine with
// normal thread priority
}
} }
@Override @Override
public void run() { public void run() {
// First time through the while loop we do the merge
// that we were started with:
MergePolicy.OneMerge merge = this.startMerge;
try { try {
if (verbose()) { if (verbose()) {
message(" merge thread: start"); message(" merge thread: start");
} }
while(true) { doMerge(writer, merge);
setRunningMerge(merge);
doMerge(merge);
// Subsequent times through the loop we do any new
// merge that writer says is necessary:
merge = tWriter.getNextMerge();
// Notify here in case any threads were stalled;
// they will notice that the pending merge has
// been pulled and possibly resume:
synchronized(ConcurrentMergeScheduler.this) {
ConcurrentMergeScheduler.this.notifyAll();
}
if (merge != null) {
updateMergeThreads();
if (verbose()) {
message(" merge thread: do another merge " + tWriter.segString(merge.segments));
}
} else {
break;
}
}
if (verbose()) { if (verbose()) {
message(" merge thread: done"); message(" merge thread: done");
} }
removeMergeThread(this);
// Let CMS run new merges if necessary:
try {
merge(writer, MergeTrigger.MERGE_FINISHED, true);
} catch (AlreadyClosedException ace) {
// OK
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
} catch (Throwable exc) { } catch (Throwable exc) {
// Ignore the exception if it was due to abort: if (exc instanceof MergePolicy.MergeAbortedException) {
if (!(exc instanceof MergePolicy.MergeAbortedException)) { // OK to ignore
//System.out.println(Thread.currentThread().getName() + ": CMS: exc"); } else if (suppressExceptions == false) {
//exc.printStackTrace(System.out);
if (!suppressExceptions) {
// suppressExceptions is normally only set during // suppressExceptions is normally only set during
// testing. // testing.
handleMergeException(exc); handleMergeException(writer.getDirectory(), exc);
}
} }
} finally { } finally {
done = true;
synchronized(ConcurrentMergeScheduler.this) { synchronized(ConcurrentMergeScheduler.this) {
updateMergeThreads(); updateMergeThreads();
// In case we had stalled indexing, we can now wake up
// and possibly unstall:
ConcurrentMergeScheduler.this.notifyAll(); ConcurrentMergeScheduler.this.notifyAll();
} }
} }
@ -594,7 +583,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
/** Called when an exception is hit in a background merge /** Called when an exception is hit in a background merge
* thread */ * thread */
protected void handleMergeException(Throwable exc) { protected void handleMergeException(Directory dir, Throwable exc) {
try { try {
// When an exception is hit during merge, IndexWriter // When an exception is hit during merge, IndexWriter
// removes any partial files and then allows another // removes any partial files and then allows another
@ -606,6 +595,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie); throw new ThreadInterruptedException(ie);
} }
throw new MergePolicy.MergeException(exc, dir); throw new MergePolicy.MergeException(exc, dir);
} }
@ -626,7 +616,115 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
StringBuilder sb = new StringBuilder(getClass().getSimpleName() + ": "); StringBuilder sb = new StringBuilder(getClass().getSimpleName() + ": ");
sb.append("maxThreadCount=").append(maxThreadCount).append(", "); sb.append("maxThreadCount=").append(maxThreadCount).append(", ");
sb.append("maxMergeCount=").append(maxMergeCount).append(", "); sb.append("maxMergeCount=").append(maxMergeCount).append(", ");
sb.append("mergeThreadPriority=").append(mergeThreadPriority); sb.append("ioThrottle=").append(doAutoIOThrottle);
return sb.toString(); return sb.toString();
} }
private boolean isBacklog(long now, OneMerge merge) {
double mergeMB = bytesToMB(merge.estimatedMergeBytes);
for (MergeThread mergeThread : mergeThreads) {
long mergeStartNS = mergeThread.merge.mergeStartNS;
if (mergeThread.isAlive() && mergeThread.merge != merge &&
mergeStartNS != -1 &&
mergeThread.merge.estimatedMergeBytes >= MIN_BIG_MERGE_MB*1024*1024 &&
nsToSec(now-mergeStartNS) > 3.0) {
double otherMergeMB = bytesToMB(mergeThread.merge.estimatedMergeBytes);
double ratio = otherMergeMB / mergeMB;
if (ratio > 0.3 && ratio < 3.0) {
return true;
}
}
}
return false;
}
/** Tunes IO throttle when a new merge starts. */
private synchronized void updateIOThrottle(OneMerge merge) throws IOException {
if (doAutoIOThrottle == false) {
return;
}
double mergeMB = bytesToMB(merge.estimatedMergeBytes);
if (mergeMB < MIN_BIG_MERGE_MB) {
// Only watch non-trivial merges for throttling; this is safe because the MP must eventually
// have to do larger merges:
return;
}
long now = System.nanoTime();
// Simplistic closed-loop feedback control: if we find any other similarly
// sized merges running, then we are falling behind, so we bump up the
// IO throttle, else we lower it:
boolean newBacklog = isBacklog(now, merge);
boolean curBacklog = false;
if (newBacklog == false) {
if (mergeThreads.size() > maxThreadCount) {
// If there are already more than the maximum merge threads allowed, count that as backlog:
curBacklog = true;
} else {
// Now see if any still-running merges are backlog'd:
for (MergeThread mergeThread : mergeThreads) {
if (isBacklog(now, mergeThread.merge)) {
curBacklog = true;
break;
}
}
}
}
double curMBPerSec = targetMBPerSec;
if (newBacklog) {
// This new merge adds to the backlog: increase IO throttle by 20%
targetMBPerSec *= 1.20;
if (targetMBPerSec > 10000) {
targetMBPerSec = 10000;
}
if (verbose()) {
if (curMBPerSec == targetMBPerSec) {
message(String.format(Locale.ROOT, "io throttle: new merge backlog; leave IO rate at ceiling %.1f MB/sec", targetMBPerSec));
} else {
message(String.format(Locale.ROOT, "io throttle: new merge backlog; increase IO rate to %.1f MB/sec", targetMBPerSec));
}
}
} else if (curBacklog) {
// We still have an existing backlog; leave the rate as is:
if (verbose()) {
message(String.format(Locale.ROOT, "io throttle: current merge backlog; leave IO rate at %.1f MB/sec",
targetMBPerSec));
}
} else {
// We are not falling behind: decrease IO throttle by 10%
targetMBPerSec /= 1.10;
if (targetMBPerSec < MIN_MERGE_MB_PER_SEC) {
targetMBPerSec = MIN_MERGE_MB_PER_SEC;
}
if (verbose()) {
if (curMBPerSec == targetMBPerSec) {
message(String.format(Locale.ROOT, "io throttle: no merge backlog; leave IO rate at floor %.1f MB/sec", targetMBPerSec));
} else {
message(String.format(Locale.ROOT, "io throttle: no merge backlog; decrease IO rate to %.1f MB/sec", targetMBPerSec));
}
}
}
targetMBPerSecChanged();
updateMergeThreads();
}
/** Subclass can override to tweak targetMBPerSec. */
protected void targetMBPerSecChanged() {
}
private static double nsToSec(long ns) {
return ns / 1000000000.0;
}
private static double bytesToMB(long bytes) {
return bytes/1024./1024.;
}
} }

View File

@ -20,9 +20,8 @@ package org.apache.lucene.index;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet; import java.util.Locale;
import java.util.Queue; import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -35,7 +34,6 @@ import org.apache.lucene.search.Query;
import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.util.Accountable; import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.InfoStream; import org.apache.lucene.util.InfoStream;
/** /**
@ -553,11 +551,13 @@ final class DocumentsWriter implements Closeable, Accountable {
final double ramBufferSizeMB = config.getRAMBufferSizeMB(); final double ramBufferSizeMB = config.getRAMBufferSizeMB();
if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH && if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
flushControl.getDeleteBytesUsed() > (1024*1024*ramBufferSizeMB/2)) { flushControl.getDeleteBytesUsed() > (1024*1024*ramBufferSizeMB/2)) {
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "force apply deletes bytesUsed=" + flushControl.getDeleteBytesUsed() + " vs ramBuffer=" + (1024*1024*ramBufferSizeMB));
}
hasEvents = true; hasEvents = true;
if (!this.applyAllDeletes(deleteQueue)) { if (!this.applyAllDeletes(deleteQueue)) {
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", String.format(Locale.ROOT, "force apply deletes bytesUsed=%.1f MB vs ramBuffer=%.1f MB",
flushControl.getDeleteBytesUsed()/(1024.*1024.),
ramBufferSizeMB));
}
putEvent(ApplyDeletesEvent.INSTANCE); putEvent(ApplyDeletesEvent.INSTANCE);
} }
} }

View File

@ -485,7 +485,7 @@ class DocumentsWriterPerThread {
try { try {
if (indexWriterConfig.getUseCompoundFile()) { if (indexWriterConfig.getUseCompoundFile()) {
filesToDelete.addAll(IndexWriter.createCompoundFile(infoStream, directory, MergeState.CheckAbort.NONE, newSegment.info, context)); filesToDelete.addAll(IndexWriter.createCompoundFile(infoStream, directory, newSegment.info, context));
newSegment.info.setUseCompoundFile(true); newSegment.info.setUseCompoundFile(true);
} }

View File

@ -47,18 +47,22 @@ import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate;
import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate; import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate;
import org.apache.lucene.index.FieldInfos.FieldNumbers; import org.apache.lucene.index.FieldInfos.FieldNumbers;
import org.apache.lucene.index.IndexWriterConfig.OpenMode; import org.apache.lucene.index.IndexWriterConfig.OpenMode;
import org.apache.lucene.index.MergeState.CheckAbort;
import org.apache.lucene.search.Query; import org.apache.lucene.search.Query;
import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.FlushInfo;
import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.Lock; import org.apache.lucene.store.Lock;
import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.MergeInfo; import org.apache.lucene.store.MergeInfo;
import org.apache.lucene.store.RateLimitedIndexOutput;
import org.apache.lucene.store.TrackingDirectoryWrapper; import org.apache.lucene.store.TrackingDirectoryWrapper;
import org.apache.lucene.util.Accountable; import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.Bits; import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CloseableThreadLocal;
import org.apache.lucene.util.Constants; import org.apache.lucene.util.Constants;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.InfoStream; import org.apache.lucene.util.InfoStream;
@ -247,6 +251,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
volatile Throwable tragedy; volatile Throwable tragedy;
private final Directory directory; // where this index resides private final Directory directory; // where this index resides
private final Directory mergeDirectory; // used for merging
private final Analyzer analyzer; // how to analyze text private final Analyzer analyzer; // how to analyze text
private volatile long changeCount; // increments every time a change is completed private volatile long changeCount; // increments every time a change is completed
@ -319,6 +324,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* card to make sure they can later charge you when you check out. */ * card to make sure they can later charge you when you check out. */
final AtomicLong pendingNumDocs = new AtomicLong(); final AtomicLong pendingNumDocs = new AtomicLong();
final CloseableThreadLocal<MergeRateLimiter> rateLimiters = new CloseableThreadLocal<>();
DirectoryReader getReader() throws IOException { DirectoryReader getReader() throws IOException {
return getReader(true); return getReader(true);
} }
@ -741,10 +748,17 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
public IndexWriter(Directory d, IndexWriterConfig conf) throws IOException { public IndexWriter(Directory d, IndexWriterConfig conf) throws IOException {
conf.setIndexWriter(this); // prevent reuse by other instances conf.setIndexWriter(this); // prevent reuse by other instances
config = conf; config = conf;
directory = d; directory = d;
// Directory we use for merging, so we can abort running merges, and so
// merge schedulers can optionally rate-limit per-merge IO:
mergeDirectory = addMergeRateLimiters(d);
analyzer = config.getAnalyzer(); analyzer = config.getAnalyzer();
infoStream = config.getInfoStream(); infoStream = config.getInfoStream();
mergeScheduler = config.getMergeScheduler(); mergeScheduler = config.getMergeScheduler();
mergeScheduler.setInfoStream(infoStream);
codec = config.getCodec(); codec = config.getCodec();
bufferedUpdatesStream = new BufferedUpdatesStream(infoStream); bufferedUpdatesStream = new BufferedUpdatesStream(infoStream);
@ -1696,7 +1710,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
for(int i=0;i<size;i++) { for(int i=0;i<size;i++) {
final MergePolicy.OneMerge merge = mergeExceptions.get(i); final MergePolicy.OneMerge merge = mergeExceptions.get(i);
if (merge.maxNumSegments != -1) { if (merge.maxNumSegments != -1) {
throw new IOException("background merge hit exception: " + merge.segString(directory), merge.getException()); throw new IOException("background merge hit exception: " + merge.segString(), merge.getException());
} }
} }
} }
@ -1786,7 +1800,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
} }
Throwable t = merge.getException(); Throwable t = merge.getException();
if (t != null) { if (t != null) {
throw new IOException("background merge hit exception: " + merge.segString(directory), t); throw new IOException("background merge hit exception: " + merge.segString(), t);
} }
} }
@ -1969,6 +1983,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
stopMerges = true; stopMerges = true;
} }
rateLimiters.close();
if (infoStream.isEnabled("IW")) { if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "rollback: done finish merges"); infoStream.message("IW", "rollback: done finish merges");
} }
@ -2155,7 +2171,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
if (infoStream.isEnabled("IW")) { if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "now abort pending merge " + segString(merge.segments)); infoStream.message("IW", "now abort pending merge " + segString(merge.segments));
} }
merge.abort(); merge.rateLimiter.setAbort();
mergeFinish(merge); mergeFinish(merge);
} }
pendingMerges.clear(); pendingMerges.clear();
@ -2164,7 +2180,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
if (infoStream.isEnabled("IW")) { if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "now abort running merge " + segString(merge.segments)); infoStream.message("IW", "now abort running merge " + segString(merge.segments));
} }
merge.abort(); merge.rateLimiter.setAbort();
} }
// These merges periodically check whether they have // These merges periodically check whether they have
@ -2405,7 +2421,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
infoStream.message("IW", "addIndexes: process segment origName=" + info.info.name + " newName=" + newSegName + " info=" + info); infoStream.message("IW", "addIndexes: process segment origName=" + info.info.name + " newName=" + newSegName + " info=" + info);
} }
IOContext context = new IOContext(new MergeInfo(info.info.getDocCount(), info.sizeInBytes(), true, -1)); IOContext context = new IOContext(new FlushInfo(info.info.getDocCount(), info.sizeInBytes()));
FieldInfos fis = readFieldInfos(info); FieldInfos fis = readFieldInfos(info);
for(FieldInfo fi : fis) { for(FieldInfo fi : fis) {
@ -2516,19 +2532,21 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// exceed the limit: // exceed the limit:
reserveDocs(numDocs); reserveDocs(numDocs);
final IOContext context = new IOContext(new MergeInfo(numDocs, -1, true, -1)); final IOContext context = new IOContext(new MergeInfo(numDocs, -1, false, -1));
// TODO: somehow we should fix this merge so it's // TODO: somehow we should fix this merge so it's
// abortable so that IW.close(false) is able to stop it // abortable so that IW.close(false) is able to stop it
TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(directory); TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(mergeDirectory);
SegmentInfo info = new SegmentInfo(directory, Version.LATEST, mergedName, -1, SegmentInfo info = new SegmentInfo(directory, Version.LATEST, mergedName, -1,
false, codec, null, StringHelper.randomId(), new HashMap<>()); false, codec, null, StringHelper.randomId(), new HashMap<>());
SegmentMerger merger = new SegmentMerger(mergeReaders, info, infoStream, trackingDir, SegmentMerger merger = new SegmentMerger(mergeReaders, info, infoStream, trackingDir,
MergeState.CheckAbort.NONE, globalFieldNumberMap, globalFieldNumberMap,
context); context);
rateLimiters.set(new MergeRateLimiter(null));
if (!merger.shouldMerge()) { if (!merger.shouldMerge()) {
return; return;
} }
@ -2567,7 +2585,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
if (useCompoundFile) { if (useCompoundFile) {
Collection<String> filesToDelete = infoPerCommit.files(); Collection<String> filesToDelete = infoPerCommit.files();
try { try {
createCompoundFile(infoStream, directory, MergeState.CheckAbort.NONE, info, context); createCompoundFile(infoStream, mergeDirectory, info, context);
} finally { } finally {
// delete new non cfs files directly: they were never // delete new non cfs files directly: they were never
// registered with IFD // registered with IFD
@ -3040,6 +3058,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
final synchronized void applyAllDeletesAndUpdates() throws IOException { final synchronized void applyAllDeletesAndUpdates() throws IOException {
flushDeletesCount.incrementAndGet(); flushDeletesCount.incrementAndGet();
final BufferedUpdatesStream.ApplyDeletesResult result; final BufferedUpdatesStream.ApplyDeletesResult result;
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "now apply all deletes for all segments maxDoc=" + (docWriter.getNumDocs() + segmentInfos.totalDocCount()));
}
result = bufferedUpdatesStream.applyDeletesAndUpdates(readerPool, segmentInfos.asList()); result = bufferedUpdatesStream.applyDeletesAndUpdates(readerPool, segmentInfos.asList());
if (result.anyDeletes) { if (result.anyDeletes) {
checkpoint(); checkpoint();
@ -3354,7 +3375,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// deleter.refresh() call that will remove any index // deleter.refresh() call that will remove any index
// file that current segments does not reference), we // file that current segments does not reference), we
// abort this merge // abort this merge
if (merge.isAborted()) { if (merge.rateLimiter.getAbort()) {
if (infoStream.isEnabled("IW")) { if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "commitMerge: skip: it was aborted"); infoStream.message("IW", "commitMerge: skip: it was aborted");
} }
@ -3513,6 +3534,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
boolean success = false; boolean success = false;
rateLimiters.set(merge.rateLimiter);
final long t0 = System.currentTimeMillis(); final long t0 = System.currentTimeMillis();
final MergePolicy mergePolicy = config.getMergePolicy(); final MergePolicy mergePolicy = config.getMergePolicy();
@ -3550,7 +3573,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// This merge (and, generally, any change to the // This merge (and, generally, any change to the
// segments) may now enable new merges, so we call // segments) may now enable new merges, so we call
// merge policy & update pending merges. // merge policy & update pending merges.
if (success && !merge.isAborted() && (merge.maxNumSegments != -1 || (!closed && !closing))) { if (success && merge.rateLimiter.getAbort() == false && (merge.maxNumSegments != -1 || (!closed && !closing))) {
updatePendingMerges(mergePolicy, MergeTrigger.MERGE_FINISHED, merge.maxNumSegments); updatePendingMerges(mergePolicy, MergeTrigger.MERGE_FINISHED, merge.maxNumSegments);
} }
} }
@ -3558,7 +3581,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
} catch (OutOfMemoryError oom) { } catch (OutOfMemoryError oom) {
tragicEvent(oom, "merge"); tragicEvent(oom, "merge");
} }
if (merge.info != null && !merge.isAborted()) { if (merge.info != null && merge.rateLimiter.getAbort() == false) {
if (infoStream.isEnabled("IW")) { if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "merge time " + (System.currentTimeMillis()-t0) + " msec for " + merge.info.info.getDocCount() + " docs"); infoStream.message("IW", "merge time " + (System.currentTimeMillis()-t0) + " msec for " + merge.info.info.getDocCount() + " docs");
} }
@ -3583,7 +3606,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
assert merge.segments.size() > 0; assert merge.segments.size() > 0;
if (stopMerges) { if (stopMerges) {
merge.abort(); merge.rateLimiter.setAbort();
throw new MergePolicy.MergeAbortedException("merge is aborted: " + segString(merge.segments)); throw new MergePolicy.MergeAbortedException("merge is aborted: " + segString(merge.segments));
} }
@ -3694,7 +3717,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
return; return;
} }
if (merge.isAborted()) { if (merge.rateLimiter.getAbort()) {
return; return;
} }
@ -3703,6 +3726,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// and then open them again for merging. Maybe we // and then open them again for merging. Maybe we
// could pre-pool them somehow in that case... // could pre-pool them somehow in that case...
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "now apply deletes for " + merge.segments.size() + " merging segments");
}
// Lock order: IW -> BD // Lock order: IW -> BD
final BufferedUpdatesStream.ApplyDeletesResult result = bufferedUpdatesStream.applyDeletesAndUpdates(readerPool, merge.segments); final BufferedUpdatesStream.ApplyDeletesResult result = bufferedUpdatesStream.applyDeletesAndUpdates(readerPool, merge.segments);
@ -3839,14 +3866,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* instance */ * instance */
private int mergeMiddle(MergePolicy.OneMerge merge, MergePolicy mergePolicy) throws IOException { private int mergeMiddle(MergePolicy.OneMerge merge, MergePolicy mergePolicy) throws IOException {
merge.checkAborted(directory); merge.rateLimiter.checkAbort();
List<SegmentCommitInfo> sourceSegments = merge.segments; List<SegmentCommitInfo> sourceSegments = merge.segments;
IOContext context = new IOContext(merge.getMergeInfo()); IOContext context = new IOContext(merge.getMergeInfo());
final MergeState.CheckAbort checkAbort = new MergeState.CheckAbort(merge, directory); final TrackingDirectoryWrapper dirWrapper = new TrackingDirectoryWrapper(mergeDirectory);
final TrackingDirectoryWrapper dirWrapper = new TrackingDirectoryWrapper(directory);
if (infoStream.isEnabled("IW")) { if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "merging " + segString(merge.segments)); infoStream.message("IW", "merging " + segString(merge.segments));
@ -3926,15 +3952,12 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// OneMerge to return a view over the actual segments to merge // OneMerge to return a view over the actual segments to merge
final SegmentMerger merger = new SegmentMerger(merge.getMergeReaders(), final SegmentMerger merger = new SegmentMerger(merge.getMergeReaders(),
merge.info.info, infoStream, dirWrapper, merge.info.info, infoStream, dirWrapper,
checkAbort, globalFieldNumberMap, globalFieldNumberMap,
context); context);
merge.checkAborted(directory); merge.rateLimiter.checkAbort();
long mergeStartTime = 0; merge.mergeStartNS = System.nanoTime();
if (infoStream.isEnabled("IW")) {
mergeStartTime = System.nanoTime();
}
// This is where all the work happens: // This is where all the work happens:
boolean success3 = false; boolean success3 = false;
@ -3954,13 +3977,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
assert mergeState.segmentInfo == merge.info.info; assert mergeState.segmentInfo == merge.info.info;
merge.info.info.setFiles(new HashSet<>(dirWrapper.getCreatedFiles())); merge.info.info.setFiles(new HashSet<>(dirWrapper.getCreatedFiles()));
// Record which codec was used to write the segment
if (infoStream.isEnabled("IW")) { if (infoStream.isEnabled("IW")) {
if (merger.shouldMerge()) { if (merger.shouldMerge()) {
long t1 = System.nanoTime(); long t1 = System.nanoTime();
double sec = (t1-mergeStartTime)/1000000000.; double sec = (t1-merge.mergeStartNS)/1000000000.;
double segmentMB = (merge.info.sizeInBytes()/1024./1024.); double segmentMB = (merge.info.sizeInBytes()/1024./1024.);
double stoppedSec = merge.rateLimiter.getTotalStoppedNS()/1000000000.;
double throttleSec = merge.rateLimiter.getTotalPausedNS()/1000000000.;
infoStream.message("IW", "merge codec=" + codec + " docCount=" + merge.info.info.getDocCount() + "; merged segment has " + infoStream.message("IW", "merge codec=" + codec + " docCount=" + merge.info.info.getDocCount() + "; merged segment has " +
(mergeState.mergeFieldInfos.hasVectors() ? "vectors" : "no vectors") + "; " + (mergeState.mergeFieldInfos.hasVectors() ? "vectors" : "no vectors") + "; " +
(mergeState.mergeFieldInfos.hasNorms() ? "norms" : "no norms") + "; " + (mergeState.mergeFieldInfos.hasNorms() ? "norms" : "no norms") + "; " +
@ -3968,8 +3991,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
(mergeState.mergeFieldInfos.hasProx() ? "prox" : "no prox") + "; " + (mergeState.mergeFieldInfos.hasProx() ? "prox" : "no prox") + "; " +
(mergeState.mergeFieldInfos.hasProx() ? "freqs" : "no freqs") + "; " + (mergeState.mergeFieldInfos.hasProx() ? "freqs" : "no freqs") + "; " +
String.format(Locale.ROOT, String.format(Locale.ROOT,
"%d msec to merge segment [%.2f MB, %.2f MB/sec]", "%.1f sec (%.1f sec stopped, %.1f sec paused) to merge segment [%.2f MB, %.2f MB/sec]",
((t1-mergeStartTime)/1000000), sec,
stoppedSec,
throttleSec,
segmentMB, segmentMB,
segmentMB / sec)); segmentMB / sec));
} else { } else {
@ -4002,11 +4027,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
Collection<String> filesToRemove = merge.info.files(); Collection<String> filesToRemove = merge.info.files();
try { try {
filesToRemove = createCompoundFile(infoStream, directory, checkAbort, merge.info.info, context); filesToRemove = createCompoundFile(infoStream, mergeDirectory, merge.info.info, context);
success = true; success = true;
} catch (IOException ioe) { } catch (IOException ioe) {
synchronized(this) { synchronized(this) {
if (merge.isAborted()) { if (merge.rateLimiter.getAbort()) {
// This can happen if rollback or close(false) // This can happen if rollback or close(false)
// is called -- fall through to logic below to // is called -- fall through to logic below to
// remove the partially created CFS: // remove the partially created CFS:
@ -4042,7 +4067,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// registered with IFD // registered with IFD
deleter.deleteNewFiles(filesToRemove); deleter.deleteNewFiles(filesToRemove);
if (merge.isAborted()) { if (merge.rateLimiter.getAbort()) {
if (infoStream.isEnabled("IW")) { if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "abort merge after building CFS"); infoStream.message("IW", "abort merge after building CFS");
} }
@ -4496,7 +4521,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* deletion files, this SegmentInfo must not reference such files when this * deletion files, this SegmentInfo must not reference such files when this
* method is called, because they are not allowed within a compound file. * method is called, because they are not allowed within a compound file.
*/ */
static final Collection<String> createCompoundFile(InfoStream infoStream, Directory directory, CheckAbort checkAbort, final SegmentInfo info, IOContext context) static final Collection<String> createCompoundFile(InfoStream infoStream, Directory directory, final SegmentInfo info, IOContext context)
throws IOException { throws IOException {
// TODO: use trackingdirectorywrapper instead of files() to know which files to delete when things fail: // TODO: use trackingdirectorywrapper instead of files() to know which files to delete when things fail:
@ -4510,7 +4535,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
boolean success = false; boolean success = false;
try { try {
info.getCodec().compoundFormat().write(directory, info, files, checkAbort, context); info.getCodec().compoundFormat().write(directory, info, files, context);
success = true; success = true;
} finally { } finally {
if (!success) { if (!success) {
@ -4643,4 +4668,24 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
throw new IllegalStateException("number of documents in the index cannot exceed " + actualMaxDocs); throw new IllegalStateException("number of documents in the index cannot exceed " + actualMaxDocs);
} }
} }
/** Wraps the incoming {@link Directory} so that we assign a per-thread
* {@link MergeRateLimiter} to all created {@link IndexOutput}s. */
private Directory addMergeRateLimiters(Directory in) {
return new FilterDirectory(in) {
@Override
public IndexOutput createOutput(String name, IOContext context) throws IOException {
ensureOpen();
// This Directory is only supposed to be used during merging,
// so all writes should have MERGE context, else there is a bug
// somewhere that is failing to pass down the right IOContext:
assert context.context == IOContext.Context.MERGE: "got context=" + context.context;
IndexOutput output = in.createOutput(name, context);
MergeRateLimiter rateLimiter = rateLimiters.get();
assert rateLimiter != null;
return new RateLimitedIndexOutput(rateLimiter, output);
}
};
}
} }

View File

@ -17,8 +17,8 @@ package org.apache.lucene.index;
* limitations under the License. * limitations under the License.
*/ */
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.index.MultiDocsAndPositionsEnum.EnumWithSlice; import org.apache.lucene.index.MultiDocsAndPositionsEnum.EnumWithSlice;
import org.apache.lucene.util.BytesRef;
import java.io.IOException; import java.io.IOException;
@ -97,13 +97,6 @@ final class MappingMultiDocsAndPositionsEnum extends DocsAndPositionsEnum {
int doc = current.nextDoc(); int doc = current.nextDoc();
if (doc != NO_MORE_DOCS) { if (doc != NO_MORE_DOCS) {
mergeState.checkAbortCount++;
if (mergeState.checkAbortCount > 60000) {
mergeState.checkAbort.work(mergeState.checkAbortCount/5.0);
mergeState.checkAbortCount = 0;
}
// compact deletions // compact deletions
doc = currentMap.get(doc); doc = currentMap.get(doc);
if (doc == -1) { if (doc == -1) {

View File

@ -97,13 +97,6 @@ final class MappingMultiDocsEnum extends DocsEnum {
int doc = current.nextDoc(); int doc = current.nextDoc();
if (doc != NO_MORE_DOCS) { if (doc != NO_MORE_DOCS) {
mergeState.checkAbortCount++;
if (mergeState.checkAbortCount > 60000) {
mergeState.checkAbort.work(mergeState.checkAbortCount/5.0);
mergeState.checkAbortCount = 0;
}
// compact deletions // compact deletions
doc = currentMap.get(doc); doc = currentMap.get(doc);
if (doc == -1) { if (doc == -1) {

View File

@ -17,16 +17,17 @@ package org.apache.lucene.index;
* limitations under the License. * limitations under the License.
*/ */
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MergeInfo;
import org.apache.lucene.util.FixedBitSet;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MergeInfo;
import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.FixedBitSet;
/** /**
* <p>Expert: a MergePolicy determines the sequence of * <p>Expert: a MergePolicy determines the sequence of
* primitive merge operations.</p> * primitive merge operations.</p>
@ -107,11 +108,14 @@ public abstract class MergePolicy {
/** Segments to be merged. */ /** Segments to be merged. */
public final List<SegmentCommitInfo> segments; public final List<SegmentCommitInfo> segments;
/** A private {@link RateLimiter} for this merge, used to rate limit writes and abort. */
public final MergeRateLimiter rateLimiter;
volatile long mergeStartNS = -1;
/** Total number of documents in segments to be merged, not accounting for deletions. */ /** Total number of documents in segments to be merged, not accounting for deletions. */
public final int totalDocCount; public final int totalDocCount;
boolean aborted;
Throwable error; Throwable error;
boolean paused;
/** Sole constructor. /** Sole constructor.
* @param segments List of {@link SegmentCommitInfo}s * @param segments List of {@link SegmentCommitInfo}s
@ -127,6 +131,8 @@ public abstract class MergePolicy {
count += info.info.getDocCount(); count += info.info.getDocCount();
} }
totalDocCount = count; totalDocCount = count;
rateLimiter = new MergeRateLimiter(this);
} }
/** Called by {@link IndexWriter} after the merge is done and all readers have been closed. */ /** Called by {@link IndexWriter} after the merge is done and all readers have been closed. */
@ -186,68 +192,16 @@ public abstract class MergePolicy {
return error; return error;
} }
/** Mark this merge as aborted. If this is called
* before the merge is committed then the merge will
* not be committed. */
synchronized void abort() {
aborted = true;
notifyAll();
}
/** Returns true if this merge was aborted. */
synchronized boolean isAborted() {
return aborted;
}
/** Called periodically by {@link IndexWriter} while
* merging to see if the merge is aborted. */
public synchronized void checkAborted(Directory dir) throws MergeAbortedException {
if (aborted) {
throw new MergeAbortedException("merge is aborted: " + segString(dir));
}
while (paused) {
try {
// In theory we could wait() indefinitely, but we
// do 250 msec, defensively
wait(250);
} catch (InterruptedException ie) {
throw new RuntimeException(ie);
}
if (aborted) {
throw new MergeAbortedException("merge is aborted: " + segString(dir));
}
}
}
/** Set or clear whether this merge is paused paused (for example
* {@link ConcurrentMergeScheduler} will pause merges
* if too many are running). */
synchronized public void setPause(boolean paused) {
this.paused = paused;
if (!paused) {
// Wakeup merge thread, if it's waiting
notifyAll();
}
}
/** Returns true if this merge is paused.
*
* @see #setPause(boolean) */
synchronized public boolean getPause() {
return paused;
}
/** Returns a readable description of the current merge /** Returns a readable description of the current merge
* state. */ * state. */
public String segString(Directory dir) { public String segString() {
StringBuilder b = new StringBuilder(); StringBuilder b = new StringBuilder();
final int numSegments = segments.size(); final int numSegments = segments.size();
for(int i=0;i<numSegments;i++) { for(int i=0;i<numSegments;i++) {
if (i > 0) { if (i > 0) {
b.append(' '); b.append(' ');
} }
b.append(segments.get(i).toString(dir, 0)); b.append(segments.get(i).toString());
} }
if (info != null) { if (info != null) {
b.append(" into ").append(info.info.name); b.append(" into ").append(info.info.name);
@ -255,7 +209,7 @@ public abstract class MergePolicy {
if (maxNumSegments != -1) { if (maxNumSegments != -1) {
b.append(" [maxNumSegments=" + maxNumSegments + "]"); b.append(" [maxNumSegments=" + maxNumSegments + "]");
} }
if (aborted) { if (rateLimiter.getAbort()) {
b.append(" [ABORTED]"); b.append(" [ABORTED]");
} }
return b.toString(); return b.toString();
@ -321,7 +275,7 @@ public abstract class MergePolicy {
b.append("MergeSpec:\n"); b.append("MergeSpec:\n");
final int count = merges.size(); final int count = merges.size();
for(int i=0;i<count;i++) { for(int i=0;i<count;i++) {
b.append(" ").append(1 + i).append(": ").append(merges.get(i).segString(dir)); b.append(" ").append(1 + i).append(": ").append(merges.get(i).segString());
} }
return b.toString(); return b.toString();
} }
@ -538,5 +492,4 @@ public abstract class MergePolicy {
v *= 1024 * 1024; v *= 1024 * 1024;
this.maxCFSSegmentSize = v > Long.MAX_VALUE ? Long.MAX_VALUE : (long) v; this.maxCFSSegmentSize = v > Long.MAX_VALUE ? Long.MAX_VALUE : (long) v;
} }
} }

View File

@ -0,0 +1,194 @@
package org.apache.lucene.index;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.ThreadInterruptedException;
import static org.apache.lucene.store.RateLimiter.SimpleRateLimiter;
/** This is the {@link RateLimiter} that {@link IndexWriter} assigns to each running merge, to
* give {@link MergeScheduler}s ionice like control.
*
* This is similar to {@link SimpleRateLimiter}, except it's merge-private,
* it will wake up if its rate changes while it's paused, it tracks how
* much time it spent stopped and paused, and it supports aborting.
*
* @lucene.internal */
public class MergeRateLimiter extends RateLimiter {
private final static int MIN_PAUSE_CHECK_MSEC = 25;
volatile long totalBytesWritten;
// By default no IO limit:
double mbPerSec = Double.POSITIVE_INFINITY;
private long lastNS;
private long minPauseCheckBytes;
private boolean abort;
long totalPausedNS;
long totalStoppedNS;
final MergePolicy.OneMerge merge;
/** Returned by {@link #maybePause}. */
private static enum PauseResult {NO, STOPPED, PAUSED};
/** Sole constructor. */
public MergeRateLimiter(MergePolicy.OneMerge merge) {
this.merge = merge;
}
@Override
public synchronized void setMBPerSec(double mbPerSec) {
// 0.0 is allowed: it means the merge is paused
if (mbPerSec < 0.0) {
throw new IllegalArgumentException("mbPerSec must be positive; got: " + mbPerSec);
}
this.mbPerSec = mbPerSec;
// NOTE: Double.POSITIVE_INFINITY casts to Long.MAX_VALUE
minPauseCheckBytes = Math.min(1024*1024, (long) ((MIN_PAUSE_CHECK_MSEC / 1000.0) * mbPerSec * 1024 * 1024));
assert minPauseCheckBytes >= 0;
notify();
}
@Override
public synchronized double getMBPerSec() {
return mbPerSec;
}
/** Returns total bytes written by this merge. */
public long getTotalBytesWritten() {
return totalBytesWritten;
}
@Override
public long pause(long bytes) throws MergePolicy.MergeAbortedException {
totalBytesWritten += bytes;
long startNS = System.nanoTime();
long curNS = startNS;
// While loop because 1) Thread.wait doesn't always sleep long
// enough, and 2) we wake up and check again when our rate limit
// is changed while we were pausing:
long pausedNS = 0;
while (true) {
PauseResult result = maybePause(bytes, curNS);
if (result == PauseResult.NO) {
// Set to curNS, not targetNS, to enforce the instant rate, not
// the "averaaged over all history" rate:
lastNS = curNS;
break;
}
curNS = System.nanoTime();
long ns = curNS - startNS;
startNS = curNS;
// Separately track when merge was stopped vs rate limited:
if (result == PauseResult.STOPPED) {
totalStoppedNS += ns;
} else {
assert result == PauseResult.PAUSED;
totalPausedNS += ns;
}
pausedNS += ns;
}
return pausedNS;
}
/** Total NS merge was stopped. */
public synchronized long getTotalStoppedNS() {
return totalStoppedNS;
}
/** Total NS merge was paused to rate limit IO. */
public synchronized long getTotalPausedNS() {
return totalPausedNS;
}
/** Returns NO if no pause happened, STOPPED if pause because rate was 0.0 (merge is stopped), PAUSED if paused with a normal rate limit. */
private synchronized PauseResult maybePause(long bytes, long curNS) throws MergePolicy.MergeAbortedException {
double secondsToPause = (bytes/1024./1024.) / mbPerSec;
// Time we should sleep until; this is purely instantaneous
// rate (just adds seconds onto the last time we had paused to);
// maybe we should also offer decayed recent history one?
long targetNS = lastNS + (long) (1000000000 * secondsToPause);
long curPauseNS = targetNS - curNS;
// NOTE: except maybe on real-time JVMs, minimum realistic
// wait/sleep time is 1 msec; if you pass just 1 nsec the impl
// rounds up to 1 msec, so we don't bother unless it's > 2 msec:
if (curPauseNS <= 2000000) {
return PauseResult.NO;
}
// Defensive: sleep for at most 250 msec; the loop above will call us again if we should keep sleeping:
if (curPauseNS > 250L*1000000) {
curPauseNS = 250L*1000000;
}
int sleepMS = (int) (curPauseNS / 1000000);
int sleepNS = (int) (curPauseNS % 1000000);
// Now is a good time to abort the merge:
checkAbort();
double rate = mbPerSec;
try {
// CMS can wake us up here if it changes our target rate:
wait(sleepMS, sleepNS);
} catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie);
}
if (rate == 0.0) {
return PauseResult.STOPPED;
} else {
return PauseResult.PAUSED;
}
}
/** Throws {@link MergePolicy.MergeAbortedException} if this merge was aborted. */
public synchronized void checkAbort() throws MergePolicy.MergeAbortedException {
if (abort) {
throw new MergePolicy.MergeAbortedException("merge is aborted: " + merge.segString());
}
}
/** Mark this merge aborted. */
public synchronized void setAbort() {
abort = true;
notify();
}
/** Returns true if this merge was aborted. */
public synchronized boolean getAbort() {
return abort;
}
@Override
public long getMinPauseCheckBytes() {
return minPauseCheckBytes;
}
}

View File

@ -20,6 +20,8 @@ package org.apache.lucene.index;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import org.apache.lucene.util.InfoStream;
/** <p>Expert: {@link IndexWriter} uses an instance /** <p>Expert: {@link IndexWriter} uses an instance
* implementing this interface to execute the merges * implementing this interface to execute the merges
* selected by a {@link MergePolicy}. The default * selected by a {@link MergePolicy}. The default
@ -46,4 +48,34 @@ public abstract class MergeScheduler implements Closeable {
/** Close this MergeScheduler. */ /** Close this MergeScheduler. */
@Override @Override
public abstract void close() throws IOException; public abstract void close() throws IOException;
/** For messages about merge scheduling */
protected InfoStream infoStream;
/** IndexWriter calls this on init. */
final void setInfoStream(InfoStream infoStream) {
this.infoStream = infoStream;
}
/**
* Returns true if infoStream messages are enabled. This method is usually used in
* conjunction with {@link #message(String)}:
*
* <pre class="prettyprint">
* if (verbose()) {
* message(&quot;your message&quot;);
* }
* </pre>
*/
protected boolean verbose() {
return infoStream != null && infoStream.isEnabled("MS");
}
/**
* Outputs the given message - this method assumes {@link #verbose()} was
* called and returned true.
*/
protected void message(String message) {
infoStream.message("MS", message);
}
} }

View File

@ -26,7 +26,6 @@ import org.apache.lucene.codecs.FieldsProducer;
import org.apache.lucene.codecs.NormsProducer; import org.apache.lucene.codecs.NormsProducer;
import org.apache.lucene.codecs.StoredFieldsReader; import org.apache.lucene.codecs.StoredFieldsReader;
import org.apache.lucene.codecs.TermVectorsReader; import org.apache.lucene.codecs.TermVectorsReader;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.Bits; import org.apache.lucene.util.Bits;
import org.apache.lucene.util.InfoStream; import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.packed.PackedInts; import org.apache.lucene.util.packed.PackedInts;
@ -73,19 +72,11 @@ public class MergeState {
/** Max docs per reader */ /** Max docs per reader */
public final int[] maxDocs; public final int[] maxDocs;
/** Holds the CheckAbort instance, which is invoked
* periodically to see if the merge has been aborted. */
public final CheckAbort checkAbort;
/** InfoStream for debugging messages. */ /** InfoStream for debugging messages. */
public final InfoStream infoStream; public final InfoStream infoStream;
/** Counter used for periodic calls to checkAbort
* @lucene.internal */
public int checkAbortCount;
/** Sole constructor. */ /** Sole constructor. */
MergeState(List<LeafReader> readers, SegmentInfo segmentInfo, InfoStream infoStream, CheckAbort checkAbort) throws IOException { MergeState(List<LeafReader> readers, SegmentInfo segmentInfo, InfoStream infoStream) throws IOException {
int numReaders = readers.size(); int numReaders = readers.size();
docMaps = new DocMap[numReaders]; docMaps = new DocMap[numReaders];
@ -148,7 +139,6 @@ public class MergeState {
this.segmentInfo = segmentInfo; this.segmentInfo = segmentInfo;
this.infoStream = infoStream; this.infoStream = infoStream;
this.checkAbort = checkAbort;
setDocMaps(readers); setDocMaps(readers);
} }
@ -333,47 +323,6 @@ public class MergeState {
segmentInfo.setDocCount(docBase); segmentInfo.setDocCount(docBase);
} }
/**
* Class for recording units of work when merging segments.
*/
public static class CheckAbort {
private double workCount;
private final MergePolicy.OneMerge merge;
private final Directory dir;
/** Creates a #CheckAbort instance. */
public CheckAbort(MergePolicy.OneMerge merge, Directory dir) {
this.merge = merge;
this.dir = dir;
}
/**
* Records the fact that roughly units amount of work
* have been done since this method was last called.
* When adding time-consuming code into SegmentMerger,
* you should test different values for units to ensure
* that the time in between calls to merge.checkAborted
* is up to ~ 1 second.
*/
public void work(double units) throws MergePolicy.MergeAbortedException {
workCount += units;
if (workCount >= 10000.0) {
merge.checkAborted(dir);
workCount = 0;
}
}
/** If you use this: IW.close(false) cannot abort your merge!
* @lucene.internal */
static final MergeState.CheckAbort NONE = new MergeState.CheckAbort(null, null) {
@Override
public void work(double units) {
// do nothing
}
};
}
/** /**
* Remaps docids around deletes during merge * Remaps docids around deletes during merge
*/ */

View File

@ -46,5 +46,4 @@ public final class NoMergeScheduler extends MergeScheduler {
public MergeScheduler clone() { public MergeScheduler clone() {
return this; return this;
} }
} }

View File

@ -49,7 +49,10 @@ final class SegmentMerger {
// note, just like in codec apis Directory 'dir' is NOT the same as segmentInfo.dir!! // note, just like in codec apis Directory 'dir' is NOT the same as segmentInfo.dir!!
SegmentMerger(List<LeafReader> readers, SegmentInfo segmentInfo, InfoStream infoStream, Directory dir, SegmentMerger(List<LeafReader> readers, SegmentInfo segmentInfo, InfoStream infoStream, Directory dir,
MergeState.CheckAbort checkAbort, FieldInfos.FieldNumbers fieldNumbers, IOContext context) throws IOException { FieldInfos.FieldNumbers fieldNumbers, IOContext context) throws IOException {
if (context.context != IOContext.Context.MERGE) {
throw new IllegalArgumentException("IOContext.context should be MERGE; got: " + context.context);
}
// validate incoming readers // validate incoming readers
for (LeafReader reader : readers) { for (LeafReader reader : readers) {
if ((reader instanceof SegmentReader) == false) { if ((reader instanceof SegmentReader) == false) {
@ -59,7 +62,7 @@ final class SegmentMerger {
} }
} }
mergeState = new MergeState(readers, segmentInfo, infoStream, checkAbort); mergeState = new MergeState(readers, segmentInfo, infoStream);
directory = dir; directory = dir;
this.codec = segmentInfo.getCodec(); this.codec = segmentInfo.getCodec();
this.context = context; this.context = context;
@ -81,12 +84,6 @@ final class SegmentMerger {
if (!shouldMerge()) { if (!shouldMerge()) {
throw new IllegalStateException("Merge would result in 0 document segment"); throw new IllegalStateException("Merge would result in 0 document segment");
} }
// NOTE: it's important to add calls to
// checkAbort.work(...) if you make any changes to this
// method that will spend alot of time. The frequency
// of this check impacts how long
// IndexWriter.close(false) takes to actually stop the
// background merge threads.
mergeFieldInfos(); mergeFieldInfos();
long t0 = 0; long t0 = 0;
if (mergeState.infoStream.isEnabled("SM")) { if (mergeState.infoStream.isEnabled("SM")) {

View File

@ -568,7 +568,7 @@ public class TieredMergePolicy extends MergePolicy {
final int numToMerge = end - maxSegmentCount + 1; final int numToMerge = end - maxSegmentCount + 1;
final OneMerge merge = new OneMerge(eligible.subList(end-numToMerge, end)); final OneMerge merge = new OneMerge(eligible.subList(end-numToMerge, end));
if (verbose(writer)) { if (verbose(writer)) {
message("add final merge=" + merge.segString(writer.getDirectory()), writer); message("add final merge=" + merge.segString(), writer);
} }
spec = new MergeSpecification(); spec = new MergeSpecification();
spec.add(merge); spec.add(merge);

View File

@ -23,7 +23,7 @@ import java.util.Collection;
/** Directory implementation that delegates calls to another directory. /** Directory implementation that delegates calls to another directory.
* This class can be used to add limitations on top of an existing * This class can be used to add limitations on top of an existing
* {@link Directory} implementation such as * {@link Directory} implementation such as
* {@link RateLimitedDirectoryWrapper rate limiting} or to add additional * {@link NRTCachingDirectory} or to add additional
* sanity checks for tests. However, if you plan to write your own * sanity checks for tests. However, if you plan to write your own
* {@link Directory} implementation, you should consider extending directly * {@link Directory} implementation, you should consider extending directly
* {@link Directory} or {@link BaseDirectory} rather than try to reuse * {@link Directory} or {@link BaseDirectory} rather than try to reuse

View File

@ -1,145 +0,0 @@
package org.apache.lucene.store;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import org.apache.lucene.store.IOContext.Context;
/**
*
* A {@link Directory} wrapper that allows {@link IndexOutput} rate limiting using
* {@link IOContext.Context IO context} specific {@link RateLimiter rate limiters}.
*
* @see #setRateLimiter(RateLimiter, IOContext.Context)
* @lucene.experimental
*/
public final class RateLimitedDirectoryWrapper extends FilterDirectory {
// we need to be volatile here to make sure we see all the values that are set
// / modified concurrently
private volatile RateLimiter[] contextRateLimiters = new RateLimiter[IOContext.Context
.values().length];
public RateLimitedDirectoryWrapper(Directory wrapped) {
super(wrapped);
}
@Override
public IndexOutput createOutput(String name, IOContext context)
throws IOException {
ensureOpen();
final IndexOutput output = super.createOutput(name, context);
final RateLimiter limiter = getRateLimiter(context.context);
if (limiter != null) {
return new RateLimitedIndexOutput(limiter, output);
}
return output;
}
@Override
public void copyFrom(Directory from, String src, String dest, IOContext context) throws IOException {
ensureOpen();
in.copyFrom(from, src, dest, context);
}
private RateLimiter getRateLimiter(IOContext.Context context) {
assert context != null;
return contextRateLimiters[context.ordinal()];
}
/**
* Sets the maximum (approx) MB/sec allowed by all write IO performed by
* {@link IndexOutput} created with the given {@link IOContext.Context}. Pass
* <code>null</code> to have no limit.
*
* <p>
* <b>NOTE</b>: For already created {@link IndexOutput} instances there is no
* guarantee this new rate will apply to them; it will only be guaranteed to
* apply for new created {@link IndexOutput} instances.
* <p>
* <b>NOTE</b>: this is an optional operation and might not be respected by
* all Directory implementations. Currently only {@link FSDirectory buffered}
* Directory implementations use rate-limiting.
*
* @throws IllegalArgumentException
* if context is <code>null</code>
* @throws AlreadyClosedException if the {@link Directory} is already closed
* @lucene.experimental
*/
public void setMaxWriteMBPerSec(Double mbPerSec, IOContext.Context context) {
ensureOpen();
if (context == null) {
throw new IllegalArgumentException("Context must not be null");
}
final int ord = context.ordinal();
final RateLimiter limiter = contextRateLimiters[ord];
if (mbPerSec == null) {
if (limiter != null) {
limiter.setMbPerSec(Double.MAX_VALUE);
contextRateLimiters[ord] = null;
}
} else if (limiter != null) {
limiter.setMbPerSec(mbPerSec);
contextRateLimiters[ord] = limiter; // cross the mem barrier again
} else {
contextRateLimiters[ord] = new RateLimiter.SimpleRateLimiter(mbPerSec);
}
}
/**
* Sets the rate limiter to be used to limit (approx) MB/sec allowed by all IO
* performed with the given {@link IOContext.Context context}. Pass <code>null</code> to
* have no limit.
*
* <p>
* Passing an instance of rate limiter compared to setting it using
* {@link #setMaxWriteMBPerSec(Double, IOContext.Context)}
* allows to use the same limiter instance across several directories globally
* limiting IO across them.
*
* @throws IllegalArgumentException
* if context is <code>null</code>
* @throws AlreadyClosedException if the {@link Directory} is already closed
* @lucene.experimental
*/
public void setRateLimiter(RateLimiter mergeWriteRateLimiter,
Context context) {
ensureOpen();
if (context == null) {
throw new IllegalArgumentException("Context must not be null");
}
contextRateLimiters[context.ordinal()] = mergeWriteRateLimiter;
}
/**
* See {@link #setMaxWriteMBPerSec}.
*
* @throws IllegalArgumentException
* if context is <code>null</code>
* @throws AlreadyClosedException if the {@link Directory} is already closed
* @lucene.experimental
*/
public Double getMaxWriteMBPerSec(IOContext.Context context) {
ensureOpen();
if (context == null) {
throw new IllegalArgumentException("Context must not be null");
}
RateLimiter limiter = getRateLimiter(context);
return limiter == null ? null : limiter.getMbPerSec();
}
}

View File

@ -24,7 +24,8 @@ import java.io.IOException;
* *
* @lucene.internal * @lucene.internal
*/ */
final class RateLimitedIndexOutput extends IndexOutput {
public final class RateLimitedIndexOutput extends IndexOutput {
private final IndexOutput delegate; private final IndexOutput delegate;
private final RateLimiter rateLimiter; private final RateLimiter rateLimiter;
@ -36,7 +37,7 @@ final class RateLimitedIndexOutput extends IndexOutput {
* which does volatile read. */ * which does volatile read. */
private long currentMinPauseCheckBytes; private long currentMinPauseCheckBytes;
RateLimitedIndexOutput(final RateLimiter rateLimiter, final IndexOutput delegate) { public RateLimitedIndexOutput(final RateLimiter rateLimiter, final IndexOutput delegate) {
super("RateLimitedIndexOutput(" + delegate + ")"); super("RateLimitedIndexOutput(" + delegate + ")");
this.delegate = delegate; this.delegate = delegate;
this.rateLimiter = rateLimiter; this.rateLimiter = rateLimiter;
@ -72,7 +73,7 @@ final class RateLimitedIndexOutput extends IndexOutput {
delegate.writeBytes(b, offset, length); delegate.writeBytes(b, offset, length);
} }
private void checkRate() { private void checkRate() throws IOException {
if (bytesSinceLastPause > currentMinPauseCheckBytes) { if (bytesSinceLastPause > currentMinPauseCheckBytes) {
rateLimiter.pause(bytesSinceLastPause); rateLimiter.pause(bytesSinceLastPause);
bytesSinceLastPause = 0; bytesSinceLastPause = 0;

View File

@ -17,6 +17,8 @@ package org.apache.lucene.store;
* limitations under the License. * limitations under the License.
*/ */
import java.io.IOException;
import org.apache.lucene.util.ThreadInterruptedException; import org.apache.lucene.util.ThreadInterruptedException;
/** Abstract base class to rate limit IO. Typically implementations are /** Abstract base class to rate limit IO. Typically implementations are
@ -27,14 +29,14 @@ import org.apache.lucene.util.ThreadInterruptedException;
public abstract class RateLimiter { public abstract class RateLimiter {
/** /**
* Sets an updated mb per second rate limit. * Sets an updated MB per second rate limit.
*/ */
public abstract void setMbPerSec(double mbPerSec); public abstract void setMBPerSec(double mbPerSec);
/** /**
* The current mb per second rate limit. * The current MB per second rate limit.
*/ */
public abstract double getMbPerSec(); public abstract double getMBPerSec();
/** Pauses, if necessary, to keep the instantaneous IO /** Pauses, if necessary, to keep the instantaneous IO
* rate at or below the target. * rate at or below the target.
@ -43,7 +45,7 @@ public abstract class RateLimiter {
* </p> * </p>
* @return the pause time in nano seconds * @return the pause time in nano seconds
* */ * */
public abstract long pause(long bytes); public abstract long pause(long bytes) throws IOException;
/** How many bytes caller should add up itself before invoking {@link #pause}. */ /** How many bytes caller should add up itself before invoking {@link #pause}. */
public abstract long getMinPauseCheckBytes(); public abstract long getMinPauseCheckBytes();
@ -65,7 +67,7 @@ public abstract class RateLimiter {
/** mbPerSec is the MB/sec max IO rate */ /** mbPerSec is the MB/sec max IO rate */
public SimpleRateLimiter(double mbPerSec) { public SimpleRateLimiter(double mbPerSec) {
setMbPerSec(mbPerSec); setMBPerSec(mbPerSec);
lastNS = System.nanoTime(); lastNS = System.nanoTime();
} }
@ -73,7 +75,7 @@ public abstract class RateLimiter {
* Sets an updated mb per second rate limit. * Sets an updated mb per second rate limit.
*/ */
@Override @Override
public void setMbPerSec(double mbPerSec) { public void setMBPerSec(double mbPerSec) {
this.mbPerSec = mbPerSec; this.mbPerSec = mbPerSec;
minPauseCheckBytes = (long) ((MIN_PAUSE_CHECK_MSEC / 1000.0) * mbPerSec * 1024 * 1024); minPauseCheckBytes = (long) ((MIN_PAUSE_CHECK_MSEC / 1000.0) * mbPerSec * 1024 * 1024);
} }
@ -87,7 +89,7 @@ public abstract class RateLimiter {
* The current mb per second rate limit. * The current mb per second rate limit.
*/ */
@Override @Override
public double getMbPerSec() { public double getMBPerSec() {
return this.mbPerSec; return this.mbPerSec;
} }

View File

@ -19,7 +19,10 @@ package org.apache.lucene.util;
import java.io.IOException; import java.io.IOException;
import java.io.PrintStream; import java.io.PrintStream;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date; import java.util.Date;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
/** /**
@ -33,6 +36,8 @@ public class PrintStreamInfoStream extends InfoStream {
private static final AtomicInteger MESSAGE_ID = new AtomicInteger(); private static final AtomicInteger MESSAGE_ID = new AtomicInteger();
protected final int messageID; protected final int messageID;
private static final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS", Locale.ROOT);
protected final PrintStream stream; protected final PrintStream stream;
public PrintStreamInfoStream(PrintStream stream) { public PrintStreamInfoStream(PrintStream stream) {
@ -46,7 +51,7 @@ public class PrintStreamInfoStream extends InfoStream {
@Override @Override
public void message(String component, String message) { public void message(String component, String message) {
stream.println(component + " " + messageID + " [" + new Date() + "; " + Thread.currentThread().getName() + "]: " + message); stream.println(component + " " + messageID + " [" + dateFormat.format(new Date()) + "; " + Thread.currentThread().getName() + "]: " + message);
} }
@Override @Override

View File

@ -253,7 +253,7 @@ public abstract class StringHelper {
x0 = Long.parseLong(prop, 16); x0 = Long.parseLong(prop, 16);
x1 = x0; x1 = x0;
} else { } else {
// "Ghetto randomess" from 3 different sources: // Randomess from 3 different sources:
x0 = System.nanoTime(); x0 = System.nanoTime();
x1 = StringHelper.class.hashCode() << 32; x1 = StringHelper.class.hashCode() << 32;
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();

View File

@ -16,6 +16,7 @@ package org.apache.lucene;
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
import org.apache.lucene.analysis.MockAnalyzer; import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document; import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field; import org.apache.lucene.document.Field;
@ -56,21 +57,20 @@ public class TestMergeSchedulerExternal extends LuceneTestCase {
@Override @Override
protected MergeThread getMergeThread(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException { protected MergeThread getMergeThread(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException {
MergeThread thread = new MyMergeThread(writer, merge); MergeThread thread = new MyMergeThread(writer, merge);
thread.setThreadPriority(getMergeThreadPriority());
thread.setDaemon(true); thread.setDaemon(true);
thread.setName("MyMergeThread"); thread.setName("MyMergeThread");
return thread; return thread;
} }
@Override @Override
protected void handleMergeException(Throwable t) { protected void handleMergeException(Directory dir, Throwable t) {
excCalled = true; excCalled = true;
} }
@Override ;@Override
protected void doMerge(MergePolicy.OneMerge merge) throws IOException { protected void doMerge(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException {
mergeCalled = true; mergeCalled = true;
super.doMerge(merge); super.doMerge(writer, merge);
} }
} }
@ -118,7 +118,7 @@ public class TestMergeSchedulerExternal extends LuceneTestCase {
OneMerge merge = null; OneMerge merge = null;
while ((merge = writer.getNextMerge()) != null) { while ((merge = writer.getNextMerge()) != null) {
if (VERBOSE) { if (VERBOSE) {
System.out.println("executing merge " + merge.segString(writer.getDirectory())); System.out.println("executing merge " + merge.segString());
} }
writer.merge(merge); writer.merge(merge);
} }

View File

@ -293,7 +293,7 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler() { ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler() {
@Override @Override
protected void doMerge(MergePolicy.OneMerge merge) throws IOException { protected void doMerge(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException {
try { try {
// Stall all incoming merges until we see // Stall all incoming merges until we see
// maxMergeCount: // maxMergeCount:
@ -312,7 +312,7 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
// Then sleep a bit to give a chance for the bug // Then sleep a bit to give a chance for the bug
// (too many pending merges) to appear: // (too many pending merges) to appear:
Thread.sleep(20); Thread.sleep(20);
super.doMerge(merge); super.doMerge(writer, merge);
} finally { } finally {
runningMergeCount.decrementAndGet(); runningMergeCount.decrementAndGet();
} }
@ -358,10 +358,10 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
} }
@Override @Override
public void doMerge(MergePolicy.OneMerge merge) throws IOException { public void doMerge(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException {
totMergedBytes += merge.totalBytesSize(); totMergedBytes += merge.totalBytesSize();
atLeastOneMerge.countDown(); atLeastOneMerge.countDown();
super.doMerge(merge); super.doMerge(writer, merge);
} }
} }
@ -428,7 +428,7 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
final AtomicInteger runningMergeCount = new AtomicInteger(); final AtomicInteger runningMergeCount = new AtomicInteger();
@Override @Override
public void doMerge(MergePolicy.OneMerge merge) throws IOException { public void doMerge(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException {
int count = runningMergeCount.incrementAndGet(); int count = runningMergeCount.incrementAndGet();
// evil? // evil?
synchronized (this) { synchronized (this) {
@ -437,7 +437,7 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
} }
} }
try { try {
super.doMerge(merge); super.doMerge(writer, merge);
} finally { } finally {
runningMergeCount.decrementAndGet(); runningMergeCount.decrementAndGet();
} }
@ -489,7 +489,7 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random())); IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random()));
iwc.setMergeScheduler(new ConcurrentMergeScheduler() { iwc.setMergeScheduler(new ConcurrentMergeScheduler() {
@Override @Override
protected void maybeStall() { protected void maybeStall(IndexWriter writer) {
wasCalled.set(true); wasCalled.set(true);
} }
}); });
@ -514,14 +514,14 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
final CountDownLatch mergeFinish = new CountDownLatch(1); final CountDownLatch mergeFinish = new CountDownLatch(1);
ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler() { ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler() {
@Override @Override
protected void doMerge(MergePolicy.OneMerge merge) throws IOException { protected void doMerge(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException {
mergeStart.countDown(); mergeStart.countDown();
try { try {
mergeFinish.await(); mergeFinish.await();
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
throw new RuntimeException(ie); throw new RuntimeException(ie);
} }
super.doMerge(merge); super.doMerge(writer, merge);
} }
}; };
cms.setMaxMergesAndThreads(1, 1); cms.setMaxMergesAndThreads(1, 1);
@ -629,7 +629,7 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler(); ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
cms.setDefaultMaxMergesAndThreads(true); cms.setDefaultMaxMergesAndThreads(true);
assertEquals(1, cms.getMaxThreadCount()); assertEquals(1, cms.getMaxThreadCount());
assertEquals(2, cms.getMaxMergeCount()); assertEquals(6, cms.getMaxMergeCount());
} }
public void testNonSpinningDefaults() throws Exception { public void testNonSpinningDefaults() throws Exception {
@ -637,7 +637,7 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
cms.setDefaultMaxMergesAndThreads(false); cms.setDefaultMaxMergesAndThreads(false);
int threadCount = cms.getMaxThreadCount(); int threadCount = cms.getMaxThreadCount();
assertTrue(threadCount >= 1); assertTrue(threadCount >= 1);
assertTrue(threadCount <= 3); assertTrue(threadCount <= 4);
assertEquals(cms.getMaxMergeCount(), 2+threadCount); assertEquals(5+threadCount, cms.getMaxMergeCount());
} }
} }

View File

@ -39,8 +39,8 @@ import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexWriterConfig.OpenMode; import org.apache.lucene.index.IndexWriterConfig.OpenMode;
import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSLockFactory;
import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.MergeInfo;
import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.store.TrackingDirectoryWrapper; import org.apache.lucene.store.TrackingDirectoryWrapper;
import org.apache.lucene.util.InfoStream; import org.apache.lucene.util.InfoStream;
@ -48,7 +48,6 @@ import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.StringHelper; import org.apache.lucene.util.StringHelper;
import org.apache.lucene.util.Version; import org.apache.lucene.util.Version;
/** JUnit adaptation of an older test case DocTest. */ /** JUnit adaptation of an older test case DocTest. */
public class TestDoc extends LuceneTestCase { public class TestDoc extends LuceneTestCase {
@ -215,7 +214,7 @@ public class TestDoc extends LuceneTestCase {
private SegmentCommitInfo merge(Directory dir, SegmentCommitInfo si1, SegmentCommitInfo si2, String merged, boolean useCompoundFile) private SegmentCommitInfo merge(Directory dir, SegmentCommitInfo si1, SegmentCommitInfo si2, String merged, boolean useCompoundFile)
throws Exception { throws Exception {
IOContext context = newIOContext(random()); IOContext context = newIOContext(random(), new IOContext(new MergeInfo(-1, -1, false, -1)));
SegmentReader r1 = new SegmentReader(si1, context); SegmentReader r1 = new SegmentReader(si1, context);
SegmentReader r2 = new SegmentReader(si2, context); SegmentReader r2 = new SegmentReader(si2, context);
@ -225,7 +224,7 @@ public class TestDoc extends LuceneTestCase {
SegmentMerger merger = new SegmentMerger(Arrays.<LeafReader>asList(r1, r2), SegmentMerger merger = new SegmentMerger(Arrays.<LeafReader>asList(r1, r2),
si, InfoStream.getDefault(), trackingDir, si, InfoStream.getDefault(), trackingDir,
MergeState.CheckAbort.NONE, new FieldInfos.FieldNumbers(), context); new FieldInfos.FieldNumbers(), context);
MergeState mergeState = merger.merge(); MergeState mergeState = merger.merge();
r1.close(); r1.close();
@ -233,7 +232,7 @@ public class TestDoc extends LuceneTestCase {
si.setFiles(new HashSet<>(trackingDir.getCreatedFiles())); si.setFiles(new HashSet<>(trackingDir.getCreatedFiles()));
if (useCompoundFile) { if (useCompoundFile) {
Collection<String> filesToDelete = IndexWriter.createCompoundFile(InfoStream.getDefault(), dir, MergeState.CheckAbort.NONE, si, newIOContext(random())); Collection<String> filesToDelete = IndexWriter.createCompoundFile(InfoStream.getDefault(), dir, si, newIOContext(random()));
si.setUseCompoundFile(true); si.setUseCompoundFile(true);
for (final String fileToDelete : filesToDelete) { for (final String fileToDelete : filesToDelete) {
si1.info.dir.deleteFile(fileToDelete); si1.info.dir.deleteFile(fileToDelete);

View File

@ -432,7 +432,7 @@ public class TestIndexFileDeleter extends LuceneTestCase {
if (ms instanceof ConcurrentMergeScheduler) { if (ms instanceof ConcurrentMergeScheduler) {
final ConcurrentMergeScheduler suppressFakeFail = new ConcurrentMergeScheduler() { final ConcurrentMergeScheduler suppressFakeFail = new ConcurrentMergeScheduler() {
@Override @Override
protected void handleMergeException(Throwable exc) { protected void handleMergeException(Directory dir, Throwable exc) {
// suppress only FakeIOException: // suppress only FakeIOException:
if (exc instanceof RuntimeException && exc.getMessage().equals("fake fail")) { if (exc instanceof RuntimeException && exc.getMessage().equals("fake fail")) {
// ok to ignore // ok to ignore
@ -440,13 +440,12 @@ public class TestIndexFileDeleter extends LuceneTestCase {
&& exc.getCause() != null && "fake fail".equals(exc.getCause().getMessage())) { && exc.getCause() != null && "fake fail".equals(exc.getCause().getMessage())) {
// also ok to ignore // also ok to ignore
} else { } else {
super.handleMergeException(exc); super.handleMergeException(dir, exc);
} }
} }
}; };
final ConcurrentMergeScheduler cms = (ConcurrentMergeScheduler) ms; final ConcurrentMergeScheduler cms = (ConcurrentMergeScheduler) ms;
suppressFakeFail.setMaxMergesAndThreads(cms.getMaxMergeCount(), cms.getMaxThreadCount()); suppressFakeFail.setMaxMergesAndThreads(cms.getMaxMergeCount(), cms.getMaxThreadCount());
suppressFakeFail.setMergeThreadPriority(cms.getMergeThreadPriority());
iwc.setMergeScheduler(suppressFakeFail); iwc.setMergeScheduler(suppressFakeFail);
} }

View File

@ -2563,7 +2563,7 @@ public class TestIndexWriter extends LuceneTestCase {
iwc.setMergeScheduler(new ConcurrentMergeScheduler() { iwc.setMergeScheduler(new ConcurrentMergeScheduler() {
@Override @Override
public void doMerge(MergePolicy.OneMerge merge) throws IOException { public void doMerge(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException {
mergeStarted.countDown(); mergeStarted.countDown();
try { try {
closeStarted.await(); closeStarted.await();
@ -2571,7 +2571,7 @@ public class TestIndexWriter extends LuceneTestCase {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
throw new RuntimeException(ie); throw new RuntimeException(ie);
} }
super.doMerge(merge); super.doMerge(writer, merge);
} }
@Override @Override

View File

@ -57,15 +57,15 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.MockDirectoryWrapper.FakeIOException;
import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.store.MockDirectoryWrapper.FakeIOException;
import org.apache.lucene.store.RAMDirectory; import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.util.Bits; import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.InfoStream; import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.TestUtil;
import org.apache.lucene.util.LuceneTestCase.SuppressCodecs; import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
import org.apache.lucene.util.TestUtil;
@SuppressCodecs("SimpleText") // too slow here @SuppressCodecs("SimpleText") // too slow here
public class TestIndexWriterExceptions extends LuceneTestCase { public class TestIndexWriterExceptions extends LuceneTestCase {
@ -1951,16 +1951,15 @@ public class TestIndexWriterExceptions extends LuceneTestCase {
if (ms instanceof ConcurrentMergeScheduler) { if (ms instanceof ConcurrentMergeScheduler) {
final ConcurrentMergeScheduler suppressFakeIOE = new ConcurrentMergeScheduler() { final ConcurrentMergeScheduler suppressFakeIOE = new ConcurrentMergeScheduler() {
@Override @Override
protected void handleMergeException(Throwable exc) { protected void handleMergeException(Directory dir, Throwable exc) {
// suppress only FakeIOException: // suppress only FakeIOException:
if (!(exc instanceof FakeIOException)) { if (!(exc instanceof FakeIOException)) {
super.handleMergeException(exc); super.handleMergeException(dir, exc);
} }
} }
}; };
final ConcurrentMergeScheduler cms = (ConcurrentMergeScheduler) ms; final ConcurrentMergeScheduler cms = (ConcurrentMergeScheduler) ms;
suppressFakeIOE.setMaxMergesAndThreads(cms.getMaxMergeCount(), cms.getMaxThreadCount()); suppressFakeIOE.setMaxMergesAndThreads(cms.getMaxMergeCount(), cms.getMaxThreadCount());
suppressFakeIOE.setMergeThreadPriority(cms.getMergeThreadPriority());
iwc.setMergeScheduler(suppressFakeIOE); iwc.setMergeScheduler(suppressFakeIOE);
} }

View File

@ -25,6 +25,8 @@ import org.apache.lucene.codecs.Codec;
import org.apache.lucene.document.Document; import org.apache.lucene.document.Document;
import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.MergeInfo;
import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.FixedBitSet; import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.InfoStream; import org.apache.lucene.util.InfoStream;
@ -84,7 +86,8 @@ public class TestSegmentMerger extends LuceneTestCase {
SegmentMerger merger = new SegmentMerger(Arrays.<LeafReader>asList(reader1, reader2), SegmentMerger merger = new SegmentMerger(Arrays.<LeafReader>asList(reader1, reader2),
si, InfoStream.getDefault(), mergedDir, si, InfoStream.getDefault(), mergedDir,
MergeState.CheckAbort.NONE, new FieldInfos.FieldNumbers(), newIOContext(random())); new FieldInfos.FieldNumbers(),
newIOContext(random(), new IOContext(new MergeInfo(-1, -1, false, -1))));
MergeState mergeState = merger.merge(); MergeState mergeState = merger.merge();
int docsMerged = mergeState.segmentInfo.getDocCount(); int docsMerged = mergeState.segmentInfo.getDocCount();
assertTrue(docsMerged == 2); assertTrue(docsMerged == 2);

View File

@ -1,44 +0,0 @@
package org.apache.lucene.store;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.nio.file.Path;
import org.apache.lucene.util.TestUtil;
public class TestRateLimitedDirectoryWrapper extends BaseDirectoryTestCase {
@Override
protected Directory getDirectory(Path path) {
Directory in = newFSDirectory(path);
if (in instanceof MockDirectoryWrapper) {
// test manipulates directory directly
((MockDirectoryWrapper)in).setEnableVirusScanner(false);
}
RateLimitedDirectoryWrapper dir = new RateLimitedDirectoryWrapper(in);
RateLimiter limiter = new RateLimiter.SimpleRateLimiter(TestUtil.nextInt(random(), 10, 40));
dir.setRateLimiter(limiter, IOContext.Context.MERGE);
return dir;
}
// since we are rate-limiting, this test gets pretty slow
@Override @Nightly
public void testThreadSafety() throws Exception {
super.testThreadSafety();
}
}

View File

@ -29,8 +29,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.analysis.Analyzer.TokenStreamComponents;
import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.Analyzer.TokenStreamComponents;
import org.apache.lucene.analysis.MockAnalyzer; import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.analysis.MockTokenFilter; import org.apache.lucene.analysis.MockTokenFilter;
import org.apache.lucene.analysis.MockTokenizer; import org.apache.lucene.analysis.MockTokenizer;
@ -387,7 +387,7 @@ public class TestIDVersionPostingsFormat extends LuceneTestCase {
if (ms instanceof ConcurrentMergeScheduler) { if (ms instanceof ConcurrentMergeScheduler) {
iwc.setMergeScheduler(new ConcurrentMergeScheduler() { iwc.setMergeScheduler(new ConcurrentMergeScheduler() {
@Override @Override
protected void handleMergeException(Throwable exc) { protected void handleMergeException(Directory dir, Throwable exc) {
assertTrue(exc instanceof IllegalArgumentException); assertTrue(exc instanceof IllegalArgumentException);
} }
}); });

View File

@ -22,7 +22,6 @@ import java.util.Collection;
import java.util.Random; import java.util.Random;
import org.apache.lucene.codecs.CompoundFormat; import org.apache.lucene.codecs.CompoundFormat;
import org.apache.lucene.index.MergeState.CheckAbort;
import org.apache.lucene.index.SegmentInfo; import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IOContext;
@ -42,11 +41,11 @@ class CrankyCompoundFormat extends CompoundFormat {
} }
@Override @Override
public void write(Directory dir, SegmentInfo si, Collection<String> files, CheckAbort checkAbort, IOContext context) throws IOException { public void write(Directory dir, SegmentInfo si, Collection<String> files, IOContext context) throws IOException {
if (random.nextInt(100) == 0) { if (random.nextInt(100) == 0) {
throw new IOException("Fake IOException from CompoundFormat.write()"); throw new IOException("Fake IOException from CompoundFormat.write()");
} }
delegate.write(dir, si, files, checkAbort, context); delegate.write(dir, si, files, context);
} }
@Override @Override

View File

@ -55,7 +55,7 @@ public abstract class BaseCompoundFormatTestCase extends BaseIndexFileFormatTest
Directory dir = newDirectory(); Directory dir = newDirectory();
SegmentInfo si = newSegmentInfo(dir, "_123"); SegmentInfo si = newSegmentInfo(dir, "_123");
si.getCodec().compoundFormat().write(dir, si, Collections.<String>emptyList(), MergeState.CheckAbort.NONE, IOContext.DEFAULT); si.getCodec().compoundFormat().write(dir, si, Collections.<String>emptyList(), IOContext.DEFAULT);
Directory cfs = si.getCodec().compoundFormat().getCompoundReader(dir, si, IOContext.DEFAULT); Directory cfs = si.getCodec().compoundFormat().getCompoundReader(dir, si, IOContext.DEFAULT);
assertEquals(0, cfs.listAll().length); assertEquals(0, cfs.listAll().length);
cfs.close(); cfs.close();
@ -74,7 +74,7 @@ public abstract class BaseCompoundFormatTestCase extends BaseIndexFileFormatTest
createSequenceFile(dir, testfile, (byte) 0, data[i]); createSequenceFile(dir, testfile, (byte) 0, data[i]);
SegmentInfo si = newSegmentInfo(dir, "_" + i); SegmentInfo si = newSegmentInfo(dir, "_" + i);
si.getCodec().compoundFormat().write(dir, si, Collections.singleton(testfile), MergeState.CheckAbort.NONE, IOContext.DEFAULT); si.getCodec().compoundFormat().write(dir, si, Collections.singleton(testfile), IOContext.DEFAULT);
Directory cfs = si.getCodec().compoundFormat().getCompoundReader(dir, si, IOContext.DEFAULT); Directory cfs = si.getCodec().compoundFormat().getCompoundReader(dir, si, IOContext.DEFAULT);
IndexInput expected = dir.openInput(testfile, newIOContext(random())); IndexInput expected = dir.openInput(testfile, newIOContext(random()));
@ -98,7 +98,7 @@ public abstract class BaseCompoundFormatTestCase extends BaseIndexFileFormatTest
createSequenceFile(dir, files[1], (byte) 0, 114); createSequenceFile(dir, files[1], (byte) 0, 114);
SegmentInfo si = newSegmentInfo(dir, "_123"); SegmentInfo si = newSegmentInfo(dir, "_123");
si.getCodec().compoundFormat().write(dir, si, Arrays.asList(files), MergeState.CheckAbort.NONE, IOContext.DEFAULT); si.getCodec().compoundFormat().write(dir, si, Arrays.asList(files), IOContext.DEFAULT);
Directory cfs = si.getCodec().compoundFormat().getCompoundReader(dir, si, IOContext.DEFAULT); Directory cfs = si.getCodec().compoundFormat().getCompoundReader(dir, si, IOContext.DEFAULT);
for (String file : files) { for (String file : files) {
@ -124,7 +124,7 @@ public abstract class BaseCompoundFormatTestCase extends BaseIndexFileFormatTest
out.close(); out.close();
SegmentInfo si = newSegmentInfo(dir, "_123"); SegmentInfo si = newSegmentInfo(dir, "_123");
si.getCodec().compoundFormat().write(dir, si, Collections.singleton(testfile), MergeState.CheckAbort.NONE, IOContext.DEFAULT); si.getCodec().compoundFormat().write(dir, si, Collections.singleton(testfile), IOContext.DEFAULT);
Directory cfs = si.getCodec().compoundFormat().getCompoundReader(dir, si, IOContext.DEFAULT); Directory cfs = si.getCodec().compoundFormat().getCompoundReader(dir, si, IOContext.DEFAULT);
assertEquals(1, cfs.listAll().length); assertEquals(1, cfs.listAll().length);
cfs.close(); cfs.close();
@ -149,7 +149,7 @@ public abstract class BaseCompoundFormatTestCase extends BaseIndexFileFormatTest
out.close(); out.close();
SegmentInfo si = newSegmentInfo(dir, "_123"); SegmentInfo si = newSegmentInfo(dir, "_123");
si.getCodec().compoundFormat().write(dir, si, Collections.singleton(testfile), MergeState.CheckAbort.NONE, myContext); si.getCodec().compoundFormat().write(dir, si, Collections.singleton(testfile), myContext);
dir.close(); dir.close();
} }
@ -168,7 +168,7 @@ public abstract class BaseCompoundFormatTestCase extends BaseIndexFileFormatTest
out.close(); out.close();
SegmentInfo si = newSegmentInfo(dir, "_123"); SegmentInfo si = newSegmentInfo(dir, "_123");
si.getCodec().compoundFormat().write(dir, si, Collections.singleton(testfile), MergeState.CheckAbort.NONE, context); si.getCodec().compoundFormat().write(dir, si, Collections.singleton(testfile), context);
dir.close(); dir.close();
} }
@ -218,7 +218,7 @@ public abstract class BaseCompoundFormatTestCase extends BaseIndexFileFormatTest
Directory dir = newDirectory(); Directory dir = newDirectory();
SegmentInfo si = newSegmentInfo(dir, "_123"); SegmentInfo si = newSegmentInfo(dir, "_123");
si.getCodec().compoundFormat().write(dir, si, Collections.<String>emptyList(), MergeState.CheckAbort.NONE, IOContext.DEFAULT); si.getCodec().compoundFormat().write(dir, si, Collections.<String>emptyList(), IOContext.DEFAULT);
Directory cfs = si.getCodec().compoundFormat().getCompoundReader(dir, si, IOContext.DEFAULT); Directory cfs = si.getCodec().compoundFormat().getCompoundReader(dir, si, IOContext.DEFAULT);
try { try {
cfs.createOutput("bogus", IOContext.DEFAULT); cfs.createOutput("bogus", IOContext.DEFAULT);
@ -240,7 +240,7 @@ public abstract class BaseCompoundFormatTestCase extends BaseIndexFileFormatTest
out.close(); out.close();
SegmentInfo si = newSegmentInfo(dir, "_123"); SegmentInfo si = newSegmentInfo(dir, "_123");
si.getCodec().compoundFormat().write(dir, si, Collections.<String>emptyList(), MergeState.CheckAbort.NONE, IOContext.DEFAULT); si.getCodec().compoundFormat().write(dir, si, Collections.<String>emptyList(), IOContext.DEFAULT);
Directory cfs = si.getCodec().compoundFormat().getCompoundReader(dir, si, IOContext.DEFAULT); Directory cfs = si.getCodec().compoundFormat().getCompoundReader(dir, si, IOContext.DEFAULT);
try { try {
cfs.deleteFile(testfile); cfs.deleteFile(testfile);
@ -262,7 +262,7 @@ public abstract class BaseCompoundFormatTestCase extends BaseIndexFileFormatTest
out.close(); out.close();
SegmentInfo si = newSegmentInfo(dir, "_123"); SegmentInfo si = newSegmentInfo(dir, "_123");
si.getCodec().compoundFormat().write(dir, si, Collections.<String>emptyList(), MergeState.CheckAbort.NONE, IOContext.DEFAULT); si.getCodec().compoundFormat().write(dir, si, Collections.<String>emptyList(), IOContext.DEFAULT);
Directory cfs = si.getCodec().compoundFormat().getCompoundReader(dir, si, IOContext.DEFAULT); Directory cfs = si.getCodec().compoundFormat().getCompoundReader(dir, si, IOContext.DEFAULT);
try { try {
cfs.renameFile(testfile, "bogus"); cfs.renameFile(testfile, "bogus");
@ -284,7 +284,7 @@ public abstract class BaseCompoundFormatTestCase extends BaseIndexFileFormatTest
out.close(); out.close();
SegmentInfo si = newSegmentInfo(dir, "_123"); SegmentInfo si = newSegmentInfo(dir, "_123");
si.getCodec().compoundFormat().write(dir, si, Collections.<String>emptyList(), MergeState.CheckAbort.NONE, IOContext.DEFAULT); si.getCodec().compoundFormat().write(dir, si, Collections.<String>emptyList(), IOContext.DEFAULT);
Directory cfs = si.getCodec().compoundFormat().getCompoundReader(dir, si, IOContext.DEFAULT); Directory cfs = si.getCodec().compoundFormat().getCompoundReader(dir, si, IOContext.DEFAULT);
try { try {
cfs.sync(Collections.singleton(testfile)); cfs.sync(Collections.singleton(testfile));
@ -306,7 +306,7 @@ public abstract class BaseCompoundFormatTestCase extends BaseIndexFileFormatTest
out.close(); out.close();
SegmentInfo si = newSegmentInfo(dir, "_123"); SegmentInfo si = newSegmentInfo(dir, "_123");
si.getCodec().compoundFormat().write(dir, si, Collections.<String>emptyList(), MergeState.CheckAbort.NONE, IOContext.DEFAULT); si.getCodec().compoundFormat().write(dir, si, Collections.<String>emptyList(), IOContext.DEFAULT);
Directory cfs = si.getCodec().compoundFormat().getCompoundReader(dir, si, IOContext.DEFAULT); Directory cfs = si.getCodec().compoundFormat().getCompoundReader(dir, si, IOContext.DEFAULT);
try { try {
cfs.makeLock("foobar"); cfs.makeLock("foobar");
@ -345,7 +345,7 @@ public abstract class BaseCompoundFormatTestCase extends BaseIndexFileFormatTest
String files[] = dir.listAll(); String files[] = dir.listAll();
SegmentInfo si = newSegmentInfo(dir, "_123"); SegmentInfo si = newSegmentInfo(dir, "_123");
si.getCodec().compoundFormat().write(dir, si, Arrays.asList(files), MergeState.CheckAbort.NONE, IOContext.DEFAULT); si.getCodec().compoundFormat().write(dir, si, Arrays.asList(files), IOContext.DEFAULT);
Directory cfs = si.getCodec().compoundFormat().getCompoundReader(dir, si, IOContext.DEFAULT); Directory cfs = si.getCodec().compoundFormat().getCompoundReader(dir, si, IOContext.DEFAULT);
for (int i = 0; i < files.length; i++) { for (int i = 0; i < files.length; i++) {
@ -376,7 +376,7 @@ public abstract class BaseCompoundFormatTestCase extends BaseIndexFileFormatTest
assertEquals(0, dir.getFileHandleCount()); assertEquals(0, dir.getFileHandleCount());
SegmentInfo si = newSegmentInfo(dir, "_123"); SegmentInfo si = newSegmentInfo(dir, "_123");
si.getCodec().compoundFormat().write(dir, si, Arrays.asList(dir.listAll()), MergeState.CheckAbort.NONE, IOContext.DEFAULT); si.getCodec().compoundFormat().write(dir, si, Arrays.asList(dir.listAll()), IOContext.DEFAULT);
Directory cfs = si.getCodec().compoundFormat().getCompoundReader(dir, si, IOContext.DEFAULT); Directory cfs = si.getCodec().compoundFormat().getCompoundReader(dir, si, IOContext.DEFAULT);
final IndexInput[] ins = new IndexInput[FILE_COUNT]; final IndexInput[] ins = new IndexInput[FILE_COUNT];
@ -729,7 +729,7 @@ public abstract class BaseCompoundFormatTestCase extends BaseIndexFileFormatTest
} }
SegmentInfo si = newSegmentInfo(dir, "_123"); SegmentInfo si = newSegmentInfo(dir, "_123");
si.getCodec().compoundFormat().write(dir, si, files, MergeState.CheckAbort.NONE, IOContext.DEFAULT); si.getCodec().compoundFormat().write(dir, si, files, IOContext.DEFAULT);
Directory cfs = si.getCodec().compoundFormat().getCompoundReader(dir, si, IOContext.DEFAULT); Directory cfs = si.getCodec().compoundFormat().getCompoundReader(dir, si, IOContext.DEFAULT);
return cfs; return cfs;
} }

View File

@ -1,5 +1,7 @@
package org.apache.lucene.index; package org.apache.lucene.index;
import org.apache.lucene.store.Directory;
/* /*
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with * contributor license agreements. See the NOTICE file distributed with
@ -17,22 +19,17 @@ package org.apache.lucene.index;
* limitations under the License. * limitations under the License.
*/ */
import java.io.IOException;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.IOUtils;
/** A {@link ConcurrentMergeScheduler} that ignores AlreadyClosedException. */ /** A {@link ConcurrentMergeScheduler} that ignores AlreadyClosedException. */
public abstract class SuppressingConcurrentMergeScheduler extends ConcurrentMergeScheduler { public abstract class SuppressingConcurrentMergeScheduler extends ConcurrentMergeScheduler {
@Override @Override
protected void handleMergeException(Throwable exc) { protected void handleMergeException(Directory dir, Throwable exc) {
while (true) { while (true) {
if (isOK(exc)) { if (isOK(exc)) {
return; return;
} }
exc = exc.getCause(); exc = exc.getCause();
if (exc == null) { if (exc == null) {
super.handleMergeException(exc); super.handleMergeException(dir, exc);
} }
} }
} }

View File

@ -611,7 +611,7 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
// throttling REALLY slows down tests, so don't do it very often for SOMETIMES. // throttling REALLY slows down tests, so don't do it very often for SOMETIMES.
if (throttling == Throttling.ALWAYS || if (throttling == Throttling.ALWAYS ||
(throttling == Throttling.SOMETIMES && randomState.nextInt(200) == 0) && !(in instanceof RateLimitedDirectoryWrapper)) { (throttling == Throttling.SOMETIMES && randomState.nextInt(200) == 0)) {
if (LuceneTestCase.VERBOSE) { if (LuceneTestCase.VERBOSE) {
System.out.println("MockDirectoryWrapper: throttling indexOutput (" + name + ")"); System.out.println("MockDirectoryWrapper: throttling indexOutput (" + name + ")");
} }

View File

@ -123,13 +123,11 @@ import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.FSLockFactory; import org.apache.lucene.store.FSLockFactory;
import org.apache.lucene.store.FlushInfo; import org.apache.lucene.store.FlushInfo;
import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IOContext.Context;
import org.apache.lucene.store.LockFactory; import org.apache.lucene.store.LockFactory;
import org.apache.lucene.store.MergeInfo; import org.apache.lucene.store.MergeInfo;
import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.store.MockDirectoryWrapper.Throttling; import org.apache.lucene.store.MockDirectoryWrapper.Throttling;
import org.apache.lucene.store.NRTCachingDirectory; import org.apache.lucene.store.NRTCachingDirectory;
import org.apache.lucene.store.RateLimitedDirectoryWrapper;
import org.apache.lucene.util.automaton.AutomatonTestUtil; import org.apache.lucene.util.automaton.AutomatonTestUtil;
import org.apache.lucene.util.automaton.CompiledAutomaton; import org.apache.lucene.util.automaton.CompiledAutomaton;
import org.apache.lucene.util.automaton.RegExp; import org.apache.lucene.util.automaton.RegExp;
@ -914,13 +912,17 @@ public abstract class LuceneTestCase extends Assert {
} else { } else {
cms = new ConcurrentMergeScheduler() { cms = new ConcurrentMergeScheduler() {
@Override @Override
protected synchronized void maybeStall() { protected synchronized void maybeStall(IndexWriter writer) {
} }
}; };
} }
int maxThreadCount = TestUtil.nextInt(r, 1, 4); int maxThreadCount = TestUtil.nextInt(r, 1, 4);
int maxMergeCount = TestUtil.nextInt(r, maxThreadCount, maxThreadCount + 4); int maxMergeCount = TestUtil.nextInt(r, maxThreadCount, maxThreadCount + 4);
cms.setMaxMergesAndThreads(maxMergeCount, maxThreadCount); cms.setMaxMergesAndThreads(maxMergeCount, maxThreadCount);
if (random().nextBoolean()) {
cms.disableAutoIOThrottle();
}
cms.setForceMergeMBPerSec(10 + 10*random().nextDouble());
c.setMergeScheduler(cms); c.setMergeScheduler(cms);
} else { } else {
// Always use consistent settings, else CMS's dynamic (SSD or not) // Always use consistent settings, else CMS's dynamic (SSD or not)
@ -1347,27 +1349,6 @@ public abstract class LuceneTestCase extends Assert {
directory = new NRTCachingDirectory(directory, random.nextDouble(), random.nextDouble()); directory = new NRTCachingDirectory(directory, random.nextDouble(), random.nextDouble());
} }
if (TEST_NIGHTLY && rarely(random) && !bare) {
final double maxMBPerSec = TestUtil.nextInt(random, 20, 40);
if (LuceneTestCase.VERBOSE) {
System.out.println("LuceneTestCase: will rate limit output IndexOutput to " + maxMBPerSec + " MB/sec");
}
final RateLimitedDirectoryWrapper rateLimitedDirectoryWrapper = new RateLimitedDirectoryWrapper(directory);
switch (random.nextInt(10)) {
case 3: // sometimes rate limit on flush
rateLimitedDirectoryWrapper.setMaxWriteMBPerSec(maxMBPerSec, Context.FLUSH);
break;
case 2: // sometimes rate limit flush & merge
rateLimitedDirectoryWrapper.setMaxWriteMBPerSec(maxMBPerSec, Context.FLUSH);
rateLimitedDirectoryWrapper.setMaxWriteMBPerSec(maxMBPerSec, Context.MERGE);
break;
default:
rateLimitedDirectoryWrapper.setMaxWriteMBPerSec(maxMBPerSec, Context.MERGE);
}
directory = rateLimitedDirectoryWrapper;
}
if (bare) { if (bare) {
BaseDirectoryWrapper base = new BaseDirectoryWrapper(directory); BaseDirectoryWrapper base = new BaseDirectoryWrapper(directory);
closeAfterSuite(new CloseableDirectory(base, suiteFailureMarker)); closeAfterSuite(new CloseableDirectory(base, suiteFailureMarker));

View File

@ -36,7 +36,6 @@ import org.apache.lucene.store.LockFactory;
import org.apache.lucene.store.NRTCachingDirectory; import org.apache.lucene.store.NRTCachingDirectory;
import org.apache.lucene.store.NativeFSLockFactory; import org.apache.lucene.store.NativeFSLockFactory;
import org.apache.lucene.store.NoLockFactory; import org.apache.lucene.store.NoLockFactory;
import org.apache.lucene.store.RateLimitedDirectoryWrapper;
import org.apache.lucene.store.SimpleFSLockFactory; import org.apache.lucene.store.SimpleFSLockFactory;
import org.apache.lucene.store.SingleInstanceLockFactory; import org.apache.lucene.store.SingleInstanceLockFactory;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
@ -350,7 +349,6 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
directory = create(fullPath, createLockFactory(rawLockType), dirContext); directory = create(fullPath, createLockFactory(rawLockType), dirContext);
boolean success = false; boolean success = false;
try { try {
directory = rateLimit(directory);
CacheValue newCacheValue = new CacheValue(fullPath, directory); CacheValue newCacheValue = new CacheValue(fullPath, directory);
byDirectoryCache.put(directory, newCacheValue); byDirectoryCache.put(directory, newCacheValue);
byPathCache.put(fullPath, newCacheValue); byPathCache.put(fullPath, newCacheValue);
@ -370,25 +368,6 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
} }
} }
private Directory rateLimit(Directory directory) {
if (maxWriteMBPerSecDefault != null || maxWriteMBPerSecFlush != null || maxWriteMBPerSecMerge != null || maxWriteMBPerSecRead != null) {
directory = new RateLimitedDirectoryWrapper(directory);
if (maxWriteMBPerSecDefault != null) {
((RateLimitedDirectoryWrapper)directory).setMaxWriteMBPerSec(maxWriteMBPerSecDefault, Context.DEFAULT);
}
if (maxWriteMBPerSecFlush != null) {
((RateLimitedDirectoryWrapper)directory).setMaxWriteMBPerSec(maxWriteMBPerSecFlush, Context.FLUSH);
}
if (maxWriteMBPerSecMerge != null) {
((RateLimitedDirectoryWrapper)directory).setMaxWriteMBPerSec(maxWriteMBPerSecMerge, Context.MERGE);
}
if (maxWriteMBPerSecRead != null) {
((RateLimitedDirectoryWrapper)directory).setMaxWriteMBPerSec(maxWriteMBPerSecRead, Context.READ);
}
}
return directory;
}
/* /*
* (non-Javadoc) * (non-Javadoc)
* *

View File

@ -28,7 +28,6 @@ import org.apache.lucene.store.LockFactory;
import org.apache.lucene.store.NRTCachingDirectory; import org.apache.lucene.store.NRTCachingDirectory;
import org.apache.lucene.store.NativeFSLockFactory; import org.apache.lucene.store.NativeFSLockFactory;
import org.apache.lucene.store.NoLockFactory; import org.apache.lucene.store.NoLockFactory;
import org.apache.lucene.store.RateLimitedDirectoryWrapper;
import org.apache.lucene.store.SimpleFSLockFactory; import org.apache.lucene.store.SimpleFSLockFactory;
import org.apache.lucene.store.SingleInstanceLockFactory; import org.apache.lucene.store.SingleInstanceLockFactory;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
@ -113,8 +112,7 @@ public class StandardDirectoryFactory extends CachingDirectoryFactory {
* carefully - some Directory wrappers will * carefully - some Directory wrappers will
* cache files for example. * cache files for example.
* *
* This implementation works with two wrappers: * This implementation works with NRTCachingDirectory.
* NRTCachingDirectory and RateLimitedDirectoryWrapper.
* *
* You should first {@link Directory#sync(java.util.Collection)} any file that will be * You should first {@link Directory#sync(java.util.Collection)} any file that will be
* moved or avoid cached files through settings. * moved or avoid cached files through settings.
@ -143,13 +141,11 @@ public class StandardDirectoryFactory extends CachingDirectoryFactory {
super.move(fromDir, toDir, fileName, ioContext); super.move(fromDir, toDir, fileName, ioContext);
} }
// special hack to work with NRTCachingDirectory and RateLimitedDirectoryWrapper // special hack to work with NRTCachingDirectory
private Directory getBaseDir(Directory dir) { private Directory getBaseDir(Directory dir) {
Directory baseDir; Directory baseDir;
if (dir instanceof NRTCachingDirectory) { if (dir instanceof NRTCachingDirectory) {
baseDir = ((NRTCachingDirectory)dir).getDelegate(); baseDir = ((NRTCachingDirectory)dir).getDelegate();
} else if (dir instanceof RateLimitedDirectoryWrapper) {
baseDir = ((RateLimitedDirectoryWrapper)dir).getDelegate();
} else { } else {
baseDir = dir; baseDir = dir;
} }

View File

@ -37,7 +37,6 @@ import org.apache.lucene.index.StoredDocument;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext.Context; import org.apache.lucene.store.IOContext.Context;
import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.store.RateLimitedDirectoryWrapper;
import org.apache.lucene.util.English; import org.apache.lucene.util.English;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.CommonParams;

View File

@ -25,7 +25,6 @@ import org.apache.lucene.store.LockFactory;
import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.store.NRTCachingDirectory; import org.apache.lucene.store.NRTCachingDirectory;
import org.apache.lucene.store.NoLockFactory; import org.apache.lucene.store.NoLockFactory;
import org.apache.lucene.store.RateLimitedDirectoryWrapper;
import org.apache.lucene.store.TrackingDirectoryWrapper; import org.apache.lucene.store.TrackingDirectoryWrapper;
import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.LuceneTestCase;
@ -86,9 +85,6 @@ public class MockDirectoryFactory extends EphemeralDirectoryFactory {
if (dir instanceof NRTCachingDirectory) { if (dir instanceof NRTCachingDirectory) {
cdir = ((NRTCachingDirectory)dir).getDelegate(); cdir = ((NRTCachingDirectory)dir).getDelegate();
} }
if (cdir instanceof RateLimitedDirectoryWrapper) {
cdir = ((RateLimitedDirectoryWrapper)dir).getDelegate();
}
if (cdir instanceof TrackingDirectoryWrapper) { if (cdir instanceof TrackingDirectoryWrapper) {
cdir = ((TrackingDirectoryWrapper)dir).getDelegate(); cdir = ((TrackingDirectoryWrapper)dir).getDelegate();
} }

View File

@ -25,7 +25,6 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.store.LockFactory; import org.apache.lucene.store.LockFactory;
import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.store.NRTCachingDirectory; import org.apache.lucene.store.NRTCachingDirectory;
import org.apache.lucene.store.RateLimitedDirectoryWrapper;
import org.apache.lucene.store.TrackingDirectoryWrapper; import org.apache.lucene.store.TrackingDirectoryWrapper;
import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.LuceneTestCase;
@ -69,9 +68,6 @@ public class MockFSDirectoryFactory extends StandardDirectoryFactory {
if (dir instanceof NRTCachingDirectory) { if (dir instanceof NRTCachingDirectory) {
cdir = ((NRTCachingDirectory)dir).getDelegate(); cdir = ((NRTCachingDirectory)dir).getDelegate();
} }
if (cdir instanceof RateLimitedDirectoryWrapper) {
cdir = ((RateLimitedDirectoryWrapper)dir).getDelegate();
}
if (cdir instanceof TrackingDirectoryWrapper) { if (cdir instanceof TrackingDirectoryWrapper) {
cdir = ((TrackingDirectoryWrapper)dir).getDelegate(); cdir = ((TrackingDirectoryWrapper)dir).getDelegate();
} }