From 41fcc722ff07393586f7758d8a733c7e2e59bebc Mon Sep 17 00:00:00 2001 From: Michael McCandless Date: Mon, 5 Jan 2015 14:28:28 +0000 Subject: [PATCH] LUCENE-6119: CMS dynamically rate limits IO writes of each merge depending on incoming merge rate git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1649532 13f79535-47bb-0310-9956-ffa450edef68 --- lucene/CHANGES.txt | 4 + .../simpletext/SimpleTextCompoundFormat.java | 5 +- .../apache/lucene/codecs/CompoundFormat.java | 4 +- .../lucene/codecs/StoredFieldsWriter.java | 1 - .../lucene/codecs/TermVectorsWriter.java | 1 - .../CompressingStoredFieldsWriter.java | 2 - .../CompressingTermVectorsWriter.java | 4 - .../lucene50/Lucene50CompoundFormat.java | 5 +- .../lucene/index/BufferedUpdatesStream.java | 56 +- .../apache/lucene/index/CoalescedUpdates.java | 13 +- .../index/ConcurrentMergeScheduler.java | 536 +++++++++++------- .../apache/lucene/index/DocumentsWriter.java | 12 +- .../index/DocumentsWriterPerThread.java | 2 +- .../org/apache/lucene/index/IndexWriter.java | 113 ++-- .../MappingMultiDocsAndPositionsEnum.java | 9 +- .../lucene/index/MappingMultiDocsEnum.java | 7 - .../org/apache/lucene/index/MergePolicy.java | 79 +-- .../apache/lucene/index/MergeRateLimiter.java | 194 +++++++ .../apache/lucene/index/MergeScheduler.java | 32 ++ .../org/apache/lucene/index/MergeState.java | 53 +- .../apache/lucene/index/NoMergeScheduler.java | 1 - .../apache/lucene/index/SegmentMerger.java | 13 +- .../lucene/index/TieredMergePolicy.java | 2 +- .../apache/lucene/store/FilterDirectory.java | 2 +- .../store/RateLimitedDirectoryWrapper.java | 145 ----- .../lucene/store/RateLimitedIndexOutput.java | 7 +- .../org/apache/lucene/store/RateLimiter.java | 18 +- .../lucene/util/PrintStreamInfoStream.java | 7 +- .../org/apache/lucene/util/StringHelper.java | 2 +- .../lucene/TestMergeSchedulerExternal.java | 12 +- .../index/TestConcurrentMergeScheduler.java | 24 +- .../test/org/apache/lucene/index/TestDoc.java | 405 +++++++------ .../lucene/index/TestIndexFileDeleter.java | 5 +- .../apache/lucene/index/TestIndexWriter.java | 4 +- .../index/TestIndexWriterExceptions.java | 9 +- .../lucene/index/TestSegmentMerger.java | 7 +- .../TestRateLimitedDirectoryWrapper.java | 44 -- .../TestIDVersionPostingsFormat.java | 4 +- .../codecs/cranky/CrankyCompoundFormat.java | 5 +- .../index/BaseCompoundFormatTestCase.java | 28 +- .../SuppressingConcurrentMergeScheduler.java | 11 +- .../lucene/store/MockDirectoryWrapper.java | 2 +- .../apache/lucene/util/LuceneTestCase.java | 29 +- .../solr/core/CachingDirectoryFactory.java | 21 - .../solr/core/StandardDirectoryFactory.java | 8 +- .../apache/solr/BasicFunctionalityTest.java | 1 - .../solr/core/MockDirectoryFactory.java | 4 - .../solr/core/MockFSDirectoryFactory.java | 4 - 48 files changed, 995 insertions(+), 961 deletions(-) create mode 100644 lucene/core/src/java/org/apache/lucene/index/MergeRateLimiter.java delete mode 100644 lucene/core/src/java/org/apache/lucene/store/RateLimitedDirectoryWrapper.java delete mode 100644 lucene/core/src/test/org/apache/lucene/store/TestRateLimitedDirectoryWrapper.java diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt index d309a859d9e..14068e4ea8d 100644 --- a/lucene/CHANGES.txt +++ b/lucene/CHANGES.txt @@ -130,6 +130,10 @@ New Features * LUCENE-5914: Add an option to Lucene50Codec to support either BEST_SPEED or BEST_COMPRESSION for stored fields. 
(Adrien Grand, Robert Muir) +* LUCENE-6119: Add auto-IO-throttling to ConcurrentMergeScheduler, to + rate limit IO writes for each merge depending on incoming merge + rate. (Mike McCandless) + Optimizations * LUCENE-5960: Use a more efficient bitset, not a Set, to diff --git a/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextCompoundFormat.java b/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextCompoundFormat.java index cef75aa0c1e..f814e3d67b7 100644 --- a/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextCompoundFormat.java +++ b/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextCompoundFormat.java @@ -29,7 +29,6 @@ import java.util.Locale; import org.apache.lucene.codecs.CompoundFormat; import org.apache.lucene.index.CorruptIndexException; -import org.apache.lucene.index.MergeState.CheckAbort; import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.SegmentInfo; import org.apache.lucene.store.Directory; @@ -157,7 +156,7 @@ public class SimpleTextCompoundFormat extends CompoundFormat { } @Override - public void write(Directory dir, SegmentInfo si, Collection files, CheckAbort checkAbort, IOContext context) throws IOException { + public void write(Directory dir, SegmentInfo si, Collection files, IOContext context) throws IOException { String dataFile = IndexFileNames.segmentFileName(si.name, "", DATA_EXTENSION); int numFiles = files.size(); @@ -181,8 +180,6 @@ public class SimpleTextCompoundFormat extends CompoundFormat { out.copyBytes(in, in.length()); } endOffsets[i] = out.getFilePointer(); - - checkAbort.work(endOffsets[i] - startOffsets[i]); } long tocPos = out.getFilePointer(); diff --git a/lucene/core/src/java/org/apache/lucene/codecs/CompoundFormat.java b/lucene/core/src/java/org/apache/lucene/codecs/CompoundFormat.java index 29556305667..87a492cefd9 100644 --- a/lucene/core/src/java/org/apache/lucene/codecs/CompoundFormat.java +++ b/lucene/core/src/java/org/apache/lucene/codecs/CompoundFormat.java @@ -20,7 +20,6 @@ package org.apache.lucene.codecs; import java.io.IOException; import java.util.Collection; -import org.apache.lucene.index.MergeState.CheckAbort; import org.apache.lucene.index.SegmentInfo; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; @@ -47,8 +46,7 @@ public abstract class CompoundFormat { /** * Packs the provided files into a compound format. */ - // TODO: get checkAbort out of here, and everywhere, and have iw do it at a higher level - public abstract void write(Directory dir, SegmentInfo si, Collection files, CheckAbort checkAbort, IOContext context) throws IOException; + public abstract void write(Directory dir, SegmentInfo si, Collection files, IOContext context) throws IOException; /** * Returns the compound file names used by this segment. 
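The CheckAbort hook is removed from CompoundFormat.write (and, in the hunks below, from the stored-fields and term-vectors writers) because aborting a merge no longer relies on each codec counting bytes: every OneMerge now carries its own MergeRateLimiter, and merge writes flow through a rate-limited IndexOutput. A minimal sketch of the new abort check, assuming this patch is applied; the class and method names below are illustrative only, not part of the change:

    import java.io.IOException;

    import org.apache.lucene.index.MergePolicy;

    // Sketch: where codecs used to call checkAbort.work(numBytes), the merge
    // loop now asks the merge's own rate limiter whether it has been aborted.
    class AbortCheckSketch {
      static void beforeWritingMoreBytes(MergePolicy.OneMerge merge) throws IOException {
        // Throws MergePolicy.MergeAbortedException once setAbort() has been called:
        merge.rateLimiter.checkAbort();
        // The actual bytes are written through RateLimitedIndexOutput, which
        // reports them to merge.rateLimiter.pause(bytes), so throttling and
        // aborting are handled in one place instead of per codec.
      }
    }
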
diff --git a/lucene/core/src/java/org/apache/lucene/codecs/StoredFieldsWriter.java b/lucene/core/src/java/org/apache/lucene/codecs/StoredFieldsWriter.java index d8be8ae8047..9d85f12c076 100644 --- a/lucene/core/src/java/org/apache/lucene/codecs/StoredFieldsWriter.java +++ b/lucene/core/src/java/org/apache/lucene/codecs/StoredFieldsWriter.java @@ -96,7 +96,6 @@ public abstract class StoredFieldsWriter implements Closeable { storedFieldsReader.visitDocument(docID, visitor); finishDocument(); docCount++; - mergeState.checkAbort.work(300); } } finish(mergeState.mergeFieldInfos, docCount); diff --git a/lucene/core/src/java/org/apache/lucene/codecs/TermVectorsWriter.java b/lucene/core/src/java/org/apache/lucene/codecs/TermVectorsWriter.java index cadc548d88b..af12f0938a6 100644 --- a/lucene/core/src/java/org/apache/lucene/codecs/TermVectorsWriter.java +++ b/lucene/core/src/java/org/apache/lucene/codecs/TermVectorsWriter.java @@ -196,7 +196,6 @@ public abstract class TermVectorsWriter implements Closeable { } addAllDocVectors(vectors, mergeState); docCount++; - mergeState.checkAbort.work(300); } } finish(mergeState.mergeFieldInfos, docCount); diff --git a/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsWriter.java b/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsWriter.java index 1a2ca2f2b74..22a11993cec 100644 --- a/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsWriter.java +++ b/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsWriter.java @@ -506,7 +506,6 @@ public final class CompressingStoredFieldsWriter extends StoredFieldsWriter { storedFieldsReader.visitDocument(docID, visitor); finishDocument(); ++docCount; - mergeState.checkAbort.work(300); } } else { // optimized merge, we copy serialized (but decompressed) bytes directly @@ -522,7 +521,6 @@ public final class CompressingStoredFieldsWriter extends StoredFieldsWriter { numStoredFieldsInDoc = doc.numStoredFields; finishDocument(); ++docCount; - mergeState.checkAbort.work(300); } } } diff --git a/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingTermVectorsWriter.java b/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingTermVectorsWriter.java index a9cfdc73f51..9aa42c930f8 100644 --- a/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingTermVectorsWriter.java +++ b/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingTermVectorsWriter.java @@ -751,7 +751,6 @@ public final class CompressingTermVectorsWriter extends TermVectorsWriter { } addAllDocVectors(vectors, mergeState); ++docCount; - mergeState.checkAbort.work(300); } } else { final CompressingStoredFieldsIndexReader index = matchingVectorsReader.getIndex(); @@ -781,7 +780,6 @@ public final class CompressingTermVectorsWriter extends TermVectorsWriter { this.vectorsStream.copyBytes(vectorsStream, chunkLength); docCount += chunkDocs; this.numDocs += chunkDocs; - mergeState.checkAbort.work(300 * chunkDocs); i = nextLiveDoc(docBase + chunkDocs, liveDocs, maxDoc); } else { for (; i < docBase + chunkDocs; i = nextLiveDoc(i + 1, liveDocs, maxDoc)) { @@ -793,7 +791,6 @@ public final class CompressingTermVectorsWriter extends TermVectorsWriter { } addAllDocVectors(vectors, mergeState); ++docCount; - mergeState.checkAbort.work(300); } } } else { @@ -805,7 +802,6 @@ public final class CompressingTermVectorsWriter extends TermVectorsWriter { } addAllDocVectors(vectors, mergeState); ++docCount; - 
mergeState.checkAbort.work(300); i = nextLiveDoc(i + 1, liveDocs, maxDoc); } } diff --git a/lucene/core/src/java/org/apache/lucene/codecs/lucene50/Lucene50CompoundFormat.java b/lucene/core/src/java/org/apache/lucene/codecs/lucene50/Lucene50CompoundFormat.java index 1c4443acc48..f9710b9a576 100644 --- a/lucene/core/src/java/org/apache/lucene/codecs/lucene50/Lucene50CompoundFormat.java +++ b/lucene/core/src/java/org/apache/lucene/codecs/lucene50/Lucene50CompoundFormat.java @@ -23,7 +23,6 @@ import java.util.Collection; import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.codecs.CompoundFormat; import org.apache.lucene.index.IndexFileNames; -import org.apache.lucene.index.MergeState.CheckAbort; import org.apache.lucene.index.SegmentInfo; import org.apache.lucene.store.DataOutput; import org.apache.lucene.store.Directory; @@ -73,7 +72,7 @@ public final class Lucene50CompoundFormat extends CompoundFormat { } @Override - public void write(Directory dir, SegmentInfo si, Collection files, CheckAbort checkAbort, IOContext context) throws IOException { + public void write(Directory dir, SegmentInfo si, Collection files, IOContext context) throws IOException { String dataFile = IndexFileNames.segmentFileName(si.name, "", DATA_EXTENSION); String entriesFile = IndexFileNames.segmentFileName(si.name, "", ENTRIES_EXTENSION); @@ -99,8 +98,6 @@ public final class Lucene50CompoundFormat extends CompoundFormat { entries.writeString(IndexFileNames.stripSegmentName(file)); entries.writeLong(startOffset); entries.writeLong(length); - - checkAbort.work(length); } CodecUtil.writeFooter(data); diff --git a/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java b/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java index 98b22abb549..be33c6c8105 100644 --- a/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java +++ b/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.Locale; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -173,16 +174,19 @@ class BufferedUpdatesStream implements Accountable { Collections.sort(infos2, sortSegInfoByDelGen); CoalescedUpdates coalescedDeletes = null; - boolean anyNewDeletes = false; int infosIDX = infos2.size()-1; int delIDX = updates.size()-1; + long totDelCount = 0; + long totTermVisitedCount = 0; + List allDeleted = null; while (infosIDX >= 0) { //System.out.println("BD: cycle delIDX=" + delIDX + " infoIDX=" + infosIDX); + final long segStartNS = System.nanoTime(); final FrozenBufferedUpdates packet = delIDX >= 0 ? 
updates.get(delIDX) : null; final SegmentCommitInfo info = infos2.get(infosIDX); final long segGen = info.getBufferedDeletesGen(); @@ -213,12 +217,14 @@ class BufferedUpdatesStream implements Accountable { final ReadersAndUpdates rld = readerPool.get(info, true); final SegmentReader reader = rld.getReader(IOContext.READ); int delCount = 0; + long termVisitedCount = 0; final boolean segAllDeletes; try { final DocValuesFieldUpdates.Container dvUpdates = new DocValuesFieldUpdates.Container(); if (coalescedDeletes != null) { - //System.out.println(" del coalesced"); - delCount += applyTermDeletes(coalescedDeletes.termsIterable(), rld, reader); + TermDeleteCounts counts = applyTermDeletes(coalescedDeletes.termsIterable(), rld, reader); + delCount += counts.delCount; + termVisitedCount += counts.termVisitedCount; delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), rld, reader); applyDocValuesUpdates(coalescedDeletes.numericDVUpdates, rld, reader, dvUpdates); applyDocValuesUpdates(coalescedDeletes.binaryDVUpdates, rld, reader, dvUpdates); @@ -239,7 +245,8 @@ class BufferedUpdatesStream implements Accountable { rld.release(reader); readerPool.release(rld); } - anyNewDeletes |= delCount > 0; + totDelCount += delCount; + totTermVisitedCount += termVisitedCount; if (segAllDeletes) { if (allDeleted == null) { @@ -249,7 +256,7 @@ class BufferedUpdatesStream implements Accountable { } if (infoStream.isEnabled("BD")) { - infoStream.message("BD", "seg=" + info + " segGen=" + segGen + " segDeletes=[" + packet + "]; coalesced deletes=[" + (coalescedDeletes == null ? "null" : coalescedDeletes) + "] newDelCount=" + delCount + (segAllDeletes ? " 100% deleted" : "")); + infoStream.message("BD", String.format(Locale.ROOT, "%.3fs", ((System.nanoTime() - segStartNS)/1000000000.0)) + " seg=" + info + " segGen=" + segGen + " segDeletes=[" + packet + "]; coalesced deletes=[" + (coalescedDeletes == null ? "null" : coalescedDeletes) + "] newDelCount=" + delCount + " termVisitedCount=" + termVisitedCount + (segAllDeletes ? " 100% deleted" : "")); } if (coalescedDeletes == null) { @@ -274,9 +281,12 @@ class BufferedUpdatesStream implements Accountable { final ReadersAndUpdates rld = readerPool.get(info, true); final SegmentReader reader = rld.getReader(IOContext.READ); int delCount = 0; + long termVisitedCount = 0; final boolean segAllDeletes; try { - delCount += applyTermDeletes(coalescedDeletes.termsIterable(), rld, reader); + TermDeleteCounts counts = applyTermDeletes(coalescedDeletes.termsIterable(), rld, reader); + delCount += counts.delCount; + termVisitedCount += counts.termVisitedCount; delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), rld, reader); DocValuesFieldUpdates.Container dvUpdates = new DocValuesFieldUpdates.Container(); applyDocValuesUpdates(coalescedDeletes.numericDVUpdates, rld, reader, dvUpdates); @@ -291,7 +301,9 @@ class BufferedUpdatesStream implements Accountable { rld.release(reader); readerPool.release(rld); } - anyNewDeletes |= delCount > 0; + + totDelCount += delCount; + totTermVisitedCount += termVisitedCount; if (segAllDeletes) { if (allDeleted == null) { @@ -301,7 +313,7 @@ class BufferedUpdatesStream implements Accountable { } if (infoStream.isEnabled("BD")) { - infoStream.message("BD", "seg=" + info + " segGen=" + segGen + " coalesced deletes=[" + coalescedDeletes + "] newDelCount=" + delCount + (segAllDeletes ? 
" 100% deleted" : "")); + infoStream.message("BD", String.format(Locale.ROOT, "%.3fs", ((System.nanoTime() - segStartNS)/1000000000.0)) + " seg=" + info + " segGen=" + segGen + " coalesced deletes=[" + coalescedDeletes + "] newDelCount=" + delCount + " termVisitedCount=" + termVisitedCount + (segAllDeletes ? " 100% deleted" : "")); } } info.setBufferedDeletesGen(gen); @@ -312,11 +324,11 @@ class BufferedUpdatesStream implements Accountable { assert checkDeleteStats(); if (infoStream.isEnabled("BD")) { - infoStream.message("BD", "applyDeletes took " + (System.currentTimeMillis()-t0) + " msec"); + infoStream.message("BD", "applyDeletes took " + (System.currentTimeMillis()-t0) + " msec for " + infos.size() + " segments, " + totDelCount + " deleted docs, " + totTermVisitedCount + " visited terms"); } // assert infos != segmentInfos || !any() : "infos=" + infos + " segmentInfos=" + segmentInfos + " any=" + any; - return new ApplyDeletesResult(anyNewDeletes, gen, allDeleted); + return new ApplyDeletesResult(totDelCount > 0, gen, allDeleted); } synchronized long getNextGen() { @@ -374,9 +386,23 @@ class BufferedUpdatesStream implements Accountable { } } + private static class TermDeleteCounts { + /** How many documents were actually deleted. */ + public final int delCount; + + /** How many terms we checked. */ + public final long termVisitedCount; + + public TermDeleteCounts(int delCount, long termVisitedCount) { + this.delCount = delCount; + this.termVisitedCount = termVisitedCount; + } + } + // Delete by Term - private synchronized long applyTermDeletes(Iterable termsIter, ReadersAndUpdates rld, SegmentReader reader) throws IOException { - long delCount = 0; + private synchronized TermDeleteCounts applyTermDeletes(Iterable termsIter, ReadersAndUpdates rld, SegmentReader reader) throws IOException { + int delCount = 0; + long termVisitedCount = 0; Fields fields = reader.fields(); TermsEnum termsEnum = null; @@ -388,8 +414,10 @@ class BufferedUpdatesStream implements Accountable { boolean any = false; - //System.out.println(Thread.currentThread().getName() + " del terms reader=" + reader); + long ns = System.nanoTime(); + for (Term term : termsIter) { + termVisitedCount++; // Since we visit terms sorted, we gain performance // by re-using the same TermsEnum and seeking only // forwards @@ -440,7 +468,7 @@ class BufferedUpdatesStream implements Accountable { } } - return delCount; + return new TermDeleteCounts(delCount, termVisitedCount); } // DocValues updates diff --git a/lucene/core/src/java/org/apache/lucene/index/CoalescedUpdates.java b/lucene/core/src/java/org/apache/lucene/index/CoalescedUpdates.java index 61ecf7938d6..3580ba44c05 100644 --- a/lucene/core/src/java/org/apache/lucene/index/CoalescedUpdates.java +++ b/lucene/core/src/java/org/apache/lucene/index/CoalescedUpdates.java @@ -18,15 +18,15 @@ package org.apache.lucene.index; */ import java.util.ArrayList; -import java.util.Iterator; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; -import org.apache.lucene.search.Query; import org.apache.lucene.index.BufferedUpdatesStream.QueryAndLimit; import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate; import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate; +import org.apache.lucene.search.Query; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.MergedIterator; @@ -35,16 +35,19 @@ class CoalescedUpdates { final List> iterables = new ArrayList<>(); final List numericDVUpdates = new ArrayList<>(); 
final List binaryDVUpdates = new ArrayList<>(); + int totalTermCount; @Override public String toString() { // note: we could add/collect more debugging information - return "CoalescedUpdates(termSets=" + iterables.size() + ",queries=" - + queries.size() + ",numericDVUpdates=" + numericDVUpdates.size() - + ",binaryDVUpdates=" + binaryDVUpdates.size() + ")"; + return "CoalescedUpdates(termSets=" + iterables.size() + + ",totalTermCount=" + totalTermCount + + ",queries=" + queries.size() + ",numericDVUpdates=" + numericDVUpdates.size() + + ",binaryDVUpdates=" + binaryDVUpdates.size() + ")"; } void update(FrozenBufferedUpdates in) { + totalTermCount += in.termCount; iterables.add(in.termsIterable()); for (int queryIdx = 0; queryIdx < in.queries.length; queryIdx++) { diff --git a/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java b/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java index 78633f588ce..586b2a0c5e6 100644 --- a/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java +++ b/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java @@ -19,9 +19,11 @@ package org.apache.lucene.index; import java.io.IOException; import java.util.ArrayList; -import java.util.Comparator; import java.util.List; +import java.util.Locale; +import org.apache.lucene.index.MergePolicy.OneMerge; +import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; import org.apache.lucene.util.CollectionUtil; import org.apache.lucene.util.IOUtils; @@ -54,18 +56,17 @@ import org.apache.lucene.util.ThreadInterruptedException; * settings for spinning or solid state disks for such * operating systems, use {@link #setDefaultMaxMergesAndThreads(boolean)}. */ + public class ConcurrentMergeScheduler extends MergeScheduler { /** Dynamic default for {@code maxThreadCount} and {@code maxMergeCount}, * used to detect whether the index is backed by an SSD or rotational disk and * set {@code maxThreadCount} accordingly. If it's an SSD, - * {@code maxThreadCount} is set to {@code max(1, min(3, cpuCoreCount/2))}, + * {@code maxThreadCount} is set to {@code max(1, min(4, cpuCoreCount/2))}, * otherwise 1. Note that detection only currently works on * Linux; other platforms will assume the index is not on an SSD. */ public static final int AUTO_DETECT_MERGES_AND_THREADS = -1; - private int mergeThreadPriority = -1; - /** List of currently active {@link MergeThread}s. */ protected final List mergeThreads = new ArrayList<>(); @@ -81,16 +82,27 @@ public class ConcurrentMergeScheduler extends MergeScheduler { // throttling the incoming threads private int maxMergeCount = AUTO_DETECT_MERGES_AND_THREADS; - /** {@link Directory} that holds the index. */ - protected Directory dir; - - /** {@link IndexWriter} that owns this instance. */ - protected IndexWriter writer; - /** How many {@link MergeThread}s have kicked off (this is use * to name them). */ protected int mergeThreadCount; + /** Floor for IO write rate limit (we will never go any lower than this) */ + private static final double MIN_MERGE_MB_PER_SEC = 5.0; + + /** Initial value for IO write rate limit when doAutoIOThrottle is true */ + private static final double START_MB_PER_SEC = 20.0; + + /** Merges below this size are not counted in the maxThreadCount, i.e. they can freely run in their own thread (up until maxMergeCount). 
*/ + private static final double MIN_BIG_MERGE_MB = 50.0; + + /** Current IO writes throttle rate */ + protected double targetMBPerSec = START_MB_PER_SEC; + + /** true if we should rate-limit writes for each merge */ + private boolean doAutoIOThrottle = true; + + private double forceMergeMBPerSec = Double.POSITIVE_INFINITY; + /** Sole constructor, with all settings set to default * values. */ public ConcurrentMergeScheduler() { @@ -142,10 +154,48 @@ public class ConcurrentMergeScheduler extends MergeScheduler { public synchronized void setDefaultMaxMergesAndThreads(boolean spins) { if (spins) { maxThreadCount = 1; - maxMergeCount = 2; + maxMergeCount = 6; } else { - maxThreadCount = Math.max(1, Math.min(3, Runtime.getRuntime().availableProcessors()/2)); - maxMergeCount = maxThreadCount+2; + maxThreadCount = Math.max(1, Math.min(4, Runtime.getRuntime().availableProcessors()/2)); + maxMergeCount = maxThreadCount+5; + } + } + + /** Set the per-merge IO throttle rate for forced merges (default: {@code Double.POSITIVE_INFINITY}). */ + public synchronized void setForceMergeMBPerSec(double v) { + forceMergeMBPerSec = v; + updateMergeThreads(); + } + + /** Get the per-merge IO throttle rate for forced merges. */ + public synchronized double getForceMergeMBPerSec() { + return forceMergeMBPerSec; + } + + /** Turn on dynamic IO throttling, to adaptively rate limit writes + * bytes/sec to the minimal rate necessary so merges do not fall behind. + * By default this is enabled. */ + public synchronized void enableAutoIOThrottle() { + doAutoIOThrottle = true; + targetMBPerSec = START_MB_PER_SEC; + updateMergeThreads(); + } + + /** Turn off auto IO throttling. + * + * @see #enableAutoIOThrottle */ + public synchronized void disableAutoIOThrottle() { + doAutoIOThrottle = false; + updateMergeThreads(); + } + + /** Returns the currently set per-merge IO writes rate limit, if {@link #enableAutoIOThrottle} + * was called, else {@code Double.POSITIVE_INFINITY}. */ + public synchronized double getIORateLimitMBPerSec() { + if (doAutoIOThrottle) { + return targetMBPerSec; + } else { + return Double.POSITIVE_INFINITY; } } @@ -161,48 +211,18 @@ public class ConcurrentMergeScheduler extends MergeScheduler { return maxMergeCount; } - /** Return the priority that merge threads run at. By - * default the priority is 1 plus the priority of (ie, - * slightly higher priority than) the first thread that - * calls merge. */ - public synchronized int getMergeThreadPriority() { - initMergeThreadPriority(); - return mergeThreadPriority; + synchronized void removeMergeThread(MergeThread thread) { + boolean result = mergeThreads.remove(thread); + assert result; } - /** Set the base priority that merge threads run at. - * Note that CMS may increase priority of some merge - * threads beyond this base priority. It's best not to - * set this any higher than - * Thread.MAX_PRIORITY-maxThreadCount, so that CMS has - * room to set relative priority among threads. */ - public synchronized void setMergeThreadPriority(int pri) { - if (pri > Thread.MAX_PRIORITY || pri < Thread.MIN_PRIORITY) - throw new IllegalArgumentException("priority must be in range " + Thread.MIN_PRIORITY + " .. " + Thread.MAX_PRIORITY + " inclusive"); - mergeThreadPriority = pri; - updateMergeThreads(); - } - - /** Sorts {@link MergeThread}s; larger merges come first. 
*/ - protected static final Comparator compareByMergeDocCount = new Comparator() { - @Override - public int compare(MergeThread t1, MergeThread t2) { - final MergePolicy.OneMerge m1 = t1.getCurrentMerge(); - final MergePolicy.OneMerge m2 = t2.getCurrentMerge(); - - final int c1 = m1 == null ? Integer.MAX_VALUE : m1.totalDocCount; - final int c2 = m2 == null ? Integer.MAX_VALUE : m2.totalDocCount; - - return c2 - c1; - } - }; - /** - * Called whenever the running merges have changed, to pause and unpause - * threads. This method sorts the merge threads by their merge size in + * Called whenever the running merges have changed, to set merge IO limits. + * This method sorts the merge threads by their merge size in * descending order and then pauses/unpauses threads from first to last -- * that way, smaller merges are guaranteed to run before larger ones. */ + protected synchronized void updateMergeThreads() { // Only look at threads that are alive & not in the @@ -217,93 +237,121 @@ public class ConcurrentMergeScheduler extends MergeScheduler { mergeThreads.remove(threadIdx); continue; } - if (mergeThread.getCurrentMerge() != null) { - activeMerges.add(mergeThread); - } + activeMerges.add(mergeThread); threadIdx++; } - // Sort the merge threads in descending order. - CollectionUtil.timSort(activeMerges, compareByMergeDocCount); - - int pri = mergeThreadPriority; + // Sort the merge threads, largest first: + CollectionUtil.timSort(activeMerges); + final int activeMergeCount = activeMerges.size(); - for (threadIdx=0;threadIdx=0;threadIdx--) { + MergeThread mergeThread = activeMerges.get(threadIdx); + if (mergeThread.merge.estimatedMergeBytes > MIN_BIG_MERGE_MB*1024*1024) { + bigMergeCount = 1+threadIdx; + break; } + } + + long now = System.nanoTime(); + + StringBuilder message; + if (verbose()) { + message = new StringBuilder(); + message.append(String.format(Locale.ROOT, "updateMergeThreads ioThrottle=%s targetMBPerSec=%.1f MB/sec", doAutoIOThrottle, targetMBPerSec)); + } else { + message = null; + } + + for (threadIdx=0;threadIdx - * if (verbose()) { - * message("your message"); - * } - * - */ - protected boolean verbose() { - return writer != null && writer.infoStream.isEnabled("CMS"); - } - - /** - * Outputs the given message - this method assumes {@link #verbose()} was - * called and returned true. 
- */ - protected void message(String message) { - writer.infoStream.message("CMS", message); - } - - private synchronized void initMergeThreadPriority() { - if (mergeThreadPriority == -1) { - // Default to slightly higher priority than our - // calling thread - mergeThreadPriority = 1+Thread.currentThread().getPriority(); - if (mergeThreadPriority > Thread.MAX_PRIORITY) - mergeThreadPriority = Thread.MAX_PRIORITY; - } - } - - private synchronized void initMaxMergesAndThreads() throws IOException { + private synchronized void initDynamicDefaults(IndexWriter writer) throws IOException { if (maxThreadCount == AUTO_DETECT_MERGES_AND_THREADS) { - assert writer != null; boolean spins = IOUtils.spins(writer.getDirectory()); setDefaultMaxMergesAndThreads(spins); if (verbose()) { - message("initMaxMergesAndThreads spins=" + spins + " maxThreadCount=" + maxThreadCount + " maxMergeCount=" + maxMergeCount); + message("initDynamicDefaults spins=" + spins + " maxThreadCount=" + maxThreadCount + " maxMergeCount=" + maxMergeCount); } } } + private static String rateToString(double mbPerSec) { + if (mbPerSec == 0.0) { + return "stopped"; + } else if (mbPerSec == Double.POSITIVE_INFINITY) { + return "unlimited"; + } else { + return String.format(Locale.ROOT, "%.1f MB/sec", mbPerSec); + } + } + @Override public void close() { sync(); @@ -346,12 +394,9 @@ public class ConcurrentMergeScheduler extends MergeScheduler { */ protected synchronized int mergeThreadCount() { int count = 0; - for (MergeThread mt : mergeThreads) { - if (mt.isAlive()) { - MergePolicy.OneMerge merge = mt.getCurrentMerge(); - if (merge != null && merge.isAborted() == false) { - count++; - } + for (MergeThread mergeThread : mergeThreads) { + if (mergeThread.isAlive() && mergeThread.merge.rateLimiter.getAbort() == false) { + count++; } } return count; @@ -362,12 +407,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler { assert !Thread.holdsLock(writer); - this.writer = writer; - - initMergeThreadPriority(); - initMaxMergesAndThreads(); - - dir = writer.getDirectory(); + initDynamicDefaults(writer); // First, quickly run through the newly proposed merges // and add any orthogonal merges (ie a merge not @@ -385,9 +425,9 @@ public class ConcurrentMergeScheduler extends MergeScheduler { // pending merges, until it's empty: while (true) { - maybeStall(); + maybeStall(writer); - MergePolicy.OneMerge merge = writer.getNextMerge(); + OneMerge merge = writer.getNextMerge(); if (merge == null) { if (verbose()) { message(" no more merges pending; now return"); @@ -395,6 +435,8 @@ public class ConcurrentMergeScheduler extends MergeScheduler { return; } + updateIOThrottle(merge); + boolean success = false; try { if (verbose()) { @@ -405,17 +447,13 @@ public class ConcurrentMergeScheduler extends MergeScheduler { // merge: final MergeThread merger = getMergeThread(writer, merge); mergeThreads.add(merger); + if (verbose()) { message(" launch new thread [" + merger.getName() + "]"); } merger.start(); - // Must call this after starting the thread else - // the new thread is removed from mergeThreads - // (since it's not alive yet): - updateMergeThreads(); - success = true; } finally { if (!success) { @@ -433,7 +471,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler { * as limiting how many threads are allowed to index, can do nothing * here and throttle elsewhere. 
*/ - protected synchronized void maybeStall() { + protected synchronized void maybeStall(IndexWriter writer) { long startStallTime = 0; while (writer.hasPendingMerges() && mergeThreadCount() >= maxMergeCount) { // This means merging has fallen too far behind: we @@ -465,127 +503,78 @@ public class ConcurrentMergeScheduler extends MergeScheduler { } /** Does the actual merge, by calling {@link IndexWriter#merge} */ - protected void doMerge(MergePolicy.OneMerge merge) throws IOException { + protected void doMerge(IndexWriter writer, OneMerge merge) throws IOException { writer.merge(merge); } /** Create and return a new MergeThread */ - protected synchronized MergeThread getMergeThread(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException { + protected synchronized MergeThread getMergeThread(IndexWriter writer, OneMerge merge) throws IOException { final MergeThread thread = new MergeThread(writer, merge); - thread.setThreadPriority(mergeThreadPriority); thread.setDaemon(true); thread.setName("Lucene Merge Thread #" + mergeThreadCount++); return thread; } - /** Runs a merge thread, which may run one or more merges - * in sequence. */ - protected class MergeThread extends Thread { + /** Runs a merge thread to execute a single merge, then exits. */ + protected class MergeThread extends Thread implements Comparable { - IndexWriter tWriter; - MergePolicy.OneMerge startMerge; - MergePolicy.OneMerge runningMerge; - private volatile boolean done; + final IndexWriter writer; + final OneMerge merge; /** Sole constructor. */ - public MergeThread(IndexWriter writer, MergePolicy.OneMerge startMerge) { - this.tWriter = writer; - this.startMerge = startMerge; + public MergeThread(IndexWriter writer, OneMerge merge) { + this.writer = writer; + this.merge = merge; } - - /** Record the currently running merge. */ - public synchronized void setRunningMerge(MergePolicy.OneMerge merge) { - runningMerge = merge; - } - - /** Return the currently running merge. */ - public synchronized MergePolicy.OneMerge getRunningMerge() { - return runningMerge; - } - - /** Return the current merge, or null if this {@code - * MergeThread} is done. */ - public synchronized MergePolicy.OneMerge getCurrentMerge() { - if (done) { - return null; - } else if (runningMerge != null) { - return runningMerge; - } else { - return startMerge; - } - } - - /** Set the priority of this thread. */ - public void setThreadPriority(int pri) { - try { - setPriority(pri); - } catch (NullPointerException npe) { - // Strangely, Sun's JDK 1.5 on Linux sometimes - // throws NPE out of here... 
- } catch (SecurityException se) { - // Ignore this because we will still run fine with - // normal thread priority - } + + @Override + public int compareTo(MergeThread other) { + // Larger merges sort first: + return Long.compare(other.merge.estimatedMergeBytes, merge.estimatedMergeBytes); } @Override public void run() { - - // First time through the while loop we do the merge - // that we were started with: - MergePolicy.OneMerge merge = this.startMerge; - + try { if (verbose()) { message(" merge thread: start"); } - while(true) { - setRunningMerge(merge); - doMerge(merge); - - // Subsequent times through the loop we do any new - // merge that writer says is necessary: - merge = tWriter.getNextMerge(); - - // Notify here in case any threads were stalled; - // they will notice that the pending merge has - // been pulled and possibly resume: - synchronized(ConcurrentMergeScheduler.this) { - ConcurrentMergeScheduler.this.notifyAll(); - } - - if (merge != null) { - updateMergeThreads(); - if (verbose()) { - message(" merge thread: do another merge " + tWriter.segString(merge.segments)); - } - } else { - break; - } - } + doMerge(writer, merge); if (verbose()) { message(" merge thread: done"); } + removeMergeThread(this); + + // Let CMS run new merges if necessary: + try { + merge(writer, MergeTrigger.MERGE_FINISHED, true); + } catch (AlreadyClosedException ace) { + // OK + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + } catch (Throwable exc) { - // Ignore the exception if it was due to abort: - if (!(exc instanceof MergePolicy.MergeAbortedException)) { - //System.out.println(Thread.currentThread().getName() + ": CMS: exc"); - //exc.printStackTrace(System.out); - if (!suppressExceptions) { - // suppressExceptions is normally only set during - // testing. - handleMergeException(exc); - } + if (exc instanceof MergePolicy.MergeAbortedException) { + // OK to ignore + } else if (suppressExceptions == false) { + // suppressExceptions is normally only set during + // testing. 
+ handleMergeException(writer.getDirectory(), exc); } + } finally { - done = true; synchronized(ConcurrentMergeScheduler.this) { updateMergeThreads(); + + // In case we had stalled indexing, we can now wake up + // and possibly unstall: ConcurrentMergeScheduler.this.notifyAll(); } } @@ -594,7 +583,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler { /** Called when an exception is hit in a background merge * thread */ - protected void handleMergeException(Throwable exc) { + protected void handleMergeException(Directory dir, Throwable exc) { try { // When an exception is hit during merge, IndexWriter // removes any partial files and then allows another @@ -606,6 +595,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler { } catch (InterruptedException ie) { throw new ThreadInterruptedException(ie); } + throw new MergePolicy.MergeException(exc, dir); } @@ -626,7 +616,115 @@ public class ConcurrentMergeScheduler extends MergeScheduler { StringBuilder sb = new StringBuilder(getClass().getSimpleName() + ": "); sb.append("maxThreadCount=").append(maxThreadCount).append(", "); sb.append("maxMergeCount=").append(maxMergeCount).append(", "); - sb.append("mergeThreadPriority=").append(mergeThreadPriority); + sb.append("ioThrottle=").append(doAutoIOThrottle); return sb.toString(); } + + private boolean isBacklog(long now, OneMerge merge) { + double mergeMB = bytesToMB(merge.estimatedMergeBytes); + for (MergeThread mergeThread : mergeThreads) { + long mergeStartNS = mergeThread.merge.mergeStartNS; + if (mergeThread.isAlive() && mergeThread.merge != merge && + mergeStartNS != -1 && + mergeThread.merge.estimatedMergeBytes >= MIN_BIG_MERGE_MB*1024*1024 && + nsToSec(now-mergeStartNS) > 3.0) { + double otherMergeMB = bytesToMB(mergeThread.merge.estimatedMergeBytes); + double ratio = otherMergeMB / mergeMB; + if (ratio > 0.3 && ratio < 3.0) { + return true; + } + } + } + + return false; + } + + /** Tunes IO throttle when a new merge starts. 
*/ + private synchronized void updateIOThrottle(OneMerge merge) throws IOException { + if (doAutoIOThrottle == false) { + return; + } + + double mergeMB = bytesToMB(merge.estimatedMergeBytes); + if (mergeMB < MIN_BIG_MERGE_MB) { + // Only watch non-trivial merges for throttling; this is safe because the MP must eventually + // have to do larger merges: + return; + } + + long now = System.nanoTime(); + + // Simplistic closed-loop feedback control: if we find any other similarly + // sized merges running, then we are falling behind, so we bump up the + // IO throttle, else we lower it: + boolean newBacklog = isBacklog(now, merge); + + boolean curBacklog = false; + + if (newBacklog == false) { + if (mergeThreads.size() > maxThreadCount) { + // If there are already more than the maximum merge threads allowed, count that as backlog: + curBacklog = true; + } else { + // Now see if any still-running merges are backlog'd: + for (MergeThread mergeThread : mergeThreads) { + if (isBacklog(now, mergeThread.merge)) { + curBacklog = true; + break; + } + } + } + } + + double curMBPerSec = targetMBPerSec; + + if (newBacklog) { + // This new merge adds to the backlog: increase IO throttle by 20% + targetMBPerSec *= 1.20; + if (targetMBPerSec > 10000) { + targetMBPerSec = 10000; + } + if (verbose()) { + if (curMBPerSec == targetMBPerSec) { + message(String.format(Locale.ROOT, "io throttle: new merge backlog; leave IO rate at ceiling %.1f MB/sec", targetMBPerSec)); + } else { + message(String.format(Locale.ROOT, "io throttle: new merge backlog; increase IO rate to %.1f MB/sec", targetMBPerSec)); + } + } + } else if (curBacklog) { + // We still have an existing backlog; leave the rate as is: + if (verbose()) { + message(String.format(Locale.ROOT, "io throttle: current merge backlog; leave IO rate at %.1f MB/sec", + targetMBPerSec)); + } + } else { + // We are not falling behind: decrease IO throttle by 10% + targetMBPerSec /= 1.10; + if (targetMBPerSec < MIN_MERGE_MB_PER_SEC) { + targetMBPerSec = MIN_MERGE_MB_PER_SEC; + } + if (verbose()) { + if (curMBPerSec == targetMBPerSec) { + message(String.format(Locale.ROOT, "io throttle: no merge backlog; leave IO rate at floor %.1f MB/sec", targetMBPerSec)); + } else { + message(String.format(Locale.ROOT, "io throttle: no merge backlog; decrease IO rate to %.1f MB/sec", targetMBPerSec)); + } + } + } + + targetMBPerSecChanged(); + updateMergeThreads(); + } + + /** Subclass can override to tweak targetMBPerSec. 
*/ + protected void targetMBPerSecChanged() { + } + + private static double nsToSec(long ns) { + return ns / 1000000000.0; + } + + private static double bytesToMB(long bytes) { + return bytes/1024./1024.; + } } diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java index 11e25090ded..722b282760b 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java @@ -20,9 +20,8 @@ package org.apache.lucene.index; import java.io.Closeable; import java.io.IOException; import java.util.Collection; -import java.util.HashSet; +import java.util.Locale; import java.util.Queue; -import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; @@ -35,7 +34,6 @@ import org.apache.lucene.search.Query; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; import org.apache.lucene.util.Accountable; -import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.InfoStream; /** @@ -553,11 +551,13 @@ final class DocumentsWriter implements Closeable, Accountable { final double ramBufferSizeMB = config.getRAMBufferSizeMB(); if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH && flushControl.getDeleteBytesUsed() > (1024*1024*ramBufferSizeMB/2)) { - if (infoStream.isEnabled("DW")) { - infoStream.message("DW", "force apply deletes bytesUsed=" + flushControl.getDeleteBytesUsed() + " vs ramBuffer=" + (1024*1024*ramBufferSizeMB)); - } hasEvents = true; if (!this.applyAllDeletes(deleteQueue)) { + if (infoStream.isEnabled("DW")) { + infoStream.message("DW", String.format(Locale.ROOT, "force apply deletes bytesUsed=%.1f MB vs ramBuffer=%.1f MB", + flushControl.getDeleteBytesUsed()/(1024.*1024.), + ramBufferSizeMB)); + } putEvent(ApplyDeletesEvent.INSTANCE); } } diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java index 54d65ec1351..b312c11948b 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java @@ -485,7 +485,7 @@ class DocumentsWriterPerThread { try { if (indexWriterConfig.getUseCompoundFile()) { - filesToDelete.addAll(IndexWriter.createCompoundFile(infoStream, directory, MergeState.CheckAbort.NONE, newSegment.info, context)); + filesToDelete.addAll(IndexWriter.createCompoundFile(infoStream, directory, newSegment.info, context)); newSegment.info.setUseCompoundFile(true); } diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java index a8bda05f733..3f1ae7562df 100644 --- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java @@ -47,18 +47,22 @@ import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate; import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate; import org.apache.lucene.index.FieldInfos.FieldNumbers; import org.apache.lucene.index.IndexWriterConfig.OpenMode; -import org.apache.lucene.index.MergeState.CheckAbort; import org.apache.lucene.search.Query; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; +import 
org.apache.lucene.store.FlushInfo; import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.Lock; import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.store.MergeInfo; +import org.apache.lucene.store.RateLimitedIndexOutput; import org.apache.lucene.store.TrackingDirectoryWrapper; import org.apache.lucene.util.Accountable; import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.CloseableThreadLocal; import org.apache.lucene.util.Constants; import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.InfoStream; @@ -247,6 +251,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { volatile Throwable tragedy; private final Directory directory; // where this index resides + private final Directory mergeDirectory; // used for merging private final Analyzer analyzer; // how to analyze text private volatile long changeCount; // increments every time a change is completed @@ -319,6 +324,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { * card to make sure they can later charge you when you check out. */ final AtomicLong pendingNumDocs = new AtomicLong(); + final CloseableThreadLocal rateLimiters = new CloseableThreadLocal<>(); + DirectoryReader getReader() throws IOException { return getReader(true); } @@ -741,10 +748,17 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { public IndexWriter(Directory d, IndexWriterConfig conf) throws IOException { conf.setIndexWriter(this); // prevent reuse by other instances config = conf; + directory = d; + + // Directory we use for merging, so we can abort running merges, and so + // merge schedulers can optionally rate-limit per-merge IO: + mergeDirectory = addMergeRateLimiters(d); + analyzer = config.getAnalyzer(); infoStream = config.getInfoStream(); mergeScheduler = config.getMergeScheduler(); + mergeScheduler.setInfoStream(infoStream); codec = config.getCodec(); bufferedUpdatesStream = new BufferedUpdatesStream(infoStream); @@ -1696,7 +1710,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { for(int i=0;i 0; if (stopMerges) { - merge.abort(); + merge.rateLimiter.setAbort(); throw new MergePolicy.MergeAbortedException("merge is aborted: " + segString(merge.segments)); } @@ -3694,7 +3717,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { return; } - if (merge.isAborted()) { + if (merge.rateLimiter.getAbort()) { return; } @@ -3703,6 +3726,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { // and then open them again for merging. Maybe we // could pre-pool them somehow in that case... 
+ if (infoStream.isEnabled("IW")) { + infoStream.message("IW", "now apply deletes for " + merge.segments.size() + " merging segments"); + } + // Lock order: IW -> BD final BufferedUpdatesStream.ApplyDeletesResult result = bufferedUpdatesStream.applyDeletesAndUpdates(readerPool, merge.segments); @@ -3839,14 +3866,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { * instance */ private int mergeMiddle(MergePolicy.OneMerge merge, MergePolicy mergePolicy) throws IOException { - merge.checkAborted(directory); + merge.rateLimiter.checkAbort(); List sourceSegments = merge.segments; IOContext context = new IOContext(merge.getMergeInfo()); - final MergeState.CheckAbort checkAbort = new MergeState.CheckAbort(merge, directory); - final TrackingDirectoryWrapper dirWrapper = new TrackingDirectoryWrapper(directory); + final TrackingDirectoryWrapper dirWrapper = new TrackingDirectoryWrapper(mergeDirectory); if (infoStream.isEnabled("IW")) { infoStream.message("IW", "merging " + segString(merge.segments)); @@ -3926,15 +3952,12 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { // OneMerge to return a view over the actual segments to merge final SegmentMerger merger = new SegmentMerger(merge.getMergeReaders(), merge.info.info, infoStream, dirWrapper, - checkAbort, globalFieldNumberMap, + globalFieldNumberMap, context); - merge.checkAborted(directory); + merge.rateLimiter.checkAbort(); - long mergeStartTime = 0; - if (infoStream.isEnabled("IW")) { - mergeStartTime = System.nanoTime(); - } + merge.mergeStartNS = System.nanoTime(); // This is where all the work happens: boolean success3 = false; @@ -3954,13 +3977,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { assert mergeState.segmentInfo == merge.info.info; merge.info.info.setFiles(new HashSet<>(dirWrapper.getCreatedFiles())); - // Record which codec was used to write the segment - if (infoStream.isEnabled("IW")) { if (merger.shouldMerge()) { long t1 = System.nanoTime(); - double sec = (t1-mergeStartTime)/1000000000.; + double sec = (t1-merge.mergeStartNS)/1000000000.; double segmentMB = (merge.info.sizeInBytes()/1024./1024.); + double stoppedSec = merge.rateLimiter.getTotalStoppedNS()/1000000000.; + double throttleSec = merge.rateLimiter.getTotalPausedNS()/1000000000.; infoStream.message("IW", "merge codec=" + codec + " docCount=" + merge.info.info.getDocCount() + "; merged segment has " + (mergeState.mergeFieldInfos.hasVectors() ? "vectors" : "no vectors") + "; " + (mergeState.mergeFieldInfos.hasNorms() ? "norms" : "no norms") + "; " + @@ -3968,8 +3991,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { (mergeState.mergeFieldInfos.hasProx() ? "prox" : "no prox") + "; " + (mergeState.mergeFieldInfos.hasProx() ? 
"freqs" : "no freqs") + "; " + String.format(Locale.ROOT, - "%d msec to merge segment [%.2f MB, %.2f MB/sec]", - ((t1-mergeStartTime)/1000000), + "%.1f sec (%.1f sec stopped, %.1f sec paused) to merge segment [%.2f MB, %.2f MB/sec]", + sec, + stoppedSec, + throttleSec, segmentMB, segmentMB / sec)); } else { @@ -4002,11 +4027,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { Collection filesToRemove = merge.info.files(); try { - filesToRemove = createCompoundFile(infoStream, directory, checkAbort, merge.info.info, context); + filesToRemove = createCompoundFile(infoStream, mergeDirectory, merge.info.info, context); success = true; } catch (IOException ioe) { synchronized(this) { - if (merge.isAborted()) { + if (merge.rateLimiter.getAbort()) { // This can happen if rollback or close(false) // is called -- fall through to logic below to // remove the partially created CFS: @@ -4042,7 +4067,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { // registered with IFD deleter.deleteNewFiles(filesToRemove); - if (merge.isAborted()) { + if (merge.rateLimiter.getAbort()) { if (infoStream.isEnabled("IW")) { infoStream.message("IW", "abort merge after building CFS"); } @@ -4496,7 +4521,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { * deletion files, this SegmentInfo must not reference such files when this * method is called, because they are not allowed within a compound file. */ - static final Collection createCompoundFile(InfoStream infoStream, Directory directory, CheckAbort checkAbort, final SegmentInfo info, IOContext context) + static final Collection createCompoundFile(InfoStream infoStream, Directory directory, final SegmentInfo info, IOContext context) throws IOException { // TODO: use trackingdirectorywrapper instead of files() to know which files to delete when things fail: @@ -4510,7 +4535,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { boolean success = false; try { - info.getCodec().compoundFormat().write(directory, info, files, checkAbort, context); + info.getCodec().compoundFormat().write(directory, info, files, context); success = true; } finally { if (!success) { @@ -4643,4 +4668,24 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { throw new IllegalStateException("number of documents in the index cannot exceed " + actualMaxDocs); } } + + /** Wraps the incoming {@link Directory} so that we assign a per-thread + * {@link MergeRateLimiter} to all created {@link IndexOutput}s. 
*/ + private Directory addMergeRateLimiters(Directory in) { + return new FilterDirectory(in) { + @Override + public IndexOutput createOutput(String name, IOContext context) throws IOException { + ensureOpen(); + + // This Directory is only supposed to be used during merging, + // so all writes should have MERGE context, else there is a bug + // somewhere that is failing to pass down the right IOContext: + assert context.context == IOContext.Context.MERGE: "got context=" + context.context; + IndexOutput output = in.createOutput(name, context); + MergeRateLimiter rateLimiter = rateLimiters.get(); + assert rateLimiter != null; + return new RateLimitedIndexOutput(rateLimiter, output); + } + }; + } } diff --git a/lucene/core/src/java/org/apache/lucene/index/MappingMultiDocsAndPositionsEnum.java b/lucene/core/src/java/org/apache/lucene/index/MappingMultiDocsAndPositionsEnum.java index bcc3735ad5b..8fd316a5ca8 100644 --- a/lucene/core/src/java/org/apache/lucene/index/MappingMultiDocsAndPositionsEnum.java +++ b/lucene/core/src/java/org/apache/lucene/index/MappingMultiDocsAndPositionsEnum.java @@ -17,8 +17,8 @@ package org.apache.lucene.index; * limitations under the License. */ -import org.apache.lucene.util.BytesRef; import org.apache.lucene.index.MultiDocsAndPositionsEnum.EnumWithSlice; +import org.apache.lucene.util.BytesRef; import java.io.IOException; @@ -97,13 +97,6 @@ final class MappingMultiDocsAndPositionsEnum extends DocsAndPositionsEnum { int doc = current.nextDoc(); if (doc != NO_MORE_DOCS) { - - mergeState.checkAbortCount++; - if (mergeState.checkAbortCount > 60000) { - mergeState.checkAbort.work(mergeState.checkAbortCount/5.0); - mergeState.checkAbortCount = 0; - } - // compact deletions doc = currentMap.get(doc); if (doc == -1) { diff --git a/lucene/core/src/java/org/apache/lucene/index/MappingMultiDocsEnum.java b/lucene/core/src/java/org/apache/lucene/index/MappingMultiDocsEnum.java index 148ea5c1f9a..2aa9e5f8d23 100644 --- a/lucene/core/src/java/org/apache/lucene/index/MappingMultiDocsEnum.java +++ b/lucene/core/src/java/org/apache/lucene/index/MappingMultiDocsEnum.java @@ -97,13 +97,6 @@ final class MappingMultiDocsEnum extends DocsEnum { int doc = current.nextDoc(); if (doc != NO_MORE_DOCS) { - - mergeState.checkAbortCount++; - if (mergeState.checkAbortCount > 60000) { - mergeState.checkAbort.work(mergeState.checkAbortCount/5.0); - mergeState.checkAbortCount = 0; - } - // compact deletions doc = currentMap.get(doc); if (doc == -1) { diff --git a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java index 1bd7ddaa10e..707d537c6d4 100644 --- a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java +++ b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java @@ -17,16 +17,17 @@ package org.apache.lucene.index; * limitations under the License. */ -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.MergeInfo; -import org.apache.lucene.util.FixedBitSet; - import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.MergeInfo; +import org.apache.lucene.store.RateLimiter; +import org.apache.lucene.util.FixedBitSet; + /** *

 * Expert: a MergePolicy determines the sequence of
 * primitive merge operations.

@@ -107,11 +108,14 @@ public abstract class MergePolicy { /** Segments to be merged. */ public final List segments; + /** A private {@link RateLimiter} for this merge, used to rate limit writes and abort. */ + public final MergeRateLimiter rateLimiter; + + volatile long mergeStartNS = -1; + /** Total number of documents in segments to be merged, not accounting for deletions. */ public final int totalDocCount; - boolean aborted; Throwable error; - boolean paused; /** Sole constructor. * @param segments List of {@link SegmentCommitInfo}s @@ -127,6 +131,8 @@ public abstract class MergePolicy { count += info.info.getDocCount(); } totalDocCount = count; + + rateLimiter = new MergeRateLimiter(this); } /** Called by {@link IndexWriter} after the merge is done and all readers have been closed. */ @@ -186,68 +192,16 @@ public abstract class MergePolicy { return error; } - /** Mark this merge as aborted. If this is called - * before the merge is committed then the merge will - * not be committed. */ - synchronized void abort() { - aborted = true; - notifyAll(); - } - - /** Returns true if this merge was aborted. */ - synchronized boolean isAborted() { - return aborted; - } - - /** Called periodically by {@link IndexWriter} while - * merging to see if the merge is aborted. */ - public synchronized void checkAborted(Directory dir) throws MergeAbortedException { - if (aborted) { - throw new MergeAbortedException("merge is aborted: " + segString(dir)); - } - - while (paused) { - try { - // In theory we could wait() indefinitely, but we - // do 250 msec, defensively - wait(250); - } catch (InterruptedException ie) { - throw new RuntimeException(ie); - } - if (aborted) { - throw new MergeAbortedException("merge is aborted: " + segString(dir)); - } - } - } - - /** Set or clear whether this merge is paused paused (for example - * {@link ConcurrentMergeScheduler} will pause merges - * if too many are running). */ - synchronized public void setPause(boolean paused) { - this.paused = paused; - if (!paused) { - // Wakeup merge thread, if it's waiting - notifyAll(); - } - } - - /** Returns true if this merge is paused. - * - * @see #setPause(boolean) */ - synchronized public boolean getPause() { - return paused; - } - /** Returns a readable description of the current merge * state. */ - public String segString(Directory dir) { + public String segString() { StringBuilder b = new StringBuilder(); final int numSegments = segments.size(); for(int i=0;i 0) { b.append(' '); } - b.append(segments.get(i).toString(dir, 0)); + b.append(segments.get(i).toString()); } if (info != null) { b.append(" into ").append(info.info.name); @@ -255,7 +209,7 @@ public abstract class MergePolicy { if (maxNumSegments != -1) { b.append(" [maxNumSegments=" + maxNumSegments + "]"); } - if (aborted) { + if (rateLimiter.getAbort()) { b.append(" [ABORTED]"); } return b.toString(); @@ -321,7 +275,7 @@ public abstract class MergePolicy { b.append("MergeSpec:\n"); final int count = merges.size(); for(int i=0;i Long.MAX_VALUE ? Long.MAX_VALUE : (long) v; } - } diff --git a/lucene/core/src/java/org/apache/lucene/index/MergeRateLimiter.java b/lucene/core/src/java/org/apache/lucene/index/MergeRateLimiter.java new file mode 100644 index 00000000000..8bd27f407fd --- /dev/null +++ b/lucene/core/src/java/org/apache/lucene/index/MergeRateLimiter.java @@ -0,0 +1,194 @@ +package org.apache.lucene.index; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.lucene.store.RateLimiter; +import org.apache.lucene.util.ThreadInterruptedException; + +import static org.apache.lucene.store.RateLimiter.SimpleRateLimiter; + +/** This is the {@link RateLimiter} that {@link IndexWriter} assigns to each running merge, to + * give {@link MergeScheduler}s ionice like control. + * + * This is similar to {@link SimpleRateLimiter}, except it's merge-private, + * it will wake up if its rate changes while it's paused, it tracks how + * much time it spent stopped and paused, and it supports aborting. + * + * @lucene.internal */ + +public class MergeRateLimiter extends RateLimiter { + + private final static int MIN_PAUSE_CHECK_MSEC = 25; + volatile long totalBytesWritten; + + // By default no IO limit: + double mbPerSec = Double.POSITIVE_INFINITY; + private long lastNS; + private long minPauseCheckBytes; + private boolean abort; + long totalPausedNS; + long totalStoppedNS; + final MergePolicy.OneMerge merge; + + /** Returned by {@link #maybePause}. */ + private static enum PauseResult {NO, STOPPED, PAUSED}; + + /** Sole constructor. */ + public MergeRateLimiter(MergePolicy.OneMerge merge) { + this.merge = merge; + } + + @Override + public synchronized void setMBPerSec(double mbPerSec) { + // 0.0 is allowed: it means the merge is paused + if (mbPerSec < 0.0) { + throw new IllegalArgumentException("mbPerSec must be positive; got: " + mbPerSec); + } + this.mbPerSec = mbPerSec; + // NOTE: Double.POSITIVE_INFINITY casts to Long.MAX_VALUE + minPauseCheckBytes = Math.min(1024*1024, (long) ((MIN_PAUSE_CHECK_MSEC / 1000.0) * mbPerSec * 1024 * 1024)); + assert minPauseCheckBytes >= 0; + notify(); + } + + @Override + public synchronized double getMBPerSec() { + return mbPerSec; + } + + /** Returns total bytes written by this merge. */ + public long getTotalBytesWritten() { + return totalBytesWritten; + } + + @Override + public long pause(long bytes) throws MergePolicy.MergeAbortedException { + + totalBytesWritten += bytes; + + long startNS = System.nanoTime(); + long curNS = startNS; + + // While loop because 1) Thread.wait doesn't always sleep long + // enough, and 2) we wake up and check again when our rate limit + // is changed while we were pausing: + long pausedNS = 0; + while (true) { + PauseResult result = maybePause(bytes, curNS); + if (result == PauseResult.NO) { + // Set to curNS, not targetNS, to enforce the instant rate, not + // the "averaaged over all history" rate: + lastNS = curNS; + break; + } + curNS = System.nanoTime(); + long ns = curNS - startNS; + startNS = curNS; + + // Separately track when merge was stopped vs rate limited: + if (result == PauseResult.STOPPED) { + totalStoppedNS += ns; + } else { + assert result == PauseResult.PAUSED; + totalPausedNS += ns; + } + pausedNS += ns; + } + + return pausedNS; + } + + /** Total NS merge was stopped. 
*/ + public synchronized long getTotalStoppedNS() { + return totalStoppedNS; + } + + /** Total NS merge was paused to rate limit IO. */ + public synchronized long getTotalPausedNS() { + return totalPausedNS; + } + + /** Returns NO if no pause happened, STOPPED if pause because rate was 0.0 (merge is stopped), PAUSED if paused with a normal rate limit. */ + private synchronized PauseResult maybePause(long bytes, long curNS) throws MergePolicy.MergeAbortedException { + double secondsToPause = (bytes/1024./1024.) / mbPerSec; + + // Time we should sleep until; this is purely instantaneous + // rate (just adds seconds onto the last time we had paused to); + // maybe we should also offer decayed recent history one? + long targetNS = lastNS + (long) (1000000000 * secondsToPause); + + long curPauseNS = targetNS - curNS; + + // NOTE: except maybe on real-time JVMs, minimum realistic + // wait/sleep time is 1 msec; if you pass just 1 nsec the impl + // rounds up to 1 msec, so we don't bother unless it's > 2 msec: + + if (curPauseNS <= 2000000) { + return PauseResult.NO; + } + + // Defensive: sleep for at most 250 msec; the loop above will call us again if we should keep sleeping: + if (curPauseNS > 250L*1000000) { + curPauseNS = 250L*1000000; + } + + int sleepMS = (int) (curPauseNS / 1000000); + int sleepNS = (int) (curPauseNS % 1000000); + + // Now is a good time to abort the merge: + checkAbort(); + + double rate = mbPerSec; + + try { + // CMS can wake us up here if it changes our target rate: + wait(sleepMS, sleepNS); + } catch (InterruptedException ie) { + throw new ThreadInterruptedException(ie); + } + + if (rate == 0.0) { + return PauseResult.STOPPED; + } else { + return PauseResult.PAUSED; + } + } + + /** Throws {@link MergePolicy.MergeAbortedException} if this merge was aborted. */ + public synchronized void checkAbort() throws MergePolicy.MergeAbortedException { + if (abort) { + throw new MergePolicy.MergeAbortedException("merge is aborted: " + merge.segString()); + } + } + + /** Mark this merge aborted. */ + public synchronized void setAbort() { + abort = true; + notify(); + } + + /** Returns true if this merge was aborted. */ + public synchronized boolean getAbort() { + return abort; + } + + @Override + public long getMinPauseCheckBytes() { + return minPauseCheckBytes; + } +} diff --git a/lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java b/lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java index b248634bbab..ae451ce9cef 100644 --- a/lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java +++ b/lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java @@ -20,6 +20,8 @@ package org.apache.lucene.index; import java.io.Closeable; import java.io.IOException; +import org.apache.lucene.util.InfoStream; + /**

Expert: {@link IndexWriter} uses an instance * implementing this interface to execute the merges * selected by a {@link MergePolicy}. The default @@ -46,4 +48,34 @@ public abstract class MergeScheduler implements Closeable { /** Close this MergeScheduler. */ @Override public abstract void close() throws IOException; + + /** For messages about merge scheduling */ + protected InfoStream infoStream; + + /** IndexWriter calls this on init. */ + final void setInfoStream(InfoStream infoStream) { + this.infoStream = infoStream; + } + + /** + * Returns true if infoStream messages are enabled. This method is usually used in + * conjunction with {@link #message(String)}: + * + *

+   * if (verbose()) {
+   *   message("your message");
+   * }
+   * 
+ */ + protected boolean verbose() { + return infoStream != null && infoStream.isEnabled("MS"); + } + + /** + * Outputs the given message - this method assumes {@link #verbose()} was + * called and returned true. + */ + protected void message(String message) { + infoStream.message("MS", message); + } } diff --git a/lucene/core/src/java/org/apache/lucene/index/MergeState.java b/lucene/core/src/java/org/apache/lucene/index/MergeState.java index 711cb33307e..baf1c5a1dc1 100644 --- a/lucene/core/src/java/org/apache/lucene/index/MergeState.java +++ b/lucene/core/src/java/org/apache/lucene/index/MergeState.java @@ -26,7 +26,6 @@ import org.apache.lucene.codecs.FieldsProducer; import org.apache.lucene.codecs.NormsProducer; import org.apache.lucene.codecs.StoredFieldsReader; import org.apache.lucene.codecs.TermVectorsReader; -import org.apache.lucene.store.Directory; import org.apache.lucene.util.Bits; import org.apache.lucene.util.InfoStream; import org.apache.lucene.util.packed.PackedInts; @@ -73,19 +72,11 @@ public class MergeState { /** Max docs per reader */ public final int[] maxDocs; - /** Holds the CheckAbort instance, which is invoked - * periodically to see if the merge has been aborted. */ - public final CheckAbort checkAbort; - /** InfoStream for debugging messages. */ public final InfoStream infoStream; - /** Counter used for periodic calls to checkAbort - * @lucene.internal */ - public int checkAbortCount; - /** Sole constructor. */ - MergeState(List readers, SegmentInfo segmentInfo, InfoStream infoStream, CheckAbort checkAbort) throws IOException { + MergeState(List readers, SegmentInfo segmentInfo, InfoStream infoStream) throws IOException { int numReaders = readers.size(); docMaps = new DocMap[numReaders]; @@ -148,7 +139,6 @@ public class MergeState { this.segmentInfo = segmentInfo; this.infoStream = infoStream; - this.checkAbort = checkAbort; setDocMaps(readers); } @@ -333,47 +323,6 @@ public class MergeState { segmentInfo.setDocCount(docBase); } - /** - * Class for recording units of work when merging segments. - */ - public static class CheckAbort { - private double workCount; - private final MergePolicy.OneMerge merge; - private final Directory dir; - - /** Creates a #CheckAbort instance. */ - public CheckAbort(MergePolicy.OneMerge merge, Directory dir) { - this.merge = merge; - this.dir = dir; - } - - /** - * Records the fact that roughly units amount of work - * have been done since this method was last called. - * When adding time-consuming code into SegmentMerger, - * you should test different values for units to ensure - * that the time in between calls to merge.checkAborted - * is up to ~ 1 second. - */ - public void work(double units) throws MergePolicy.MergeAbortedException { - workCount += units; - if (workCount >= 10000.0) { - merge.checkAborted(dir); - workCount = 0; - } - } - - /** If you use this: IW.close(false) cannot abort your merge! 
- * @lucene.internal */ - static final MergeState.CheckAbort NONE = new MergeState.CheckAbort(null, null) { - @Override - public void work(double units) { - // do nothing - } - }; - } - - /** * Remaps docids around deletes during merge */ diff --git a/lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java b/lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java index 9954031f129..c4a6bc1419c 100644 --- a/lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java +++ b/lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java @@ -46,5 +46,4 @@ public final class NoMergeScheduler extends MergeScheduler { public MergeScheduler clone() { return this; } - } diff --git a/lucene/core/src/java/org/apache/lucene/index/SegmentMerger.java b/lucene/core/src/java/org/apache/lucene/index/SegmentMerger.java index af19d6415d2..3019f943564 100644 --- a/lucene/core/src/java/org/apache/lucene/index/SegmentMerger.java +++ b/lucene/core/src/java/org/apache/lucene/index/SegmentMerger.java @@ -49,7 +49,10 @@ final class SegmentMerger { // note, just like in codec apis Directory 'dir' is NOT the same as segmentInfo.dir!! SegmentMerger(List readers, SegmentInfo segmentInfo, InfoStream infoStream, Directory dir, - MergeState.CheckAbort checkAbort, FieldInfos.FieldNumbers fieldNumbers, IOContext context) throws IOException { + FieldInfos.FieldNumbers fieldNumbers, IOContext context) throws IOException { + if (context.context != IOContext.Context.MERGE) { + throw new IllegalArgumentException("IOContext.context should be MERGE; got: " + context.context); + } // validate incoming readers for (LeafReader reader : readers) { if ((reader instanceof SegmentReader) == false) { @@ -59,7 +62,7 @@ final class SegmentMerger { } } - mergeState = new MergeState(readers, segmentInfo, infoStream, checkAbort); + mergeState = new MergeState(readers, segmentInfo, infoStream); directory = dir; this.codec = segmentInfo.getCodec(); this.context = context; @@ -81,12 +84,6 @@ final class SegmentMerger { if (!shouldMerge()) { throw new IllegalStateException("Merge would result in 0 document segment"); } - // NOTE: it's important to add calls to - // checkAbort.work(...) if you make any changes to this - // method that will spend alot of time. The frequency - // of this check impacts how long - // IndexWriter.close(false) takes to actually stop the - // background merge threads. 
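      // (Editor's note, not part of this patch: the removed checkAbort.work bookkeeping is no
      // longer needed because aborting now happens in the write path itself; the merge-private
      // MergeRateLimiter.pause(), invoked by the rate-limited merge IndexOutput once enough
      // bytes accumulate, calls checkAbort() and throws MergePolicy.MergeAbortedException after
      // the merge is marked aborted, so slow merge code no longer has to poll explicitly.)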
mergeFieldInfos(); long t0 = 0; if (mergeState.infoStream.isEnabled("SM")) { diff --git a/lucene/core/src/java/org/apache/lucene/index/TieredMergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/TieredMergePolicy.java index d6a691e56bd..64a88607d6a 100644 --- a/lucene/core/src/java/org/apache/lucene/index/TieredMergePolicy.java +++ b/lucene/core/src/java/org/apache/lucene/index/TieredMergePolicy.java @@ -568,7 +568,7 @@ public class TieredMergePolicy extends MergePolicy { final int numToMerge = end - maxSegmentCount + 1; final OneMerge merge = new OneMerge(eligible.subList(end-numToMerge, end)); if (verbose(writer)) { - message("add final merge=" + merge.segString(writer.getDirectory()), writer); + message("add final merge=" + merge.segString(), writer); } spec = new MergeSpecification(); spec.add(merge); diff --git a/lucene/core/src/java/org/apache/lucene/store/FilterDirectory.java b/lucene/core/src/java/org/apache/lucene/store/FilterDirectory.java index 76c4ed7a625..765b5c24c96 100644 --- a/lucene/core/src/java/org/apache/lucene/store/FilterDirectory.java +++ b/lucene/core/src/java/org/apache/lucene/store/FilterDirectory.java @@ -23,7 +23,7 @@ import java.util.Collection; /** Directory implementation that delegates calls to another directory. * This class can be used to add limitations on top of an existing * {@link Directory} implementation such as - * {@link RateLimitedDirectoryWrapper rate limiting} or to add additional + * {@link NRTCachingDirectory} or to add additional * sanity checks for tests. However, if you plan to write your own * {@link Directory} implementation, you should consider extending directly * {@link Directory} or {@link BaseDirectory} rather than try to reuse diff --git a/lucene/core/src/java/org/apache/lucene/store/RateLimitedDirectoryWrapper.java b/lucene/core/src/java/org/apache/lucene/store/RateLimitedDirectoryWrapper.java deleted file mode 100644 index 7f53ad61196..00000000000 --- a/lucene/core/src/java/org/apache/lucene/store/RateLimitedDirectoryWrapper.java +++ /dev/null @@ -1,145 +0,0 @@ -package org.apache.lucene.store; -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -import java.io.IOException; - -import org.apache.lucene.store.IOContext.Context; - -/** - * - * A {@link Directory} wrapper that allows {@link IndexOutput} rate limiting using - * {@link IOContext.Context IO context} specific {@link RateLimiter rate limiters}. 
- * - * @see #setRateLimiter(RateLimiter, IOContext.Context) - * @lucene.experimental - */ -public final class RateLimitedDirectoryWrapper extends FilterDirectory { - - // we need to be volatile here to make sure we see all the values that are set - // / modified concurrently - private volatile RateLimiter[] contextRateLimiters = new RateLimiter[IOContext.Context - .values().length]; - - public RateLimitedDirectoryWrapper(Directory wrapped) { - super(wrapped); - } - - @Override - public IndexOutput createOutput(String name, IOContext context) - throws IOException { - ensureOpen(); - final IndexOutput output = super.createOutput(name, context); - final RateLimiter limiter = getRateLimiter(context.context); - if (limiter != null) { - return new RateLimitedIndexOutput(limiter, output); - } - return output; - } - - @Override - public void copyFrom(Directory from, String src, String dest, IOContext context) throws IOException { - ensureOpen(); - in.copyFrom(from, src, dest, context); - } - - private RateLimiter getRateLimiter(IOContext.Context context) { - assert context != null; - return contextRateLimiters[context.ordinal()]; - } - - /** - * Sets the maximum (approx) MB/sec allowed by all write IO performed by - * {@link IndexOutput} created with the given {@link IOContext.Context}. Pass - * null to have no limit. - * - *

- * NOTE: For already created {@link IndexOutput} instances there is no - * guarantee this new rate will apply to them; it will only be guaranteed to - * apply for new created {@link IndexOutput} instances. - *

- * NOTE: this is an optional operation and might not be respected by - * all Directory implementations. Currently only {@link FSDirectory buffered} - * Directory implementations use rate-limiting. - * - * @throws IllegalArgumentException - * if context is null - * @throws AlreadyClosedException if the {@link Directory} is already closed - * @lucene.experimental - */ - public void setMaxWriteMBPerSec(Double mbPerSec, IOContext.Context context) { - ensureOpen(); - if (context == null) { - throw new IllegalArgumentException("Context must not be null"); - } - final int ord = context.ordinal(); - final RateLimiter limiter = contextRateLimiters[ord]; - if (mbPerSec == null) { - if (limiter != null) { - limiter.setMbPerSec(Double.MAX_VALUE); - contextRateLimiters[ord] = null; - } - } else if (limiter != null) { - limiter.setMbPerSec(mbPerSec); - contextRateLimiters[ord] = limiter; // cross the mem barrier again - } else { - contextRateLimiters[ord] = new RateLimiter.SimpleRateLimiter(mbPerSec); - } - } - - /** - * Sets the rate limiter to be used to limit (approx) MB/sec allowed by all IO - * performed with the given {@link IOContext.Context context}. Pass null to - * have no limit. - * - *

- * Passing an instance of rate limiter compared to setting it using - * {@link #setMaxWriteMBPerSec(Double, IOContext.Context)} - * allows to use the same limiter instance across several directories globally - * limiting IO across them. - * - * @throws IllegalArgumentException - * if context is null - * @throws AlreadyClosedException if the {@link Directory} is already closed - * @lucene.experimental - */ - public void setRateLimiter(RateLimiter mergeWriteRateLimiter, - Context context) { - ensureOpen(); - if (context == null) { - throw new IllegalArgumentException("Context must not be null"); - } - contextRateLimiters[context.ordinal()] = mergeWriteRateLimiter; - } - - /** - * See {@link #setMaxWriteMBPerSec}. - * - * @throws IllegalArgumentException - * if context is null - * @throws AlreadyClosedException if the {@link Directory} is already closed - * @lucene.experimental - */ - public Double getMaxWriteMBPerSec(IOContext.Context context) { - ensureOpen(); - if (context == null) { - throw new IllegalArgumentException("Context must not be null"); - } - RateLimiter limiter = getRateLimiter(context); - return limiter == null ? null : limiter.getMbPerSec(); - } - -} diff --git a/lucene/core/src/java/org/apache/lucene/store/RateLimitedIndexOutput.java b/lucene/core/src/java/org/apache/lucene/store/RateLimitedIndexOutput.java index f8535f22cf6..5fdc6d70b16 100644 --- a/lucene/core/src/java/org/apache/lucene/store/RateLimitedIndexOutput.java +++ b/lucene/core/src/java/org/apache/lucene/store/RateLimitedIndexOutput.java @@ -24,7 +24,8 @@ import java.io.IOException; * * @lucene.internal */ -final class RateLimitedIndexOutput extends IndexOutput { + +public final class RateLimitedIndexOutput extends IndexOutput { private final IndexOutput delegate; private final RateLimiter rateLimiter; @@ -36,7 +37,7 @@ final class RateLimitedIndexOutput extends IndexOutput { * which does volatile read. */ private long currentMinPauseCheckBytes; - RateLimitedIndexOutput(final RateLimiter rateLimiter, final IndexOutput delegate) { + public RateLimitedIndexOutput(final RateLimiter rateLimiter, final IndexOutput delegate) { super("RateLimitedIndexOutput(" + delegate + ")"); this.delegate = delegate; this.rateLimiter = rateLimiter; @@ -72,7 +73,7 @@ final class RateLimitedIndexOutput extends IndexOutput { delegate.writeBytes(b, offset, length); } - private void checkRate() { + private void checkRate() throws IOException { if (bytesSinceLastPause > currentMinPauseCheckBytes) { rateLimiter.pause(bytesSinceLastPause); bytesSinceLastPause = 0; diff --git a/lucene/core/src/java/org/apache/lucene/store/RateLimiter.java b/lucene/core/src/java/org/apache/lucene/store/RateLimiter.java index b5759f668e0..99ed3c78beb 100644 --- a/lucene/core/src/java/org/apache/lucene/store/RateLimiter.java +++ b/lucene/core/src/java/org/apache/lucene/store/RateLimiter.java @@ -17,6 +17,8 @@ package org.apache.lucene.store; * limitations under the License. */ +import java.io.IOException; + import org.apache.lucene.util.ThreadInterruptedException; /** Abstract base class to rate limit IO. Typically implementations are @@ -27,14 +29,14 @@ import org.apache.lucene.util.ThreadInterruptedException; public abstract class RateLimiter { /** - * Sets an updated mb per second rate limit. + * Sets an updated MB per second rate limit. */ - public abstract void setMbPerSec(double mbPerSec); + public abstract void setMBPerSec(double mbPerSec); /** - * The current mb per second rate limit. + * The current MB per second rate limit. 
*/ - public abstract double getMbPerSec(); + public abstract double getMBPerSec(); /** Pauses, if necessary, to keep the instantaneous IO * rate at or below the target. @@ -43,7 +45,7 @@ public abstract class RateLimiter { *

* @return the pause time in nano seconds * */ - public abstract long pause(long bytes); + public abstract long pause(long bytes) throws IOException; /** How many bytes caller should add up itself before invoking {@link #pause}. */ public abstract long getMinPauseCheckBytes(); @@ -65,7 +67,7 @@ public abstract class RateLimiter { /** mbPerSec is the MB/sec max IO rate */ public SimpleRateLimiter(double mbPerSec) { - setMbPerSec(mbPerSec); + setMBPerSec(mbPerSec); lastNS = System.nanoTime(); } @@ -73,7 +75,7 @@ public abstract class RateLimiter { * Sets an updated mb per second rate limit. */ @Override - public void setMbPerSec(double mbPerSec) { + public void setMBPerSec(double mbPerSec) { this.mbPerSec = mbPerSec; minPauseCheckBytes = (long) ((MIN_PAUSE_CHECK_MSEC / 1000.0) * mbPerSec * 1024 * 1024); } @@ -87,7 +89,7 @@ public abstract class RateLimiter { * The current mb per second rate limit. */ @Override - public double getMbPerSec() { + public double getMBPerSec() { return this.mbPerSec; } diff --git a/lucene/core/src/java/org/apache/lucene/util/PrintStreamInfoStream.java b/lucene/core/src/java/org/apache/lucene/util/PrintStreamInfoStream.java index 33413be4a3d..190d0fa86eb 100644 --- a/lucene/core/src/java/org/apache/lucene/util/PrintStreamInfoStream.java +++ b/lucene/core/src/java/org/apache/lucene/util/PrintStreamInfoStream.java @@ -19,7 +19,10 @@ package org.apache.lucene.util; import java.io.IOException; import java.io.PrintStream; +import java.text.DateFormat; +import java.text.SimpleDateFormat; import java.util.Date; +import java.util.Locale; import java.util.concurrent.atomic.AtomicInteger; /** @@ -32,6 +35,8 @@ public class PrintStreamInfoStream extends InfoStream { // Used for printing messages private static final AtomicInteger MESSAGE_ID = new AtomicInteger(); protected final int messageID; + + private static final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS", Locale.ROOT); protected final PrintStream stream; @@ -46,7 +51,7 @@ public class PrintStreamInfoStream extends InfoStream { @Override public void message(String component, String message) { - stream.println(component + " " + messageID + " [" + new Date() + "; " + Thread.currentThread().getName() + "]: " + message); + stream.println(component + " " + messageID + " [" + dateFormat.format(new Date()) + "; " + Thread.currentThread().getName() + "]: " + message); } @Override diff --git a/lucene/core/src/java/org/apache/lucene/util/StringHelper.java b/lucene/core/src/java/org/apache/lucene/util/StringHelper.java index 143498f5506..26686355dcd 100644 --- a/lucene/core/src/java/org/apache/lucene/util/StringHelper.java +++ b/lucene/core/src/java/org/apache/lucene/util/StringHelper.java @@ -253,7 +253,7 @@ public abstract class StringHelper { x0 = Long.parseLong(prop, 16); x1 = x0; } else { - // "Ghetto randomess" from 3 different sources: + // Randomess from 3 different sources: x0 = System.nanoTime(); x1 = StringHelper.class.hashCode() << 32; StringBuilder sb = new StringBuilder(); diff --git a/lucene/core/src/test/org/apache/lucene/TestMergeSchedulerExternal.java b/lucene/core/src/test/org/apache/lucene/TestMergeSchedulerExternal.java index 4442addf21b..0c3e15d3fc7 100644 --- a/lucene/core/src/test/org/apache/lucene/TestMergeSchedulerExternal.java +++ b/lucene/core/src/test/org/apache/lucene/TestMergeSchedulerExternal.java @@ -16,6 +16,7 @@ package org.apache.lucene; * See the License for the specific language governing permissions and * limitations under the License. 
*/ + import org.apache.lucene.analysis.MockAnalyzer; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; @@ -56,21 +57,20 @@ public class TestMergeSchedulerExternal extends LuceneTestCase { @Override protected MergeThread getMergeThread(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException { MergeThread thread = new MyMergeThread(writer, merge); - thread.setThreadPriority(getMergeThreadPriority()); thread.setDaemon(true); thread.setName("MyMergeThread"); return thread; } @Override - protected void handleMergeException(Throwable t) { + protected void handleMergeException(Directory dir, Throwable t) { excCalled = true; } - @Override - protected void doMerge(MergePolicy.OneMerge merge) throws IOException { + ;@Override + protected void doMerge(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException { mergeCalled = true; - super.doMerge(merge); + super.doMerge(writer, merge); } } @@ -118,7 +118,7 @@ public class TestMergeSchedulerExternal extends LuceneTestCase { OneMerge merge = null; while ((merge = writer.getNextMerge()) != null) { if (VERBOSE) { - System.out.println("executing merge " + merge.segString(writer.getDirectory())); + System.out.println("executing merge " + merge.segString()); } writer.merge(merge); } diff --git a/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java b/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java index f1183fbb48d..3a5b7238965 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java @@ -293,7 +293,7 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase { ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler() { @Override - protected void doMerge(MergePolicy.OneMerge merge) throws IOException { + protected void doMerge(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException { try { // Stall all incoming merges until we see // maxMergeCount: @@ -312,7 +312,7 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase { // Then sleep a bit to give a chance for the bug // (too many pending merges) to appear: Thread.sleep(20); - super.doMerge(merge); + super.doMerge(writer, merge); } finally { runningMergeCount.decrementAndGet(); } @@ -358,10 +358,10 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase { } @Override - public void doMerge(MergePolicy.OneMerge merge) throws IOException { + public void doMerge(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException { totMergedBytes += merge.totalBytesSize(); atLeastOneMerge.countDown(); - super.doMerge(merge); + super.doMerge(writer, merge); } } @@ -428,7 +428,7 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase { final AtomicInteger runningMergeCount = new AtomicInteger(); @Override - public void doMerge(MergePolicy.OneMerge merge) throws IOException { + public void doMerge(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException { int count = runningMergeCount.incrementAndGet(); // evil? 
synchronized (this) { @@ -437,7 +437,7 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase { } } try { - super.doMerge(merge); + super.doMerge(writer, merge); } finally { runningMergeCount.decrementAndGet(); } @@ -489,7 +489,7 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase { IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random())); iwc.setMergeScheduler(new ConcurrentMergeScheduler() { @Override - protected void maybeStall() { + protected void maybeStall(IndexWriter writer) { wasCalled.set(true); } }); @@ -514,14 +514,14 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase { final CountDownLatch mergeFinish = new CountDownLatch(1); ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler() { @Override - protected void doMerge(MergePolicy.OneMerge merge) throws IOException { + protected void doMerge(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException { mergeStart.countDown(); try { mergeFinish.await(); } catch (InterruptedException ie) { throw new RuntimeException(ie); } - super.doMerge(merge); + super.doMerge(writer, merge); } }; cms.setMaxMergesAndThreads(1, 1); @@ -629,7 +629,7 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase { ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler(); cms.setDefaultMaxMergesAndThreads(true); assertEquals(1, cms.getMaxThreadCount()); - assertEquals(2, cms.getMaxMergeCount()); + assertEquals(6, cms.getMaxMergeCount()); } public void testNonSpinningDefaults() throws Exception { @@ -637,7 +637,7 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase { cms.setDefaultMaxMergesAndThreads(false); int threadCount = cms.getMaxThreadCount(); assertTrue(threadCount >= 1); - assertTrue(threadCount <= 3); - assertEquals(cms.getMaxMergeCount(), 2+threadCount); + assertTrue(threadCount <= 4); + assertEquals(5+threadCount, cms.getMaxMergeCount()); } } diff --git a/lucene/core/src/test/org/apache/lucene/index/TestDoc.java b/lucene/core/src/test/org/apache/lucene/index/TestDoc.java index 9900c7c2096..82c09e21671 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestDoc.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestDoc.java @@ -39,8 +39,8 @@ import org.apache.lucene.document.TextField; import org.apache.lucene.index.IndexWriterConfig.OpenMode; import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.store.Directory; -import org.apache.lucene.store.FSLockFactory; import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.MergeInfo; import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.store.TrackingDirectoryWrapper; import org.apache.lucene.util.InfoStream; @@ -48,232 +48,231 @@ import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.StringHelper; import org.apache.lucene.util.Version; - /** JUnit adaptation of an older test case DocTest. */ public class TestDoc extends LuceneTestCase { - private Path workDir; - private Path indexDir; - private LinkedList files; + private Path workDir; + private Path indexDir; + private LinkedList files; - /** Set the test case. This test case needs - * a few text files created in the current working directory. - */ - @Override - public void setUp() throws Exception { - super.setUp(); - if (VERBOSE) { - System.out.println("TEST: setUp"); - } - workDir = createTempDir("TestDoc"); - indexDir = createTempDir("testIndex"); + /** Set the test case. 
This test case needs + * a few text files created in the current working directory. + */ + @Override + public void setUp() throws Exception { + super.setUp(); + if (VERBOSE) { + System.out.println("TEST: setUp"); + } + workDir = createTempDir("TestDoc"); + indexDir = createTempDir("testIndex"); - Directory directory = newFSDirectory(indexDir); - directory.close(); + Directory directory = newFSDirectory(indexDir); + directory.close(); - files = new LinkedList<>(); - files.add(createOutput("test.txt", - "This is the first test file" - )); + files = new LinkedList<>(); + files.add(createOutput("test.txt", + "This is the first test file" + )); - files.add(createOutput("test2.txt", - "This is the second test file" - )); + files.add(createOutput("test2.txt", + "This is the second test file" + )); + } + + private Path createOutput(String name, String text) throws IOException { + Writer fw = null; + PrintWriter pw = null; + + try { + Path path = workDir.resolve(name); + Files.deleteIfExists(path); + + fw = new OutputStreamWriter(Files.newOutputStream(path), StandardCharsets.UTF_8); + pw = new PrintWriter(fw); + pw.println(text); + return path; + + } finally { + if (pw != null) pw.close(); + if (fw != null) fw.close(); + } + } + + + /** This test executes a number of merges and compares the contents of + * the segments created when using compound file or not using one. + * + * TODO: the original test used to print the segment contents to System.out + * for visual validation. To have the same effect, a new method + * checkSegment(String name, ...) should be created that would + * assert various things about the segment. + */ + public void testIndexAndMerge() throws Exception { + StringWriter sw = new StringWriter(); + PrintWriter out = new PrintWriter(sw, true); + + Directory directory = newFSDirectory(indexDir); + + if (directory instanceof MockDirectoryWrapper) { + // We create unreferenced files (we don't even write + // a segments file): + ((MockDirectoryWrapper) directory).setAssertNoUnrefencedFilesOnClose(false); + // this test itself deletes files (has no retry mechanism) + ((MockDirectoryWrapper) directory).setEnableVirusScanner(false); } - private Path createOutput(String name, String text) throws IOException { - Writer fw = null; - PrintWriter pw = null; + IndexWriter writer = new IndexWriter( + directory, + newIndexWriterConfig(new MockAnalyzer(random())). + setOpenMode(OpenMode.CREATE). + setMaxBufferedDocs(-1). 
+ setMergePolicy(newLogMergePolicy(10)) + ); - try { - Path path = workDir.resolve(name); - Files.deleteIfExists(path); + SegmentCommitInfo si1 = indexDoc(writer, "test.txt"); + printSegment(out, si1); - fw = new OutputStreamWriter(Files.newOutputStream(path), StandardCharsets.UTF_8); - pw = new PrintWriter(fw); - pw.println(text); - return path; + SegmentCommitInfo si2 = indexDoc(writer, "test2.txt"); + printSegment(out, si2); + writer.close(); - } finally { - if (pw != null) pw.close(); - if (fw != null) fw.close(); - } + SegmentCommitInfo siMerge = merge(directory, si1, si2, "_merge", false); + printSegment(out, siMerge); + + SegmentCommitInfo siMerge2 = merge(directory, si1, si2, "_merge2", false); + printSegment(out, siMerge2); + + SegmentCommitInfo siMerge3 = merge(directory, siMerge, siMerge2, "_merge3", false); + printSegment(out, siMerge3); + + directory.close(); + out.close(); + sw.close(); + + String multiFileOutput = sw.toString(); + //System.out.println(multiFileOutput); + + sw = new StringWriter(); + out = new PrintWriter(sw, true); + + directory = newFSDirectory(indexDir); + + if (directory instanceof MockDirectoryWrapper) { + // We create unreferenced files (we don't even write + // a segments file): + ((MockDirectoryWrapper) directory).setAssertNoUnrefencedFilesOnClose(false); + // this test itself deletes files (has no retry mechanism) + ((MockDirectoryWrapper) directory).setEnableVirusScanner(false); } + writer = new IndexWriter( + directory, + newIndexWriterConfig(new MockAnalyzer(random())). + setOpenMode(OpenMode.CREATE). + setMaxBufferedDocs(-1). + setMergePolicy(newLogMergePolicy(10)) + ); - /** This test executes a number of merges and compares the contents of - * the segments created when using compound file or not using one. - * - * TODO: the original test used to print the segment contents to System.out - * for visual validation. To have the same effect, a new method - * checkSegment(String name, ...) should be created that would - * assert various things about the segment. - */ - public void testIndexAndMerge() throws Exception { - StringWriter sw = new StringWriter(); - PrintWriter out = new PrintWriter(sw, true); + si1 = indexDoc(writer, "test.txt"); + printSegment(out, si1); + + si2 = indexDoc(writer, "test2.txt"); + printSegment(out, si2); + writer.close(); + + siMerge = merge(directory, si1, si2, "_merge", true); + printSegment(out, siMerge); + + siMerge2 = merge(directory, si1, si2, "_merge2", true); + printSegment(out, siMerge2); + + siMerge3 = merge(directory, siMerge, siMerge2, "_merge3", true); + printSegment(out, siMerge3); - Directory directory = newFSDirectory(indexDir); + directory.close(); + out.close(); + sw.close(); + String singleFileOutput = sw.toString(); - if (directory instanceof MockDirectoryWrapper) { - // We create unreferenced files (we don't even write - // a segments file): - ((MockDirectoryWrapper) directory).setAssertNoUnrefencedFilesOnClose(false); - // this test itself deletes files (has no retry mechanism) - ((MockDirectoryWrapper) directory).setEnableVirusScanner(false); - } + assertEquals(multiFileOutput, singleFileOutput); + } - IndexWriter writer = new IndexWriter( - directory, - newIndexWriterConfig(new MockAnalyzer(random())). - setOpenMode(OpenMode.CREATE). - setMaxBufferedDocs(-1). 
- setMergePolicy(newLogMergePolicy(10)) - ); + private SegmentCommitInfo indexDoc(IndexWriter writer, String fileName) + throws Exception + { + Path path = workDir.resolve(fileName); + Document doc = new Document(); + InputStreamReader is = new InputStreamReader(Files.newInputStream(path), StandardCharsets.UTF_8); + doc.add(new TextField("contents", is)); + writer.addDocument(doc); + writer.commit(); + is.close(); + return writer.newestSegment(); + } - SegmentCommitInfo si1 = indexDoc(writer, "test.txt"); - printSegment(out, si1); - SegmentCommitInfo si2 = indexDoc(writer, "test2.txt"); - printSegment(out, si2); - writer.close(); + private SegmentCommitInfo merge(Directory dir, SegmentCommitInfo si1, SegmentCommitInfo si2, String merged, boolean useCompoundFile) + throws Exception { + IOContext context = newIOContext(random(), new IOContext(new MergeInfo(-1, -1, false, -1))); + SegmentReader r1 = new SegmentReader(si1, context); + SegmentReader r2 = new SegmentReader(si2, context); - SegmentCommitInfo siMerge = merge(directory, si1, si2, "_merge", false); - printSegment(out, siMerge); + final Codec codec = Codec.getDefault(); + TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(si1.info.dir); + final SegmentInfo si = new SegmentInfo(si1.info.dir, Version.LATEST, merged, -1, false, codec, null, StringHelper.randomId(), new HashMap<>()); - SegmentCommitInfo siMerge2 = merge(directory, si1, si2, "_merge2", false); - printSegment(out, siMerge2); + SegmentMerger merger = new SegmentMerger(Arrays.asList(r1, r2), + si, InfoStream.getDefault(), trackingDir, + new FieldInfos.FieldNumbers(), context); - SegmentCommitInfo siMerge3 = merge(directory, siMerge, siMerge2, "_merge3", false); - printSegment(out, siMerge3); + MergeState mergeState = merger.merge(); + r1.close(); + r2.close();; + si.setFiles(new HashSet<>(trackingDir.getCreatedFiles())); - directory.close(); - out.close(); - sw.close(); - - String multiFileOutput = sw.toString(); - //System.out.println(multiFileOutput); - - sw = new StringWriter(); - out = new PrintWriter(sw, true); - - directory = newFSDirectory(indexDir); - - if (directory instanceof MockDirectoryWrapper) { - // We create unreferenced files (we don't even write - // a segments file): - ((MockDirectoryWrapper) directory).setAssertNoUnrefencedFilesOnClose(false); - // this test itself deletes files (has no retry mechanism) - ((MockDirectoryWrapper) directory).setEnableVirusScanner(false); + if (useCompoundFile) { + Collection filesToDelete = IndexWriter.createCompoundFile(InfoStream.getDefault(), dir, si, newIOContext(random())); + si.setUseCompoundFile(true); + for (final String fileToDelete : filesToDelete) { + si1.info.dir.deleteFile(fileToDelete); } - - writer = new IndexWriter( - directory, - newIndexWriterConfig(new MockAnalyzer(random())). - setOpenMode(OpenMode.CREATE). - setMaxBufferedDocs(-1). 
- setMergePolicy(newLogMergePolicy(10)) - ); - - si1 = indexDoc(writer, "test.txt"); - printSegment(out, si1); - - si2 = indexDoc(writer, "test2.txt"); - printSegment(out, si2); - writer.close(); - - siMerge = merge(directory, si1, si2, "_merge", true); - printSegment(out, siMerge); - - siMerge2 = merge(directory, si1, si2, "_merge2", true); - printSegment(out, siMerge2); - - siMerge3 = merge(directory, siMerge, siMerge2, "_merge3", true); - printSegment(out, siMerge3); - - directory.close(); - out.close(); - sw.close(); - String singleFileOutput = sw.toString(); - - assertEquals(multiFileOutput, singleFileOutput); - } - - private SegmentCommitInfo indexDoc(IndexWriter writer, String fileName) - throws Exception - { - Path path = workDir.resolve(fileName); - Document doc = new Document(); - InputStreamReader is = new InputStreamReader(Files.newInputStream(path), StandardCharsets.UTF_8); - doc.add(new TextField("contents", is)); - writer.addDocument(doc); - writer.commit(); - is.close(); - return writer.newestSegment(); - } - - - private SegmentCommitInfo merge(Directory dir, SegmentCommitInfo si1, SegmentCommitInfo si2, String merged, boolean useCompoundFile) - throws Exception { - IOContext context = newIOContext(random()); - SegmentReader r1 = new SegmentReader(si1, context); - SegmentReader r2 = new SegmentReader(si2, context); - - final Codec codec = Codec.getDefault(); - TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(si1.info.dir); - final SegmentInfo si = new SegmentInfo(si1.info.dir, Version.LATEST, merged, -1, false, codec, null, StringHelper.randomId(), new HashMap<>()); - - SegmentMerger merger = new SegmentMerger(Arrays.asList(r1, r2), - si, InfoStream.getDefault(), trackingDir, - MergeState.CheckAbort.NONE, new FieldInfos.FieldNumbers(), context); - - MergeState mergeState = merger.merge(); - r1.close(); - r2.close();; - si.setFiles(new HashSet<>(trackingDir.getCreatedFiles())); - - if (useCompoundFile) { - Collection filesToDelete = IndexWriter.createCompoundFile(InfoStream.getDefault(), dir, MergeState.CheckAbort.NONE, si, newIOContext(random())); - si.setUseCompoundFile(true); - for (final String fileToDelete : filesToDelete) { - si1.info.dir.deleteFile(fileToDelete); - } - } - - return new SegmentCommitInfo(si, 0, -1L, -1L, -1L); - } - - - private void printSegment(PrintWriter out, SegmentCommitInfo si) - throws Exception { - SegmentReader reader = new SegmentReader(si, newIOContext(random())); - - for (int i = 0; i < reader.numDocs(); i++) - out.println(reader.document(i)); - - Fields fields = reader.fields(); - for (String field : fields) { - Terms terms = fields.terms(field); - assertNotNull(terms); - TermsEnum tis = terms.iterator(null); - while(tis.next() != null) { - - out.print(" term=" + field + ":" + tis.term()); - out.println(" DF=" + tis.docFreq()); - - DocsAndPositionsEnum positions = tis.docsAndPositions(reader.getLiveDocs(), null); - - while (positions.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { - out.print(" doc=" + positions.docID()); - out.print(" TF=" + positions.freq()); - out.print(" pos="); - out.print(positions.nextPosition()); - for (int j = 1; j < positions.freq(); j++) - out.print("," + positions.nextPosition()); - out.println(""); - } - } - } - reader.close(); } + + return new SegmentCommitInfo(si, 0, -1L, -1L, -1L); + } + + + private void printSegment(PrintWriter out, SegmentCommitInfo si) + throws Exception { + SegmentReader reader = new SegmentReader(si, newIOContext(random())); + + for (int i = 0; i < reader.numDocs(); 
i++) + out.println(reader.document(i)); + + Fields fields = reader.fields(); + for (String field : fields) { + Terms terms = fields.terms(field); + assertNotNull(terms); + TermsEnum tis = terms.iterator(null); + while(tis.next() != null) { + + out.print(" term=" + field + ":" + tis.term()); + out.println(" DF=" + tis.docFreq()); + + DocsAndPositionsEnum positions = tis.docsAndPositions(reader.getLiveDocs(), null); + + while (positions.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { + out.print(" doc=" + positions.docID()); + out.print(" TF=" + positions.freq()); + out.print(" pos="); + out.print(positions.nextPosition()); + for (int j = 1; j < positions.freq(); j++) + out.print("," + positions.nextPosition()); + out.println(""); + } + } + } + reader.close(); + } } diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexFileDeleter.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexFileDeleter.java index 9717e224612..02fb165ed99 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestIndexFileDeleter.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexFileDeleter.java @@ -432,7 +432,7 @@ public class TestIndexFileDeleter extends LuceneTestCase { if (ms instanceof ConcurrentMergeScheduler) { final ConcurrentMergeScheduler suppressFakeFail = new ConcurrentMergeScheduler() { @Override - protected void handleMergeException(Throwable exc) { + protected void handleMergeException(Directory dir, Throwable exc) { // suppress only FakeIOException: if (exc instanceof RuntimeException && exc.getMessage().equals("fake fail")) { // ok to ignore @@ -440,13 +440,12 @@ public class TestIndexFileDeleter extends LuceneTestCase { && exc.getCause() != null && "fake fail".equals(exc.getCause().getMessage())) { // also ok to ignore } else { - super.handleMergeException(exc); + super.handleMergeException(dir, exc); } } }; final ConcurrentMergeScheduler cms = (ConcurrentMergeScheduler) ms; suppressFakeFail.setMaxMergesAndThreads(cms.getMaxMergeCount(), cms.getMaxThreadCount()); - suppressFakeFail.setMergeThreadPriority(cms.getMergeThreadPriority()); iwc.setMergeScheduler(suppressFakeFail); } diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java index d8aad16e83c..de56c2e82de 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java @@ -2563,7 +2563,7 @@ public class TestIndexWriter extends LuceneTestCase { iwc.setMergeScheduler(new ConcurrentMergeScheduler() { @Override - public void doMerge(MergePolicy.OneMerge merge) throws IOException { + public void doMerge(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException { mergeStarted.countDown(); try { closeStarted.await(); @@ -2571,7 +2571,7 @@ public class TestIndexWriter extends LuceneTestCase { Thread.currentThread().interrupt(); throw new RuntimeException(ie); } - super.doMerge(merge); + super.doMerge(writer, merge); } @Override diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java index bee2f7b15aa..eb1359b618b 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java @@ -57,15 +57,15 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; import 
org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; -import org.apache.lucene.store.MockDirectoryWrapper.FakeIOException; import org.apache.lucene.store.MockDirectoryWrapper; +import org.apache.lucene.store.MockDirectoryWrapper.FakeIOException; import org.apache.lucene.store.RAMDirectory; import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.InfoStream; import org.apache.lucene.util.LuceneTestCase; -import org.apache.lucene.util.TestUtil; import org.apache.lucene.util.LuceneTestCase.SuppressCodecs; +import org.apache.lucene.util.TestUtil; @SuppressCodecs("SimpleText") // too slow here public class TestIndexWriterExceptions extends LuceneTestCase { @@ -1951,16 +1951,15 @@ public class TestIndexWriterExceptions extends LuceneTestCase { if (ms instanceof ConcurrentMergeScheduler) { final ConcurrentMergeScheduler suppressFakeIOE = new ConcurrentMergeScheduler() { @Override - protected void handleMergeException(Throwable exc) { + protected void handleMergeException(Directory dir, Throwable exc) { // suppress only FakeIOException: if (!(exc instanceof FakeIOException)) { - super.handleMergeException(exc); + super.handleMergeException(dir, exc); } } }; final ConcurrentMergeScheduler cms = (ConcurrentMergeScheduler) ms; suppressFakeIOE.setMaxMergesAndThreads(cms.getMaxMergeCount(), cms.getMaxThreadCount()); - suppressFakeIOE.setMergeThreadPriority(cms.getMergeThreadPriority()); iwc.setMergeScheduler(suppressFakeIOE); } diff --git a/lucene/core/src/test/org/apache/lucene/index/TestSegmentMerger.java b/lucene/core/src/test/org/apache/lucene/index/TestSegmentMerger.java index a1187032d42..c526d2b85e3 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestSegmentMerger.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestSegmentMerger.java @@ -25,6 +25,8 @@ import org.apache.lucene.codecs.Codec; import org.apache.lucene.document.Document; import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.MergeInfo; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.FixedBitSet; import org.apache.lucene.util.InfoStream; @@ -83,8 +85,9 @@ public class TestSegmentMerger extends LuceneTestCase { final SegmentInfo si = new SegmentInfo(mergedDir, Version.LATEST, mergedSegment, -1, false, codec, null, StringHelper.randomId(), new HashMap<>()); SegmentMerger merger = new SegmentMerger(Arrays.asList(reader1, reader2), - si, InfoStream.getDefault(), mergedDir, - MergeState.CheckAbort.NONE, new FieldInfos.FieldNumbers(), newIOContext(random())); + si, InfoStream.getDefault(), mergedDir, + new FieldInfos.FieldNumbers(), + newIOContext(random(), new IOContext(new MergeInfo(-1, -1, false, -1)))); MergeState mergeState = merger.merge(); int docsMerged = mergeState.segmentInfo.getDocCount(); assertTrue(docsMerged == 2); diff --git a/lucene/core/src/test/org/apache/lucene/store/TestRateLimitedDirectoryWrapper.java b/lucene/core/src/test/org/apache/lucene/store/TestRateLimitedDirectoryWrapper.java deleted file mode 100644 index c63119b95a3..00000000000 --- a/lucene/core/src/test/org/apache/lucene/store/TestRateLimitedDirectoryWrapper.java +++ /dev/null @@ -1,44 +0,0 @@ -package org.apache.lucene.store; - -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import java.nio.file.Path;
-
-import org.apache.lucene.util.TestUtil;
-
-public class TestRateLimitedDirectoryWrapper extends BaseDirectoryTestCase {
-
-  @Override
-  protected Directory getDirectory(Path path) {
-    Directory in = newFSDirectory(path);
-    if (in instanceof MockDirectoryWrapper) {
-      // test manipulates directory directly
-      ((MockDirectoryWrapper)in).setEnableVirusScanner(false);
-    }
-    RateLimitedDirectoryWrapper dir = new RateLimitedDirectoryWrapper(in);
-    RateLimiter limiter = new RateLimiter.SimpleRateLimiter(TestUtil.nextInt(random(), 10, 40));
-    dir.setRateLimiter(limiter, IOContext.Context.MERGE);
-    return dir;
-  }
-
-  // since we are rate-limiting, this test gets pretty slow
-  @Override @Nightly
-  public void testThreadSafety() throws Exception {
-    super.testThreadSafety();
-  }
-}
diff --git a/lucene/sandbox/src/test/org/apache/lucene/codecs/idversion/TestIDVersionPostingsFormat.java b/lucene/sandbox/src/test/org/apache/lucene/codecs/idversion/TestIDVersionPostingsFormat.java
index f688ba6b0fc..b8cfe3dc4bb 100644
--- a/lucene/sandbox/src/test/org/apache/lucene/codecs/idversion/TestIDVersionPostingsFormat.java
+++ b/lucene/sandbox/src/test/org/apache/lucene/codecs/idversion/TestIDVersionPostingsFormat.java
@@ -29,8 +29,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.lucene.analysis.Analyzer.TokenStreamComponents;
 import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.Analyzer.TokenStreamComponents;
 import org.apache.lucene.analysis.MockAnalyzer;
 import org.apache.lucene.analysis.MockTokenFilter;
 import org.apache.lucene.analysis.MockTokenizer;
@@ -387,7 +387,7 @@ public class TestIDVersionPostingsFormat extends LuceneTestCase {
     if (ms instanceof ConcurrentMergeScheduler) {
       iwc.setMergeScheduler(new ConcurrentMergeScheduler() {
           @Override
-          protected void handleMergeException(Throwable exc) {
+          protected void handleMergeException(Directory dir, Throwable exc) {
             assertTrue(exc instanceof IllegalArgumentException);
           }
         });
diff --git a/lucene/test-framework/src/java/org/apache/lucene/codecs/cranky/CrankyCompoundFormat.java b/lucene/test-framework/src/java/org/apache/lucene/codecs/cranky/CrankyCompoundFormat.java
index 8f64f9f56e1..20d9d65a92d 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/codecs/cranky/CrankyCompoundFormat.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/codecs/cranky/CrankyCompoundFormat.java
@@ -22,7 +22,6 @@ import java.util.Collection;
 import java.util.Random;
 
 import org.apache.lucene.codecs.CompoundFormat;
-import org.apache.lucene.index.MergeState.CheckAbort;
 import org.apache.lucene.index.SegmentInfo;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
@@ -42,11 +41,11 @@ class CrankyCompoundFormat extends CompoundFormat {
   }
 
   @Override
-  public void write(Directory dir, SegmentInfo si, Collection files, CheckAbort checkAbort, IOContext context) throws IOException {
+  public void write(Directory dir, SegmentInfo si, Collection files, IOContext context) throws IOException {
     if (random.nextInt(100) == 0) {
       throw new IOException("Fake IOException from CompoundFormat.write()");
     }
-    delegate.write(dir, si, files, checkAbort, context);
+    delegate.write(dir, si, files, context);
   }
 
   @Override
diff --git a/lucene/test-framework/src/java/org/apache/lucene/index/BaseCompoundFormatTestCase.java b/lucene/test-framework/src/java/org/apache/lucene/index/BaseCompoundFormatTestCase.java
index a0c04cf4ffd..82c943f354e 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/index/BaseCompoundFormatTestCase.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/index/BaseCompoundFormatTestCase.java
@@ -55,7 +55,7 @@ public abstract class BaseCompoundFormatTestCase extends BaseIndexFileFormatTest
     Directory dir = newDirectory();
 
     SegmentInfo si = newSegmentInfo(dir, "_123");
-    si.getCodec().compoundFormat().write(dir, si, Collections.emptyList(), MergeState.CheckAbort.NONE, IOContext.DEFAULT);
+    si.getCodec().compoundFormat().write(dir, si, Collections.emptyList(), IOContext.DEFAULT);
     Directory cfs = si.getCodec().compoundFormat().getCompoundReader(dir, si, IOContext.DEFAULT);
     assertEquals(0, cfs.listAll().length);
     cfs.close();
@@ -74,7 +74,7 @@ public abstract class BaseCompoundFormatTestCase extends BaseIndexFileFormatTest
       createSequenceFile(dir, testfile, (byte) 0, data[i]);
 
       SegmentInfo si = newSegmentInfo(dir, "_" + i);
-      si.getCodec().compoundFormat().write(dir, si, Collections.singleton(testfile), MergeState.CheckAbort.NONE, IOContext.DEFAULT);
+      si.getCodec().compoundFormat().write(dir, si, Collections.singleton(testfile), IOContext.DEFAULT);
       Directory cfs = si.getCodec().compoundFormat().getCompoundReader(dir, si, IOContext.DEFAULT);
 
       IndexInput expected = dir.openInput(testfile, newIOContext(random()));
@@ -98,7 +98,7 @@ public abstract class BaseCompoundFormatTestCase extends BaseIndexFileFormatTest
     createSequenceFile(dir, files[1], (byte) 0, 114);
 
     SegmentInfo si = newSegmentInfo(dir, "_123");
-    si.getCodec().compoundFormat().write(dir, si, Arrays.asList(files), MergeState.CheckAbort.NONE, IOContext.DEFAULT);
+    si.getCodec().compoundFormat().write(dir, si, Arrays.asList(files), IOContext.DEFAULT);
     Directory cfs = si.getCodec().compoundFormat().getCompoundReader(dir, si, IOContext.DEFAULT);
 
     for (String file : files) {
@@ -124,7 +124,7 @@ public abstract class BaseCompoundFormatTestCase extends BaseIndexFileFormatTest
     out.close();
 
     SegmentInfo si = newSegmentInfo(dir, "_123");
-    si.getCodec().compoundFormat().write(dir, si, Collections.singleton(testfile), MergeState.CheckAbort.NONE, IOContext.DEFAULT);
+    si.getCodec().compoundFormat().write(dir, si, Collections.singleton(testfile), IOContext.DEFAULT);
     Directory cfs = si.getCodec().compoundFormat().getCompoundReader(dir, si, IOContext.DEFAULT);
     assertEquals(1, cfs.listAll().length);
     cfs.close();
@@ -149,7 +149,7 @@ public abstract class BaseCompoundFormatTestCase extends BaseIndexFileFormatTest
     out.close();
 
     SegmentInfo si = newSegmentInfo(dir, "_123");
-    si.getCodec().compoundFormat().write(dir, si, Collections.singleton(testfile), MergeState.CheckAbort.NONE, myContext);
+    si.getCodec().compoundFormat().write(dir, si, Collections.singleton(testfile), myContext);
     dir.close();
   }
 
@@ -168,7 +168,7 @@ public abstract class BaseCompoundFormatTestCase extends BaseIndexFileFormatTest
     out.close();
 
     SegmentInfo si = newSegmentInfo(dir, "_123");
-    si.getCodec().compoundFormat().write(dir, si, Collections.singleton(testfile), MergeState.CheckAbort.NONE, context);
+    si.getCodec().compoundFormat().write(dir, si, Collections.singleton(testfile), context);
     dir.close();
   }
 
@@ -218,7 +218,7 @@ public abstract class BaseCompoundFormatTestCase extends BaseIndexFileFormatTest
     Directory dir = newDirectory();
 
     SegmentInfo si = newSegmentInfo(dir, "_123");
-    si.getCodec().compoundFormat().write(dir, si, Collections.emptyList(), MergeState.CheckAbort.NONE, IOContext.DEFAULT);
+    si.getCodec().compoundFormat().write(dir, si, Collections.emptyList(), IOContext.DEFAULT);
     Directory cfs = si.getCodec().compoundFormat().getCompoundReader(dir, si, IOContext.DEFAULT);
     try {
       cfs.createOutput("bogus", IOContext.DEFAULT);
@@ -240,7 +240,7 @@ public abstract class BaseCompoundFormatTestCase extends BaseIndexFileFormatTest
     out.close();
 
     SegmentInfo si = newSegmentInfo(dir, "_123");
-    si.getCodec().compoundFormat().write(dir, si, Collections.emptyList(), MergeState.CheckAbort.NONE, IOContext.DEFAULT);
+    si.getCodec().compoundFormat().write(dir, si, Collections.emptyList(), IOContext.DEFAULT);
     Directory cfs = si.getCodec().compoundFormat().getCompoundReader(dir, si, IOContext.DEFAULT);
     try {
       cfs.deleteFile(testfile);
@@ -262,7 +262,7 @@ public abstract class BaseCompoundFormatTestCase extends BaseIndexFileFormatTest
     out.close();
 
     SegmentInfo si = newSegmentInfo(dir, "_123");
-    si.getCodec().compoundFormat().write(dir, si, Collections.emptyList(), MergeState.CheckAbort.NONE, IOContext.DEFAULT);
+    si.getCodec().compoundFormat().write(dir, si, Collections.emptyList(), IOContext.DEFAULT);
     Directory cfs = si.getCodec().compoundFormat().getCompoundReader(dir, si, IOContext.DEFAULT);
     try {
       cfs.renameFile(testfile, "bogus");
@@ -284,7 +284,7 @@ public abstract class BaseCompoundFormatTestCase extends BaseIndexFileFormatTest
     out.close();
 
     SegmentInfo si = newSegmentInfo(dir, "_123");
-    si.getCodec().compoundFormat().write(dir, si, Collections.emptyList(), MergeState.CheckAbort.NONE, IOContext.DEFAULT);
+    si.getCodec().compoundFormat().write(dir, si, Collections.emptyList(), IOContext.DEFAULT);
     Directory cfs = si.getCodec().compoundFormat().getCompoundReader(dir, si, IOContext.DEFAULT);
     try {
       cfs.sync(Collections.singleton(testfile));
@@ -306,7 +306,7 @@ public abstract class BaseCompoundFormatTestCase extends BaseIndexFileFormatTest
     out.close();
 
     SegmentInfo si = newSegmentInfo(dir, "_123");
-    si.getCodec().compoundFormat().write(dir, si, Collections.emptyList(), MergeState.CheckAbort.NONE, IOContext.DEFAULT);
+    si.getCodec().compoundFormat().write(dir, si, Collections.emptyList(), IOContext.DEFAULT);
     Directory cfs = si.getCodec().compoundFormat().getCompoundReader(dir, si, IOContext.DEFAULT);
     try {
       cfs.makeLock("foobar");
@@ -345,7 +345,7 @@ public abstract class BaseCompoundFormatTestCase extends BaseIndexFileFormatTest
     String files[] = dir.listAll();
 
     SegmentInfo si = newSegmentInfo(dir, "_123");
-    si.getCodec().compoundFormat().write(dir, si, Arrays.asList(files), MergeState.CheckAbort.NONE, IOContext.DEFAULT);
+    si.getCodec().compoundFormat().write(dir, si, Arrays.asList(files), IOContext.DEFAULT);
     Directory cfs = si.getCodec().compoundFormat().getCompoundReader(dir, si, IOContext.DEFAULT);
 
     for (int i = 0; i < files.length; i++) {
@@ -376,7 +376,7 @@ public abstract class BaseCompoundFormatTestCase extends BaseIndexFileFormatTest
     assertEquals(0, dir.getFileHandleCount());
 
     SegmentInfo si = newSegmentInfo(dir, "_123");
-    si.getCodec().compoundFormat().write(dir, si, Arrays.asList(dir.listAll()), MergeState.CheckAbort.NONE, IOContext.DEFAULT);
+    si.getCodec().compoundFormat().write(dir, si, Arrays.asList(dir.listAll()), IOContext.DEFAULT);
     Directory cfs = si.getCodec().compoundFormat().getCompoundReader(dir, si, IOContext.DEFAULT);
 
     final IndexInput[] ins = new IndexInput[FILE_COUNT];
@@ -729,7 +729,7 @@ public abstract class BaseCompoundFormatTestCase extends BaseIndexFileFormatTest
     }
 
     SegmentInfo si = newSegmentInfo(dir, "_123");
-    si.getCodec().compoundFormat().write(dir, si, files, MergeState.CheckAbort.NONE, IOContext.DEFAULT);
+    si.getCodec().compoundFormat().write(dir, si, files, IOContext.DEFAULT);
     Directory cfs = si.getCodec().compoundFormat().getCompoundReader(dir, si, IOContext.DEFAULT);
     return cfs;
   }
diff --git a/lucene/test-framework/src/java/org/apache/lucene/index/SuppressingConcurrentMergeScheduler.java b/lucene/test-framework/src/java/org/apache/lucene/index/SuppressingConcurrentMergeScheduler.java
index 004db286a8c..cebe5844ad5 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/index/SuppressingConcurrentMergeScheduler.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/index/SuppressingConcurrentMergeScheduler.java
@@ -1,5 +1,7 @@
 package org.apache.lucene.index;
+import org.apache.lucene.store.Directory;
+
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
@@ -17,22 +19,17 @@ package org.apache.lucene.index;
  * limitations under the License.
  */
 
-import java.io.IOException;
-
-import org.apache.lucene.store.AlreadyClosedException;
-import org.apache.lucene.util.IOUtils;
-
 /** A {@link ConcurrentMergeScheduler} that ignores AlreadyClosedException. */
 public abstract class SuppressingConcurrentMergeScheduler extends ConcurrentMergeScheduler {
   @Override
-  protected void handleMergeException(Throwable exc) {
+  protected void handleMergeException(Directory dir, Throwable exc) {
     while (true) {
       if (isOK(exc)) {
         return;
       }
       exc = exc.getCause();
       if (exc == null) {
-        super.handleMergeException(exc);
+        super.handleMergeException(dir, exc);
       }
     }
   }
diff --git a/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java b/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java
index 385ccf63568..b416d5e38cc 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java
@@ -611,7 +611,7 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
 
     // throttling REALLY slows down tests, so don't do it very often for SOMETIMES.
     if (throttling == Throttling.ALWAYS ||
-        (throttling == Throttling.SOMETIMES && randomState.nextInt(200) == 0) && !(in instanceof RateLimitedDirectoryWrapper)) {
+        (throttling == Throttling.SOMETIMES && randomState.nextInt(200) == 0)) {
       if (LuceneTestCase.VERBOSE) {
         System.out.println("MockDirectoryWrapper: throttling indexOutput (" + name + ")");
       }
diff --git a/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java b/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
index e8519a1578d..72c8621c047 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
@@ -123,13 +123,11 @@ import org.apache.lucene.store.FSDirectory;
 import org.apache.lucene.store.FSLockFactory;
 import org.apache.lucene.store.FlushInfo;
 import org.apache.lucene.store.IOContext;
-import org.apache.lucene.store.IOContext.Context;
 import org.apache.lucene.store.LockFactory;
 import org.apache.lucene.store.MergeInfo;
 import org.apache.lucene.store.MockDirectoryWrapper;
 import org.apache.lucene.store.MockDirectoryWrapper.Throttling;
 import org.apache.lucene.store.NRTCachingDirectory;
-import org.apache.lucene.store.RateLimitedDirectoryWrapper;
 import org.apache.lucene.util.automaton.AutomatonTestUtil;
 import org.apache.lucene.util.automaton.CompiledAutomaton;
 import org.apache.lucene.util.automaton.RegExp;
@@ -914,13 +912,17 @@ public abstract class LuceneTestCase extends Assert {
       } else {
         cms = new ConcurrentMergeScheduler() {
             @Override
-            protected synchronized void maybeStall() {
+            protected synchronized void maybeStall(IndexWriter writer) {
             }
           };
       }
       int maxThreadCount = TestUtil.nextInt(r, 1, 4);
       int maxMergeCount = TestUtil.nextInt(r, maxThreadCount, maxThreadCount + 4);
       cms.setMaxMergesAndThreads(maxMergeCount, maxThreadCount);
+      if (random().nextBoolean()) {
+        cms.disableAutoIOThrottle();
+      }
+      cms.setForceMergeMBPerSec(10 + 10*random().nextDouble());
       c.setMergeScheduler(cms);
     } else {
       // Always use consistent settings, else CMS's dynamic (SSD or not)
@@ -1347,27 +1349,6 @@ public abstract class LuceneTestCase extends Assert {
       directory = new NRTCachingDirectory(directory, random.nextDouble(), random.nextDouble());
     }
 
-    if (TEST_NIGHTLY && rarely(random) && !bare) {
-      final double maxMBPerSec = TestUtil.nextInt(random, 20, 40);
-      if (LuceneTestCase.VERBOSE) {
-        System.out.println("LuceneTestCase: will rate limit output IndexOutput to " + maxMBPerSec + " MB/sec");
-      }
-      final RateLimitedDirectoryWrapper rateLimitedDirectoryWrapper = new RateLimitedDirectoryWrapper(directory);
-      switch (random.nextInt(10)) {
-        case 3: // sometimes rate limit on flush
-          rateLimitedDirectoryWrapper.setMaxWriteMBPerSec(maxMBPerSec, Context.FLUSH);
-          break;
-        case 2: // sometimes rate limit flush & merge
-          rateLimitedDirectoryWrapper.setMaxWriteMBPerSec(maxMBPerSec, Context.FLUSH);
-          rateLimitedDirectoryWrapper.setMaxWriteMBPerSec(maxMBPerSec, Context.MERGE);
-          break;
-        default:
-          rateLimitedDirectoryWrapper.setMaxWriteMBPerSec(maxMBPerSec, Context.MERGE);
-      }
-      directory = rateLimitedDirectoryWrapper;
-
-    }
-
     if (bare) {
       BaseDirectoryWrapper base = new BaseDirectoryWrapper(directory);
       closeAfterSuite(new CloseableDirectory(base, suiteFailureMarker));
diff --git a/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java b/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java
index 2b2f61be87e..7d3fdc026e7 100644
--- a/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java
+++ b/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java
@@ -36,7 +36,6 @@ import org.apache.lucene.store.LockFactory;
 import org.apache.lucene.store.NRTCachingDirectory;
 import org.apache.lucene.store.NativeFSLockFactory;
 import org.apache.lucene.store.NoLockFactory;
-import org.apache.lucene.store.RateLimitedDirectoryWrapper;
 import org.apache.lucene.store.SimpleFSLockFactory;
 import org.apache.lucene.store.SingleInstanceLockFactory;
 import org.apache.lucene.util.IOUtils;
@@ -350,7 +349,6 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
       directory = create(fullPath, createLockFactory(rawLockType), dirContext);
       boolean success = false;
       try {
-        directory = rateLimit(directory);
        CacheValue newCacheValue = new CacheValue(fullPath, directory);
         byDirectoryCache.put(directory, newCacheValue);
         byPathCache.put(fullPath, newCacheValue);
@@ -370,25 +368,6 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
     }
   }
 
-  private Directory rateLimit(Directory directory) {
-    if (maxWriteMBPerSecDefault != null || maxWriteMBPerSecFlush != null || maxWriteMBPerSecMerge != null || maxWriteMBPerSecRead != null) {
-      directory = new RateLimitedDirectoryWrapper(directory);
-      if (maxWriteMBPerSecDefault != null) {
-        ((RateLimitedDirectoryWrapper)directory).setMaxWriteMBPerSec(maxWriteMBPerSecDefault, Context.DEFAULT);
-      }
-      if (maxWriteMBPerSecFlush != null) {
-        ((RateLimitedDirectoryWrapper)directory).setMaxWriteMBPerSec(maxWriteMBPerSecFlush, Context.FLUSH);
-      }
-      if (maxWriteMBPerSecMerge != null) {
-        ((RateLimitedDirectoryWrapper)directory).setMaxWriteMBPerSec(maxWriteMBPerSecMerge, Context.MERGE);
-      }
-      if (maxWriteMBPerSecRead != null) {
-        ((RateLimitedDirectoryWrapper)directory).setMaxWriteMBPerSec(maxWriteMBPerSecRead, Context.READ);
-      }
-    }
-    return directory;
-  }
-
   /*
    * (non-Javadoc)
    *
diff --git a/solr/core/src/java/org/apache/solr/core/StandardDirectoryFactory.java b/solr/core/src/java/org/apache/solr/core/StandardDirectoryFactory.java
index d67ac0eecc3..198e15bd23d 100644
--- a/solr/core/src/java/org/apache/solr/core/StandardDirectoryFactory.java
+++ b/solr/core/src/java/org/apache/solr/core/StandardDirectoryFactory.java
@@ -28,7 +28,6 @@ import org.apache.lucene.store.LockFactory;
 import org.apache.lucene.store.NRTCachingDirectory;
 import org.apache.lucene.store.NativeFSLockFactory;
 import org.apache.lucene.store.NoLockFactory;
-import org.apache.lucene.store.RateLimitedDirectoryWrapper;
 import org.apache.lucene.store.SimpleFSLockFactory;
 import org.apache.lucene.store.SingleInstanceLockFactory;
 import org.apache.solr.common.SolrException;
@@ -113,8 +112,7 @@ public class StandardDirectoryFactory extends CachingDirectoryFactory {
    * carefully - some Directory wrappers will
    * cache files for example.
    *
-   * This implementation works with two wrappers:
-   * NRTCachingDirectory and RateLimitedDirectoryWrapper.
+   * This implementation works with NRTCachingDirectory.
   *
   * You should first {@link Directory#sync(java.util.Collection)} any file that will be
   * moved or avoid cached files through settings.
@@ -143,13 +141,11 @@ public class StandardDirectoryFactory extends CachingDirectoryFactory {
       super.move(fromDir, toDir, fileName, ioContext);
     }
 
-  // special hack to work with NRTCachingDirectory and RateLimitedDirectoryWrapper
+  // special hack to work with NRTCachingDirectory
   private Directory getBaseDir(Directory dir) {
     Directory baseDir;
     if (dir instanceof NRTCachingDirectory) {
       baseDir = ((NRTCachingDirectory)dir).getDelegate();
-    } else if (dir instanceof RateLimitedDirectoryWrapper) {
-      baseDir = ((RateLimitedDirectoryWrapper)dir).getDelegate();
     } else {
       baseDir = dir;
     }
diff --git a/solr/core/src/test/org/apache/solr/BasicFunctionalityTest.java b/solr/core/src/test/org/apache/solr/BasicFunctionalityTest.java
index 2a11d4556de..7cf28dd629a 100644
--- a/solr/core/src/test/org/apache/solr/BasicFunctionalityTest.java
+++ b/solr/core/src/test/org/apache/solr/BasicFunctionalityTest.java
@@ -37,7 +37,6 @@ import org.apache.lucene.index.StoredDocument;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext.Context;
 import org.apache.lucene.store.MockDirectoryWrapper;
-import org.apache.lucene.store.RateLimitedDirectoryWrapper;
 import org.apache.lucene.util.English;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.params.CommonParams;
diff --git a/solr/test-framework/src/java/org/apache/solr/core/MockDirectoryFactory.java b/solr/test-framework/src/java/org/apache/solr/core/MockDirectoryFactory.java
index 37633ba357b..d76b7093d6a 100644
--- a/solr/test-framework/src/java/org/apache/solr/core/MockDirectoryFactory.java
+++ b/solr/test-framework/src/java/org/apache/solr/core/MockDirectoryFactory.java
@@ -25,7 +25,6 @@ import org.apache.lucene.store.LockFactory;
 import org.apache.lucene.store.MockDirectoryWrapper;
 import org.apache.lucene.store.NRTCachingDirectory;
 import org.apache.lucene.store.NoLockFactory;
-import org.apache.lucene.store.RateLimitedDirectoryWrapper;
 import org.apache.lucene.store.TrackingDirectoryWrapper;
 import org.apache.lucene.util.LuceneTestCase;
 
@@ -86,9 +85,6 @@ public class MockDirectoryFactory extends EphemeralDirectoryFactory {
     if (dir instanceof NRTCachingDirectory) {
       cdir = ((NRTCachingDirectory)dir).getDelegate();
     }
-    if (cdir instanceof RateLimitedDirectoryWrapper) {
-      cdir = ((RateLimitedDirectoryWrapper)dir).getDelegate();
-    }
     if (cdir instanceof TrackingDirectoryWrapper) {
       cdir = ((TrackingDirectoryWrapper)dir).getDelegate();
     }
diff --git a/solr/test-framework/src/java/org/apache/solr/core/MockFSDirectoryFactory.java b/solr/test-framework/src/java/org/apache/solr/core/MockFSDirectoryFactory.java
index 9e5efa394e0..1c3bbcdc96e 100644
--- a/solr/test-framework/src/java/org/apache/solr/core/MockFSDirectoryFactory.java
+++ b/solr/test-framework/src/java/org/apache/solr/core/MockFSDirectoryFactory.java
@@ -25,7 +25,6 @@ import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.LockFactory;
 import org.apache.lucene.store.MockDirectoryWrapper;
 import org.apache.lucene.store.NRTCachingDirectory;
-import org.apache.lucene.store.RateLimitedDirectoryWrapper;
 import org.apache.lucene.store.TrackingDirectoryWrapper;
 import org.apache.lucene.util.LuceneTestCase;
 
@@ -69,9 +68,6 @@ public class MockFSDirectoryFactory extends StandardDirectoryFactory {
     if (dir instanceof NRTCachingDirectory) {
       cdir = ((NRTCachingDirectory)dir).getDelegate();
     }
-    if (cdir instanceof RateLimitedDirectoryWrapper) {
-      cdir = ((RateLimitedDirectoryWrapper)dir).getDelegate();
-    }
     if (cdir instanceof TrackingDirectoryWrapper) {
       cdir = ((TrackingDirectoryWrapper)dir).getDelegate();
     }
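
Not part of the patch itself: a minimal sketch, under assumptions, of how an application might wire up the ConcurrentMergeScheduler knobs that the LuceneTestCase hunk above exercises (setMaxMergesAndThreads, setForceMergeMBPerSec, disableAutoIOThrottle). The class name, index path, analyzer choice, and the chosen rates are illustrative only, not taken from this commit.

    import java.nio.file.Paths;

    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.lucene.index.ConcurrentMergeScheduler;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.FSDirectory;

    public class AutoIOThrottleSketch {
      public static void main(String[] args) throws Exception {
        // Hypothetical index location used only for this sketch.
        Directory dir = FSDirectory.open(Paths.get("/tmp/example-index"));
        IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer());

        ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
        // Allow up to 8 queued merges with at most 2 running concurrently.
        cms.setMaxMergesAndThreads(8, 2);
        // Forced merges (forceMerge/forceMergeDeletes) get their own MB/sec cap.
        cms.setForceMergeMBPerSec(20.0);
        // The dynamic IO throttle for normal merges is on by default;
        // it can be turned off explicitly if unthrottled writes are preferred:
        // cms.disableAutoIOThrottle();
        iwc.setMergeScheduler(cms);

        try (IndexWriter writer = new IndexWriter(dir, iwc)) {
          // ... add or update documents; merges triggered here have their
          // write rate adjusted by CMS based on the incoming merge load ...
        }
      }
    }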