LUCENE-7782: OfflineSorter now passes the number of items it will write to getWriter

This commit is contained in:
Mike McCandless 2017-04-14 08:15:32 -04:00
parent bc6ff493b0
commit 00f0c3022b
4 changed files with 41 additions and 16 deletions

View File

@ -97,6 +97,11 @@ Bug Fixes
ArrayIndexOutOfBoundsException when byte blocks larger than 32 KB
were added (Mike McCandless)
Improvements
* LUCENE-7782: OfflineSorter now passes the total number of items it
will write to getWriter (Mike McCandless)
Other
* LUCENE-7754: Inner classes should be static whenever possible.

View File

@ -881,7 +881,7 @@ final class SimpleTextBKDWriter implements Closeable {
/** We write/read fixed-byte-width file that {@link OfflinePointReader} can read. */
@Override
protected ByteSequencesWriter getWriter(IndexOutput out) {
protected ByteSequencesWriter getWriter(IndexOutput out, long count) {
return new ByteSequencesWriter(out) {
@Override
public void write(byte[] bytes, int off, int len) throws IOException {

View File

@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.ChecksumIndexInput;
@ -242,7 +243,7 @@ public class OfflineSorter {
sortInfo = new SortInfo();
sortInfo.totalTime = System.currentTimeMillis();
List<String> segments = new ArrayList<>();
List<PartitionAndCount> segments = new ArrayList<>();
int[] levelCounts = new int[1];
// So we can remove any partially written temp files on exception:
@ -257,7 +258,7 @@ public class OfflineSorter {
assert isExhausted[0];
break;
}
segments.add(sortPartition(trackingDir));
segments.add(sortPartition(trackingDir, lineCount));
sortInfo.tempMergeFiles++;
sortInfo.lineCount += lineCount;
levelCounts[0]++;
@ -291,7 +292,7 @@ public class OfflineSorter {
result = out.getName();
}
} else {
result = segments.get(0);
result = segments.get(0).fileName;
}
// We should be explicitly removing all intermediate files ourselves unless there is an exception:
@ -313,10 +314,10 @@ public class OfflineSorter {
}
/** Sort a single partition in-memory. */
protected String sortPartition(TrackingDirectoryWrapper trackingDir) throws IOException {
protected PartitionAndCount sortPartition(TrackingDirectoryWrapper trackingDir, int lineCount) throws IOException {
try (IndexOutput tempFile = trackingDir.createTempOutput(tempFileNamePrefix, "sort", IOContext.DEFAULT);
ByteSequencesWriter out = getWriter(tempFile);) {
ByteSequencesWriter out = getWriter(tempFile, lineCount);) {
BytesRef spare;
@ -324,17 +325,21 @@ public class OfflineSorter {
BytesRefIterator iter = buffer.iterator(comparator);
sortInfo.sortTime += System.currentTimeMillis() - start;
int count = 0;
while ((spare = iter.next()) != null) {
assert spare.length <= Short.MAX_VALUE;
out.write(spare);
count++;
}
assert count == lineCount;
// Clean up the buffer for the next partition.
buffer.clear();
CodecUtil.writeFooter(out.out);
return tempFile.getName();
return new PartitionAndCount(lineCount, tempFile.getName());
}
}
@ -347,16 +352,21 @@ public class OfflineSorter {
}
/** Merge the most recent {@code maxTempFile} partitions into a new partition. */
void mergePartitions(Directory trackingDir, List<String> segments) throws IOException {
void mergePartitions(Directory trackingDir, List<PartitionAndCount> segments) throws IOException {
long start = System.currentTimeMillis();
List<String> segmentsToMerge;
List<PartitionAndCount> segmentsToMerge;
if (segments.size() > maxTempFiles) {
segmentsToMerge = segments.subList(segments.size() - maxTempFiles, segments.size());
} else {
segmentsToMerge = segments;
}
long totalCount = 0;
for (PartitionAndCount segment : segmentsToMerge) {
totalCount += segment.count;
}
PriorityQueue<FileAndTop> queue = new PriorityQueue<FileAndTop>(segmentsToMerge.size()) {
@Override
protected boolean lessThan(FileAndTop a, FileAndTop b) {
@ -368,13 +378,13 @@ public class OfflineSorter {
String newSegmentName = null;
try (ByteSequencesWriter writer = getWriter(trackingDir.createTempOutput(tempFileNamePrefix, "sort", IOContext.DEFAULT))) {
try (ByteSequencesWriter writer = getWriter(trackingDir.createTempOutput(tempFileNamePrefix, "sort", IOContext.DEFAULT), totalCount)) {
newSegmentName = writer.out.getName();
// Open streams and read the top for each file
for (int i = 0; i < segmentsToMerge.size(); i++) {
streams[i] = getReader(dir.openChecksumInput(segmentsToMerge.get(i), IOContext.READONCE), segmentsToMerge.get(i));
streams[i] = getReader(dir.openChecksumInput(segmentsToMerge.get(i).fileName, IOContext.READONCE), segmentsToMerge.get(i).fileName);
BytesRef item = null;
try {
item = streams[i].next();
@ -417,10 +427,10 @@ public class OfflineSorter {
IOUtils.close(streams);
}
IOUtils.deleteFiles(trackingDir, segmentsToMerge);
IOUtils.deleteFiles(trackingDir, segmentsToMerge.stream().map(segment -> segment.fileName).collect(Collectors.toList()));
segmentsToMerge.clear();
segments.add(newSegmentName);
segments.add(new PartitionAndCount(totalCount, newSegmentName));
sortInfo.tempMergeFiles++;
}
@ -478,7 +488,7 @@ public class OfflineSorter {
}
/** Subclasses can override to change how byte sequences are written to disk. */
protected ByteSequencesWriter getWriter(IndexOutput out) throws IOException {
protected ByteSequencesWriter getWriter(IndexOutput out, long itemCount) throws IOException {
return new ByteSequencesWriter(out);
}
@ -594,5 +604,15 @@ public class OfflineSorter {
/** Returns the comparator in use to sort entries */
public Comparator<BytesRef> getComparator() {
return comparator;
}
}
private static class PartitionAndCount {
final long count;
final String fileName;
public PartitionAndCount(long count, String fileName) {
this.count = count;
this.fileName = fileName;
}
}
}

View File

@ -888,7 +888,7 @@ public class BKDWriter implements Closeable {
/** We write/read fixed-byte-width file that {@link OfflinePointReader} can read. */
@Override
protected ByteSequencesWriter getWriter(IndexOutput out) {
protected ByteSequencesWriter getWriter(IndexOutput out, long count) {
return new ByteSequencesWriter(out) {
@Override
public void write(byte[] bytes, int off, int len) throws IOException {