diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt index 0897c1a2f18..27b238cbafc 100644 --- a/lucene/CHANGES.txt +++ b/lucene/CHANGES.txt @@ -189,6 +189,9 @@ Bug Fixes On top of that with score mode average, the explain would fail with a NPE. (Martijn van Groningen) +* LUCENE-7101: OfflineSorter had O(N^2) merge cost, and used too many + temporary file descriptors, for large sorts (Mike McCandless) + Other * LUCENE-7035: Upgrade icu4j to 56.1/unicode 8. (Robert Muir) diff --git a/lucene/core/src/java/org/apache/lucene/util/OfflineSorter.java b/lucene/core/src/java/org/apache/lucene/util/OfflineSorter.java index 18e421b7bed..3a22e33cb0c 100644 --- a/lucene/core/src/java/org/apache/lucene/util/OfflineSorter.java +++ b/lucene/core/src/java/org/apache/lucene/util/OfflineSorter.java @@ -64,7 +64,7 @@ public class OfflineSorter { /** * Maximum number of temporary files before doing an intermediate merge. */ - public final static int MAX_TEMPFILES = 128; + public final static int MAX_TEMPFILES = 10; private final Directory dir; @@ -232,6 +232,7 @@ public class OfflineSorter { sortInfo.totalTime = System.currentTimeMillis(); List segments = new ArrayList<>(); + int[] levelCounts = new int[1]; // So we can remove any partially written temp files on exception: TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(dir); @@ -244,15 +245,26 @@ public class OfflineSorter { segments.add(sortPartition(trackingDir)); sortInfo.tempMergeFiles++; sortInfo.lineCount += lineCount; + levelCounts[0]++; - // Handle intermediate merges. - if (segments.size() == maxTempFiles) { + // Handle intermediate merges; we need a while loop to "cascade" the merge when necessary: + int mergeLevel = 0; + while (levelCounts[mergeLevel] == maxTempFiles) { mergePartitions(trackingDir, segments); + if (mergeLevel+2 > levelCounts.length) { + levelCounts = ArrayUtil.grow(levelCounts, mergeLevel+2); + } + levelCounts[mergeLevel+1]++; + levelCounts[mergeLevel] = 0; + mergeLevel++; } } + + // TODO: we shouldn't have to do this? Can't we return a merged reader to + // the caller, who often consumes the result just once, instead? - // Merge the partitions to the output file with a priority queue. - if (segments.size() > 1) { + // Merge all partitions down to 1 (basically a forceMerge(1)): + while (segments.size() > 1) { mergePartitions(trackingDir, segments); } @@ -304,19 +316,25 @@ public class OfflineSorter { } } - /** Merge a list of sorted temporary files (partitions) into an output file. Note that this closes the - * incoming {@link IndexOutput}. */ + /** Merge the most recent {@code maxTempFile} partitions into a new partition. */ void mergePartitions(Directory trackingDir, List segments) throws IOException { long start = System.currentTimeMillis(); - PriorityQueue queue = new PriorityQueue(segments.size()) { + List segmentsToMerge; + if (segments.size() > maxTempFiles) { + segmentsToMerge = segments.subList(segments.size() - maxTempFiles, segments.size()); + } else { + segmentsToMerge = segments; + } + + PriorityQueue queue = new PriorityQueue(segmentsToMerge.size()) { @Override protected boolean lessThan(FileAndTop a, FileAndTop b) { return comparator.compare(a.current.get(), b.current.get()) < 0; } }; - ByteSequencesReader[] streams = new ByteSequencesReader[segments.size()]; + ByteSequencesReader[] streams = new ByteSequencesReader[segmentsToMerge.size()]; String newSegmentName = null; @@ -326,8 +344,8 @@ public class OfflineSorter { newSegmentName = out.getName(); // Open streams and read the top for each file - for (int i = 0; i < segments.size(); i++) { - streams[i] = getReader(dir.openInput(segments.get(i), IOContext.READONCE)); + for (int i = 0; i < segmentsToMerge.size(); i++) { + streams[i] = getReader(dir.openInput(segmentsToMerge.get(i), IOContext.READONCE)); BytesRefBuilder bytes = new BytesRefBuilder(); boolean result = streams[i].read(bytes); assert result; @@ -354,9 +372,9 @@ public class OfflineSorter { IOUtils.close(streams); } - IOUtils.deleteFiles(trackingDir, segments); + IOUtils.deleteFiles(trackingDir, segmentsToMerge); - segments.clear(); + segmentsToMerge.clear(); segments.add(newSegmentName); sortInfo.tempMergeFiles++; diff --git a/lucene/core/src/java/org/apache/lucene/util/bkd/BKDWriter.java b/lucene/core/src/java/org/apache/lucene/util/bkd/BKDWriter.java index c5cdc303195..10d97e3db46 100644 --- a/lucene/core/src/java/org/apache/lucene/util/bkd/BKDWriter.java +++ b/lucene/core/src/java/org/apache/lucene/util/bkd/BKDWriter.java @@ -212,11 +212,11 @@ public class BKDWriter implements Closeable { } } - /** If the current segment has too many points then we switchover to temp files / offline sort. */ - private void switchToOffline() throws IOException { + /** If the current segment has too many points then we spill over to temp files / offline sort. */ + private void spillToOffline() throws IOException { // For each .add we just append to this input file, then in .finish we sort this input and resursively build the tree: - offlinePointWriter = new OfflinePointWriter(tempDir, tempFileNamePrefix, packedBytesLength, longOrds, "switch"); + offlinePointWriter = new OfflinePointWriter(tempDir, tempFileNamePrefix, packedBytesLength, longOrds, "spill"); tempInput = offlinePointWriter.out; PointReader reader = heapPointWriter.getReader(0); for(int i=0;i= maxPointsSortInHeap) { if (offlinePointWriter == null) { - switchToOffline(); + spillToOffline(); } offlinePointWriter.append(packedValue, pointCount, docID); } else { @@ -733,7 +733,7 @@ public class BKDWriter implements Closeable { // TODO: this is sort of sneaky way to get the final OfflinePointWriter from OfflineSorter: IndexOutput[] lastWriter = new IndexOutput[1]; - OfflineSorter sorter = new OfflineSorter(tempDir, tempFileNamePrefix, cmp, OfflineSorter.BufferSize.megabytes(Math.max(1, (long) maxMBSortInHeap)), OfflineSorter.MAX_TEMPFILES) { + OfflineSorter sorter = new OfflineSorter(tempDir, tempFileNamePrefix + "_bkd" + dim, cmp, OfflineSorter.BufferSize.megabytes(Math.max(1, (long) maxMBSortInHeap)), OfflineSorter.MAX_TEMPFILES) { /** We write/read fixed-byte-width file that {@link OfflinePointReader} can read. */ @Override @@ -742,9 +742,7 @@ public class BKDWriter implements Closeable { return new ByteSequencesWriter(out) { @Override public void write(byte[] bytes, int off, int len) throws IOException { - if (len != bytesPerDoc) { - throw new IllegalArgumentException("len=" + len + " bytesPerDoc=" + bytesPerDoc); - } + assert len == bytesPerDoc: "len=" + len + " bytesPerDoc=" + bytesPerDoc; out.writeBytes(bytes, off, len); } }; diff --git a/lucene/core/src/test/org/apache/lucene/index/Test2BPoints.java b/lucene/core/src/test/org/apache/lucene/index/Test2BPoints.java index 5766b911230..f15ba19ed38 100644 --- a/lucene/core/src/test/org/apache/lucene/index/Test2BPoints.java +++ b/lucene/core/src/test/org/apache/lucene/index/Test2BPoints.java @@ -49,7 +49,6 @@ import com.carrotsearch.randomizedtesting.annotations.TimeoutSuite; public class Test2BPoints extends LuceneTestCase { public void test1D() throws Exception { Directory dir = FSDirectory.open(createTempDir("2BPoints1D")); - System.out.println("DIR: " + ((FSDirectory) dir).getDirectory()); IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random())) .setCodec(getCodec()) @@ -144,24 +143,6 @@ public class Test2BPoints extends LuceneTestCase { } private static Codec getCodec() { - - return new FilterCodec("Lucene60", Codec.forName("Lucene60")) { - @Override - public PointsFormat pointsFormat() { - return new PointsFormat() { - @Override - public PointsWriter fieldsWriter(SegmentWriteState writeState) throws IOException { - int maxPointsInLeafNode = 1024; - double maxMBSortInHeap = 256.0; - return new Lucene60PointsWriter(writeState, maxPointsInLeafNode, maxMBSortInHeap); - } - - @Override - public PointsReader fieldsReader(SegmentReadState readState) throws IOException { - return new Lucene60PointsReader(readState); - } - }; - } - }; + return Codec.forName("Lucene60"); } } diff --git a/lucene/core/src/test/org/apache/lucene/util/TestOfflineSorter.java b/lucene/core/src/test/org/apache/lucene/util/TestOfflineSorter.java index 53225313ade..b45a3bbb53f 100644 --- a/lucene/core/src/test/org/apache/lucene/util/TestOfflineSorter.java +++ b/lucene/core/src/test/org/apache/lucene/util/TestOfflineSorter.java @@ -82,7 +82,7 @@ public class TestOfflineSorter extends LuceneTestCase { try (Directory dir = newDirectory()) { SortInfo sortInfo = checkSort(dir, new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(1), OfflineSorter.MAX_TEMPFILES), generateRandom((int)OfflineSorter.MB * 20)); - assertEquals(1, sortInfo.mergeRounds); + assertEquals(3, sortInfo.mergeRounds); } } diff --git a/lucene/core/src/test/org/apache/lucene/util/bkd/Test2BBKDPoints.java b/lucene/core/src/test/org/apache/lucene/util/bkd/Test2BBKDPoints.java index 4df2ebe1929..cfb98b09c78 100644 --- a/lucene/core/src/test/org/apache/lucene/util/bkd/Test2BBKDPoints.java +++ b/lucene/core/src/test/org/apache/lucene/util/bkd/Test2BBKDPoints.java @@ -55,7 +55,7 @@ public class Test2BBKDPoints extends LuceneTestCase { final int numDocs = (Integer.MAX_VALUE / 26) + 100; - BKDWriter w = new BKDWriter(numDocs, dir, "_0", 1, Long.BYTES, 1024, 256, 26L * numDocs); + BKDWriter w = new BKDWriter(numDocs, dir, "_0", 1, Long.BYTES, 26L * numDocs); int counter = 0; byte[] packedBytes = new byte[Long.BYTES]; for (int docID = 0; docID < numDocs; docID++) { @@ -88,7 +88,7 @@ public class Test2BBKDPoints extends LuceneTestCase { final int numDocs = (Integer.MAX_VALUE / 26) + 100; - BKDWriter w = new BKDWriter(numDocs, dir, "_0", 2, Long.BYTES, 1024, 256, 26L * numDocs); + BKDWriter w = new BKDWriter(numDocs, dir, "_0", 2, Long.BYTES, 26L * numDocs); int counter = 0; byte[] packedBytes = new byte[2*Long.BYTES]; for (int docID = 0; docID < numDocs; docID++) {