LUCENE-7101: OfflineSorter had O(N^2) merge cost, and used too many temporary file descriptors, for large sorts

Mike McCandless 2016-03-15 07:36:02 -04:00
parent 4e3f376ca4
commit 287d6c8be6
6 changed files with 44 additions and 44 deletions
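
Why this fixes the O(N^2) merge cost: previously, sort() merged all accumulated segments into a single segment every time segments.size() reached maxTempFiles, so bytes sorted early were re-merged on every subsequent round, and the final merge could open every remaining segment at once. The new levelCounts bookkeeping merges only the most recent maxTempFiles partitions and promotes the result one level up, cascading when a level fills, so each byte takes part in O(log N) merges and at most maxTempFiles temp files are open per merge. A minimal, self-contained sketch of that accounting (hypothetical class and method names, not the committed code):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CascadeMergeSketch {
  static final int MAX_TEMPFILES = 10;

  public static void main(String[] args) {
    List<String> segments = new ArrayList<>();
    int[] levelCounts = new int[1];
    int mergeRounds = 0;

    for (int partition = 0; partition < 1000; partition++) {
      segments.add("seg" + partition);  // one sorted partition written to disk
      levelCounts[0]++;

      // Cascade: when a level fills, merge its partitions into one segment
      // at the next level; that may in turn fill the next level, and so on.
      int level = 0;
      while (levelCounts[level] == MAX_TEMPFILES) {
        merge(segments);                // merges the last MAX_TEMPFILES segments
        mergeRounds++;
        if (level + 2 > levelCounts.length) {
          levelCounts = Arrays.copyOf(levelCounts, level + 2);
        }
        levelCounts[level + 1]++;
        levelCounts[level] = 0;
        level++;
      }
    }

    // Final forceMerge(1): merge whatever is left down to a single segment.
    while (segments.size() > 1) {
      merge(segments);
      mergeRounds++;
    }
    System.out.println("segments=" + segments.size() + " mergeRounds=" + mergeRounds);
  }

  // Replace the most recent MAX_TEMPFILES segments (or all of them, if fewer)
  // with one merged segment, mirroring the new mergePartitions behavior.
  static void merge(List<String> segments) {
    int from = Math.max(0, segments.size() - MAX_TEMPFILES);
    List<String> toMerge = segments.subList(from, segments.size());
    String merged = "merged(" + toMerge.size() + ")";
    toMerge.clear();
    segments.add(merged);
  }
}

For 1000 partitions this prints segments=1 mergeRounds=111 (100 level-0 merges, 10 level-1 merges, one level-2 merge), versus repeated full merges of an ever-growing list, and quadratic total I/O, under the old scheme.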

CHANGES.txt

@@ -189,6 +189,9 @@ Bug Fixes
   On top of that with score mode average, the explain would fail with an NPE.
   (Martijn van Groningen)
+* LUCENE-7101: OfflineSorter had O(N^2) merge cost, and used too many
+  temporary file descriptors, for large sorts (Mike McCandless)
 
 Other
 
 * LUCENE-7035: Upgrade icu4j to 56.1/unicode 8. (Robert Muir)

OfflineSorter.java

@@ -64,7 +64,7 @@ public class OfflineSorter {
   /**
    * Maximum number of temporary files before doing an intermediate merge.
    */
-  public final static int MAX_TEMPFILES = 128;
+  public final static int MAX_TEMPFILES = 10;
 
   private final Directory dir;
@@ -232,6 +232,7 @@ public class OfflineSorter {
     sortInfo.totalTime = System.currentTimeMillis();
 
     List<String> segments = new ArrayList<>();
+    int[] levelCounts = new int[1];
 
     // So we can remove any partially written temp files on exception:
     TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(dir);
@@ -244,15 +245,26 @@ public class OfflineSorter {
       segments.add(sortPartition(trackingDir));
       sortInfo.tempMergeFiles++;
      sortInfo.lineCount += lineCount;
+      levelCounts[0]++;
 
-      // Handle intermediate merges.
-      if (segments.size() == maxTempFiles) {
+      // Handle intermediate merges; we need a while loop to "cascade" the merge when necessary:
+      int mergeLevel = 0;
+      while (levelCounts[mergeLevel] == maxTempFiles) {
         mergePartitions(trackingDir, segments);
+        if (mergeLevel+2 > levelCounts.length) {
+          levelCounts = ArrayUtil.grow(levelCounts, mergeLevel+2);
+        }
+        levelCounts[mergeLevel+1]++;
+        levelCounts[mergeLevel] = 0;
+        mergeLevel++;
       }
     }
 
     // TODO: we shouldn't have to do this? Can't we return a merged reader to
     // the caller, who often consumes the result just once, instead?
 
-    // Merge the partitions to the output file with a priority queue.
-    if (segments.size() > 1) {
+    // Merge all partitions down to 1 (basically a forceMerge(1)):
+    while (segments.size() > 1) {
       mergePartitions(trackingDir, segments);
     }
@@ -304,19 +316,25 @@ public class OfflineSorter {
     }
   }
 
-  /** Merge a list of sorted temporary files (partitions) into an output file. Note that this closes the
-   *  incoming {@link IndexOutput}. */
+  /** Merge the most recent {@code maxTempFiles} partitions into a new partition. */
   void mergePartitions(Directory trackingDir, List<String> segments) throws IOException {
     long start = System.currentTimeMillis();
 
-    PriorityQueue<FileAndTop> queue = new PriorityQueue<FileAndTop>(segments.size()) {
+    List<String> segmentsToMerge;
+    if (segments.size() > maxTempFiles) {
+      segmentsToMerge = segments.subList(segments.size() - maxTempFiles, segments.size());
+    } else {
+      segmentsToMerge = segments;
+    }
+
+    PriorityQueue<FileAndTop> queue = new PriorityQueue<FileAndTop>(segmentsToMerge.size()) {
       @Override
       protected boolean lessThan(FileAndTop a, FileAndTop b) {
         return comparator.compare(a.current.get(), b.current.get()) < 0;
       }
     };
 
-    ByteSequencesReader[] streams = new ByteSequencesReader[segments.size()];
+    ByteSequencesReader[] streams = new ByteSequencesReader[segmentsToMerge.size()];
 
     String newSegmentName = null;
@@ -326,8 +344,8 @@ public class OfflineSorter {
       newSegmentName = out.getName();
 
       // Open streams and read the top for each file
-      for (int i = 0; i < segments.size(); i++) {
-        streams[i] = getReader(dir.openInput(segments.get(i), IOContext.READONCE));
+      for (int i = 0; i < segmentsToMerge.size(); i++) {
+        streams[i] = getReader(dir.openInput(segmentsToMerge.get(i), IOContext.READONCE));
         BytesRefBuilder bytes = new BytesRefBuilder();
         boolean result = streams[i].read(bytes);
         assert result;
@@ -354,9 +372,9 @@ public class OfflineSorter {
       IOUtils.close(streams);
     }
 
-    IOUtils.deleteFiles(trackingDir, segments);
+    IOUtils.deleteFiles(trackingDir, segmentsToMerge);
 
-    segments.clear();
+    segmentsToMerge.clear();
     segments.add(newSegmentName);
 
     sortInfo.tempMergeFiles++;
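
A subtlety in the mergePartitions hunk above: when segments.size() > maxTempFiles, segmentsToMerge is a java.util.List#subList view backed by segments, so segmentsToMerge.clear() removes exactly the just-merged partitions from the caller's list before the merged segment's name is appended; earlier segments are untouched. A tiny stand-alone demonstration of that view behavior (illustrative only):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SubListClearDemo {
  public static void main(String[] args) {
    List<String> segments = new ArrayList<>(Arrays.asList("a", "b", "c", "d", "e"));
    // subList returns a *view* backed by `segments`; mutating the view
    // mutates the backing list as well.
    List<String> segmentsToMerge = segments.subList(segments.size() - 3, segments.size());
    segmentsToMerge.clear();       // removes c, d, e from `segments` itself
    segments.add("merged");
    System.out.println(segments);  // prints [a, b, merged]
  }
}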

BKDWriter.java

@@ -212,11 +212,11 @@ public class BKDWriter implements Closeable {
     }
   }
 
-  /** If the current segment has too many points then we switchover to temp files / offline sort. */
-  private void switchToOffline() throws IOException {
+  /** If the current segment has too many points then we spill over to temp files / offline sort. */
+  private void spillToOffline() throws IOException {
     // For each .add we just append to this input file, then in .finish we sort this input and recursively build the tree:
-    offlinePointWriter = new OfflinePointWriter(tempDir, tempFileNamePrefix, packedBytesLength, longOrds, "switch");
+    offlinePointWriter = new OfflinePointWriter(tempDir, tempFileNamePrefix, packedBytesLength, longOrds, "spill");
     tempInput = offlinePointWriter.out;
     PointReader reader = heapPointWriter.getReader(0);
     for(int i=0;i<pointCount;i++) {
@@ -235,7 +235,7 @@ public class BKDWriter implements Closeable {
     if (pointCount >= maxPointsSortInHeap) {
       if (offlinePointWriter == null) {
-        switchToOffline();
+        spillToOffline();
       }
       offlinePointWriter.append(packedValue, pointCount, docID);
     } else {
@@ -733,7 +733,7 @@ public class BKDWriter implements Closeable {
     // TODO: this is sort of sneaky way to get the final OfflinePointWriter from OfflineSorter:
     IndexOutput[] lastWriter = new IndexOutput[1];
 
-    OfflineSorter sorter = new OfflineSorter(tempDir, tempFileNamePrefix, cmp, OfflineSorter.BufferSize.megabytes(Math.max(1, (long) maxMBSortInHeap)), OfflineSorter.MAX_TEMPFILES) {
+    OfflineSorter sorter = new OfflineSorter(tempDir, tempFileNamePrefix + "_bkd" + dim, cmp, OfflineSorter.BufferSize.megabytes(Math.max(1, (long) maxMBSortInHeap)), OfflineSorter.MAX_TEMPFILES) {
 
       /** We write/read fixed-byte-width file that {@link OfflinePointReader} can read. */
       @Override
@@ -742,9 +742,7 @@ public class BKDWriter implements Closeable {
         return new ByteSequencesWriter(out) {
           @Override
           public void write(byte[] bytes, int off, int len) throws IOException {
-            if (len != bytesPerDoc) {
-              throw new IllegalArgumentException("len=" + len + " bytesPerDoc=" + bytesPerDoc);
-            }
+            assert len == bytesPerDoc: "len=" + len + " bytesPerDoc=" + bytesPerDoc;
             out.writeBytes(bytes, off, len);
           }
         };
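
The switchToOffline to spillToOffline rename reflects the buffering pattern above: points accumulate in a heap writer until maxPointsSortInHeap is reached, then everything buffered so far is replayed into an offline (on-disk) writer and every later point goes straight to disk. A hedged sketch of that pattern, with all names hypothetical (the real code replays through a PointReader into an OfflinePointWriter):

import java.util.ArrayList;
import java.util.List;

class SpillingPointBuffer {
  private final int maxPointsInHeap;
  private final List<byte[]> heap = new ArrayList<>();  // in-heap buffer
  private List<byte[]> offline;                         // stand-in for OfflinePointWriter

  SpillingPointBuffer(int maxPointsInHeap) {
    this.maxPointsInHeap = maxPointsInHeap;
  }

  void add(byte[] packedValue) {
    if (offline == null && heap.size() >= maxPointsInHeap) {
      spillToOffline();
    }
    if (offline != null) {
      offline.add(packedValue);  // already spilled: append straight to "disk"
    } else {
      heap.add(packedValue);     // still buffering in memory
    }
  }

  // Replay the heap contents into the offline writer, then free the heap;
  // mirrors the replay loop in BKDWriter.spillToOffline.
  private void spillToOffline() {
    offline = new ArrayList<>(heap);
    heap.clear();
  }
}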

Test2BPoints.java

@@ -49,7 +49,6 @@ import com.carrotsearch.randomizedtesting.annotations.TimeoutSuite;
 public class Test2BPoints extends LuceneTestCase {
   public void test1D() throws Exception {
     Directory dir = FSDirectory.open(createTempDir("2BPoints1D"));
-    System.out.println("DIR: " + ((FSDirectory) dir).getDirectory());
 
     IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random()))
       .setCodec(getCodec())
@@ -144,24 +143,6 @@ public class Test2BPoints extends LuceneTestCase {
   }
 
   private static Codec getCodec() {
-    return new FilterCodec("Lucene60", Codec.forName("Lucene60")) {
-      @Override
-      public PointsFormat pointsFormat() {
-        return new PointsFormat() {
-          @Override
-          public PointsWriter fieldsWriter(SegmentWriteState writeState) throws IOException {
-            int maxPointsInLeafNode = 1024;
-            double maxMBSortInHeap = 256.0;
-            return new Lucene60PointsWriter(writeState, maxPointsInLeafNode, maxMBSortInHeap);
-          }
-
-          @Override
-          public PointsReader fieldsReader(SegmentReadState readState) throws IOException {
-            return new Lucene60PointsReader(readState);
-          }
-        };
-      }
-    };
+    return Codec.forName("Lucene60");
   }
 }

TestOfflineSorter.java

@@ -82,7 +82,7 @@ public class TestOfflineSorter extends LuceneTestCase {
     try (Directory dir = newDirectory()) {
       SortInfo sortInfo = checkSort(dir, new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(1), OfflineSorter.MAX_TEMPFILES),
                                     generateRandom((int)OfflineSorter.MB * 20));
-      assertEquals(1, sortInfo.mergeRounds);
+      assertEquals(3, sortInfo.mergeRounds);
     }
   }
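
For intuition on the new expected value: roughly 20 MB of input sorted with a 1 MB buffer yields about 20 sorted partitions. With MAX_TEMPFILES now 10, level 0 fills twice (two intermediate merges), and the final merge down to one segment is the third round; under the old limit of 128, no intermediate merge ever triggered, so the single final merge gave mergeRounds == 1. (The partition count here is an estimate; per-record overhead shifts the exact figure slightly.)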

Test2BBKDPoints.java

@@ -55,7 +55,7 @@ public class Test2BBKDPoints extends LuceneTestCase {
     final int numDocs = (Integer.MAX_VALUE / 26) + 100;
-    BKDWriter w = new BKDWriter(numDocs, dir, "_0", 1, Long.BYTES, 1024, 256, 26L * numDocs);
+    BKDWriter w = new BKDWriter(numDocs, dir, "_0", 1, Long.BYTES, 26L * numDocs);
     int counter = 0;
     byte[] packedBytes = new byte[Long.BYTES];
     for (int docID = 0; docID < numDocs; docID++) {
@@ -88,7 +88,7 @@ public class Test2BBKDPoints extends LuceneTestCase {
     final int numDocs = (Integer.MAX_VALUE / 26) + 100;
-    BKDWriter w = new BKDWriter(numDocs, dir, "_0", 2, Long.BYTES, 1024, 256, 26L * numDocs);
+    BKDWriter w = new BKDWriter(numDocs, dir, "_0", 2, Long.BYTES, 26L * numDocs);
     int counter = 0;
     byte[] packedBytes = new byte[2*Long.BYTES];
     for (int docID = 0; docID < numDocs; docID++) {