From 00f0c3022baa0b7f0ffc293aff4daa450ce6432f Mon Sep 17 00:00:00 2001 From: Mike McCandless Date: Fri, 14 Apr 2017 08:15:32 -0400 Subject: [PATCH] LUCENE-7782: OfflineSorter now passes the number of items it will write to getWriter --- lucene/CHANGES.txt | 5 ++ .../simpletext/SimpleTextBKDWriter.java | 2 +- .../org/apache/lucene/util/OfflineSorter.java | 48 +++++++++++++------ .../org/apache/lucene/util/bkd/BKDWriter.java | 2 +- 4 files changed, 41 insertions(+), 16 deletions(-) diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt index 6f01ee3c91c..75a9153839d 100644 --- a/lucene/CHANGES.txt +++ b/lucene/CHANGES.txt @@ -97,6 +97,11 @@ Bug Fixes ArrayIndexOutOfBoundsException when byte blocks larger than 32 KB were added (Mike McCandless) +Improvements + +* LUCENE-7782: OfflineSorter now passes the total number of items it + will write to getWriter (Mike McCandless) + Other * LUCENE-7754: Inner classes should be static whenever possible. diff --git a/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextBKDWriter.java b/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextBKDWriter.java index d7674edf369..86697ebe0b9 100644 --- a/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextBKDWriter.java +++ b/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextBKDWriter.java @@ -881,7 +881,7 @@ final class SimpleTextBKDWriter implements Closeable { /** We write/read fixed-byte-width file that {@link OfflinePointReader} can read. */ @Override - protected ByteSequencesWriter getWriter(IndexOutput out) { + protected ByteSequencesWriter getWriter(IndexOutput out, long count) { return new ByteSequencesWriter(out) { @Override public void write(byte[] bytes, int off, int len) throws IOException { diff --git a/lucene/core/src/java/org/apache/lucene/util/OfflineSorter.java b/lucene/core/src/java/org/apache/lucene/util/OfflineSorter.java index ef2d8198b01..d27305793d1 100644 --- a/lucene/core/src/java/org/apache/lucene/util/OfflineSorter.java +++ b/lucene/core/src/java/org/apache/lucene/util/OfflineSorter.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.Locale; +import java.util.stream.Collectors; import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.store.ChecksumIndexInput; @@ -242,7 +243,7 @@ public class OfflineSorter { sortInfo = new SortInfo(); sortInfo.totalTime = System.currentTimeMillis(); - List segments = new ArrayList<>(); + List segments = new ArrayList<>(); int[] levelCounts = new int[1]; // So we can remove any partially written temp files on exception: @@ -257,7 +258,7 @@ public class OfflineSorter { assert isExhausted[0]; break; } - segments.add(sortPartition(trackingDir)); + segments.add(sortPartition(trackingDir, lineCount)); sortInfo.tempMergeFiles++; sortInfo.lineCount += lineCount; levelCounts[0]++; @@ -291,7 +292,7 @@ public class OfflineSorter { result = out.getName(); } } else { - result = segments.get(0); + result = segments.get(0).fileName; } // We should be explicitly removing all intermediate files ourselves unless there is an exception: @@ -313,10 +314,10 @@ public class OfflineSorter { } /** Sort a single partition in-memory. */ - protected String sortPartition(TrackingDirectoryWrapper trackingDir) throws IOException { + protected PartitionAndCount sortPartition(TrackingDirectoryWrapper trackingDir, int lineCount) throws IOException { try (IndexOutput tempFile = trackingDir.createTempOutput(tempFileNamePrefix, "sort", IOContext.DEFAULT); - ByteSequencesWriter out = getWriter(tempFile);) { + ByteSequencesWriter out = getWriter(tempFile, lineCount);) { BytesRef spare; @@ -324,17 +325,21 @@ public class OfflineSorter { BytesRefIterator iter = buffer.iterator(comparator); sortInfo.sortTime += System.currentTimeMillis() - start; + int count = 0; while ((spare = iter.next()) != null) { assert spare.length <= Short.MAX_VALUE; out.write(spare); + count++; } + + assert count == lineCount; // Clean up the buffer for the next partition. buffer.clear(); CodecUtil.writeFooter(out.out); - return tempFile.getName(); + return new PartitionAndCount(lineCount, tempFile.getName()); } } @@ -347,16 +352,21 @@ public class OfflineSorter { } /** Merge the most recent {@code maxTempFile} partitions into a new partition. */ - void mergePartitions(Directory trackingDir, List segments) throws IOException { + void mergePartitions(Directory trackingDir, List segments) throws IOException { long start = System.currentTimeMillis(); - List segmentsToMerge; + List segmentsToMerge; if (segments.size() > maxTempFiles) { segmentsToMerge = segments.subList(segments.size() - maxTempFiles, segments.size()); } else { segmentsToMerge = segments; } + long totalCount = 0; + for (PartitionAndCount segment : segmentsToMerge) { + totalCount += segment.count; + } + PriorityQueue queue = new PriorityQueue(segmentsToMerge.size()) { @Override protected boolean lessThan(FileAndTop a, FileAndTop b) { @@ -368,13 +378,13 @@ public class OfflineSorter { String newSegmentName = null; - try (ByteSequencesWriter writer = getWriter(trackingDir.createTempOutput(tempFileNamePrefix, "sort", IOContext.DEFAULT))) { + try (ByteSequencesWriter writer = getWriter(trackingDir.createTempOutput(tempFileNamePrefix, "sort", IOContext.DEFAULT), totalCount)) { newSegmentName = writer.out.getName(); // Open streams and read the top for each file for (int i = 0; i < segmentsToMerge.size(); i++) { - streams[i] = getReader(dir.openChecksumInput(segmentsToMerge.get(i), IOContext.READONCE), segmentsToMerge.get(i)); + streams[i] = getReader(dir.openChecksumInput(segmentsToMerge.get(i).fileName, IOContext.READONCE), segmentsToMerge.get(i).fileName); BytesRef item = null; try { item = streams[i].next(); @@ -417,10 +427,10 @@ public class OfflineSorter { IOUtils.close(streams); } - IOUtils.deleteFiles(trackingDir, segmentsToMerge); + IOUtils.deleteFiles(trackingDir, segmentsToMerge.stream().map(segment -> segment.fileName).collect(Collectors.toList())); segmentsToMerge.clear(); - segments.add(newSegmentName); + segments.add(new PartitionAndCount(totalCount, newSegmentName)); sortInfo.tempMergeFiles++; } @@ -478,7 +488,7 @@ public class OfflineSorter { } /** Subclasses can override to change how byte sequences are written to disk. */ - protected ByteSequencesWriter getWriter(IndexOutput out) throws IOException { + protected ByteSequencesWriter getWriter(IndexOutput out, long itemCount) throws IOException { return new ByteSequencesWriter(out); } @@ -594,5 +604,15 @@ public class OfflineSorter { /** Returns the comparator in use to sort entries */ public Comparator getComparator() { return comparator; - } + } + + private static class PartitionAndCount { + final long count; + final String fileName; + + public PartitionAndCount(long count, String fileName) { + this.count = count; + this.fileName = fileName; + } + } } diff --git a/lucene/core/src/java/org/apache/lucene/util/bkd/BKDWriter.java b/lucene/core/src/java/org/apache/lucene/util/bkd/BKDWriter.java index eeb40fa4115..8a2356b0106 100644 --- a/lucene/core/src/java/org/apache/lucene/util/bkd/BKDWriter.java +++ b/lucene/core/src/java/org/apache/lucene/util/bkd/BKDWriter.java @@ -888,7 +888,7 @@ public class BKDWriter implements Closeable { /** We write/read fixed-byte-width file that {@link OfflinePointReader} can read. */ @Override - protected ByteSequencesWriter getWriter(IndexOutput out) { + protected ByteSequencesWriter getWriter(IndexOutput out, long count) { return new ByteSequencesWriter(out) { @Override public void write(byte[] bytes, int off, int len) throws IOException {