diff --git a/lucene/core/src/java/org/apache/lucene/util/bkd/BKDWriter.java b/lucene/core/src/java/org/apache/lucene/util/bkd/BKDWriter.java
index 488fa43af8b..5f36155efab 100644
--- a/lucene/core/src/java/org/apache/lucene/util/bkd/BKDWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/util/bkd/BKDWriter.java
@@ -227,7 +227,7 @@ public class BKDWriter implements Closeable {
   private void spillToOffline() throws IOException {
 
     // For each .add we just append to this input file, then in .finish we sort this input and resursively build the tree:
-    offlinePointWriter = new OfflinePointWriter(tempDir, tempFileNamePrefix, packedBytesLength, longOrds, "spill", singleValuePerDoc);
+    offlinePointWriter = new OfflinePointWriter(tempDir, tempFileNamePrefix, packedBytesLength, longOrds, "spill", 0, singleValuePerDoc);
     tempInput = offlinePointWriter.out;
     PointReader reader = heapPointWriter.getReader(0, pointCount);
     for(int i=0;i<pointCount;i++) {
+    List<Closeable> toCloseHeroically = new ArrayList<>();
+
     boolean success = false;
     try {
       //long t0 = System.nanoTime();
@@ -872,7 +875,8 @@ public class BKDWriter implements Closeable {
             ordBitSet, out,
             minPackedValue, maxPackedValue,
             splitPackedValues,
-            leafBlockFPs);
+            leafBlockFPs,
+            toCloseHeroically);
 
       for(PathSlice slice : sortedPointWriters) {
         slice.writer.destroy();
@@ -886,11 +890,8 @@ public class BKDWriter implements Closeable {
       success = true;
     } finally {
       if (success == false) {
-        if (tempInput != null) {
-          IOUtils.closeWhileHandlingException(tempInput);
-        }
         IOUtils.deleteFilesIgnoringExceptions(tempDir, tempDir.getCreatedFiles());
-        tempInput = null;
+        IOUtils.closeWhileHandlingException(toCloseHeroically);
       }
     }
 
@@ -1010,6 +1011,8 @@ public class BKDWriter implements Closeable {
 
     // Now we mark ords that fall into the right half, so we can partition on all other dims that are not the split dim:
     // Read the split value, then mark all ords in the right tree (larger than the split value):
+
+    // TODO: find a way to also checksum this reader? If we changed to markLeftTree, and scanned the final chunk, it could work?
     try (PointReader reader = source.writer.getReader(source.start + source.count - rightCount, rightCount)) {
       boolean result = reader.next();
       assert result;
@@ -1069,12 +1072,11 @@ public class BKDWriter implements Closeable {
   }
 
   /** Pull a partition back into heap once the point count is low enough while recursing. */
-  private PathSlice switchToHeap(PathSlice source) throws IOException {
+  private PathSlice switchToHeap(PathSlice source, List<Closeable> toCloseHeroically) throws IOException {
     int count = Math.toIntExact(source.count);
-    try (
-      PointWriter writer = new HeapPointWriter(count, count, packedBytesLength, longOrds, singleValuePerDoc);
-      PointReader reader = source.writer.getReader(source.start, source.count);
-      ) {
+    // Not inside the try because we don't want to close it here:
+    PointReader reader = source.writer.getSharedReader(source.start, source.count, toCloseHeroically);
+    try (PointWriter writer = new HeapPointWriter(count, count, packedBytesLength, longOrds, singleValuePerDoc)) {
       for(int i=0;i<count;i++) {
+                     List<Closeable> toCloseHeroically) throws IOException {
     for(PathSlice slice : slices) {
       assert slice.count == slices[0].count;
@@ -1104,7 +1107,7 @@
 
     if (numDims == 1 && slices[0].writer instanceof OfflinePointWriter && slices[0].count <= maxPointsSortInHeap) {
       // Special case for 1D, to cutover to heap once we recurse deeply enough:
-      slices[0] = switchToHeap(slices[0]);
+      slices[0] = switchToHeap(slices[0], toCloseHeroically);
     }
 
     if (nodeID >= leafNodeOffset) {
@@ -1114,7 +1117,7 @@
       if (slices[dim].writer instanceof HeapPointWriter == false) {
         // Adversarial cases can cause this, e.g. very lopsided data, all equal points, such that we started
         // offline, but then kept splitting only in one dimension, and so never had to rewrite into heap writer
-        slices[dim] = switchToHeap(slices[dim]);
+        slices[dim] = switchToHeap(slices[dim], toCloseHeroically);
       }
 
       PathSlice source = slices[dim];
@@ -1212,7 +1215,8 @@
       for(int dim=0;dim<numDims;dim++) {
diff --git a/lucene/core/src/java/org/apache/lucene/util/bkd/HeapPointWriter.java b/lucene/core/src/java/org/apache/lucene/util/bkd/HeapPointWriter.java
--- a/lucene/core/src/java/org/apache/lucene/util/bkd/HeapPointWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/util/bkd/HeapPointWriter.java
+  @Override
+  public PointReader getSharedReader(long start, long length, List<Closeable> toCloseHeroically) {
+    return new HeapPointReader(blocks, valuesPerBlock, packedBytesLength, ords, ordsLong, docIDs, (int) start, nextWrite, singleValuePerDoc);
+  }
+
   @Override
   public void close() {
     closed = true;
diff --git a/lucene/core/src/java/org/apache/lucene/util/bkd/OfflinePointReader.java b/lucene/core/src/java/org/apache/lucene/util/bkd/OfflinePointReader.java
index 18afda4659c..ebb2fd14a7f 100644
--- a/lucene/core/src/java/org/apache/lucene/util/bkd/OfflinePointReader.java
+++ b/lucene/core/src/java/org/apache/lucene/util/bkd/OfflinePointReader.java
@@ -39,6 +39,9 @@ final class OfflinePointReader extends PointReader {
   private boolean longOrds;
   private boolean checked;
 
+  // File name we are reading
+  final String name;
+
   OfflinePointReader(Directory tempDir, String tempFileName, int packedBytesLength,
                      long start, long length, boolean longOrds, boolean singleValuePerDoc) throws IOException {
     this.singleValuePerDoc = singleValuePerDoc;
@@ -67,6 +70,7 @@ final class OfflinePointReader extends PointReader {
       // at another level of the BKDWriter recursion
       in = tempDir.openInput(tempFileName, IOContext.READONCE);
     }
+    name = tempFileName;
     long seekFP = start * bytesPerDoc;
     in.seek(seekFP);
 
@@ -121,6 +125,7 @@ final class OfflinePointReader extends PointReader {
   public void close() throws IOException {
     try {
       if (countLeft == 0 && in instanceof ChecksumIndexInput && checked == false) {
+        //System.out.println("NOW CHECK: " + name);
         checked = true;
         CodecUtil.checkFooter((ChecksumIndexInput) in);
       }
diff --git a/lucene/core/src/java/org/apache/lucene/util/bkd/OfflinePointWriter.java b/lucene/core/src/java/org/apache/lucene/util/bkd/OfflinePointWriter.java
index 53148769fe8..2261b81380d 100644
--- a/lucene/core/src/java/org/apache/lucene/util/bkd/OfflinePointWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/util/bkd/OfflinePointWriter.java
@@ -16,9 +16,12 @@
  */
 package org.apache.lucene.util.bkd;
 
+import java.io.Closeable;
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.store.ChecksumIndexInput;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexOutput;
@@ -34,13 +37,19 @@ final class OfflinePointWriter implements PointWriter {
   private boolean closed;
   // true if ords are written as long (8 bytes), else 4 bytes
   private boolean longOrds;
+  private OfflinePointReader sharedReader;
+  private long nextSharedRead;
+  final long expectedCount;
 
-  public OfflinePointWriter(Directory tempDir, String tempFileNamePrefix, int packedBytesLength, boolean longOrds, String desc, boolean singleValuePerDoc) throws IOException {
+  /** Create a new writer with an unknown number of incoming points */
+  public OfflinePointWriter(Directory tempDir, String tempFileNamePrefix, int packedBytesLength,
+                            boolean longOrds, String desc, long expectedCount, boolean singleValuePerDoc) throws IOException {
     this.out = tempDir.createTempOutput(tempFileNamePrefix, "bkd_" + desc, IOContext.DEFAULT);
     this.tempDir = tempDir;
     this.packedBytesLength = packedBytesLength;
     this.longOrds = longOrds;
     this.singleValuePerDoc = singleValuePerDoc;
+    this.expectedCount = expectedCount;
   }
 
   /** Initializes on an already written/closed file, just so consumers can use {@link #getReader} to read the file. */
@@ -52,6 +61,7 @@ final class OfflinePointWriter implements PointWriter {
     closed = true;
     this.longOrds = longOrds;
     this.singleValuePerDoc = singleValuePerDoc;
+    this.expectedCount = 0;
   }
 
   @Override
@@ -68,18 +78,37 @@ final class OfflinePointWriter implements PointWriter {
     }
     out.writeInt(docID);
     count++;
+    assert expectedCount == 0 || count <= expectedCount;
   }
 
   @Override
   public PointReader getReader(long start, long length) throws IOException {
     assert closed;
     assert start + length <= count: "start=" + start + " length=" + length + " count=" + count;
+    assert expectedCount == 0 || count == expectedCount;
     return new OfflinePointReader(tempDir, out.getName(), packedBytesLength, start, length, longOrds, singleValuePerDoc);
   }
 
+  @Override
+  public PointReader getSharedReader(long start, long length, List<Closeable> toCloseHeroically) throws IOException {
+    if (sharedReader == null) {
+      assert start == 0;
+      assert length <= count;
+      sharedReader = new OfflinePointReader(tempDir, out.getName(), packedBytesLength, 0, count, longOrds, singleValuePerDoc);
+      toCloseHeroically.add(sharedReader);
+      // Make sure the OfflinePointReader intends to verify its checksum:
+      assert sharedReader.in instanceof ChecksumIndexInput;
+    } else {
+      assert start == nextSharedRead: "start=" + start + " length=" + length + " nextSharedRead=" + nextSharedRead;
+    }
+    nextSharedRead += length;
+    return sharedReader;
+  }
+
   @Override
   public void close() throws IOException {
     if (closed == false) {
+      assert sharedReader == null;
       try {
         CodecUtil.writeFooter(out);
       } finally {
@@ -91,6 +120,12 @@ final class OfflinePointWriter implements PointWriter {
 
   @Override
   public void destroy() throws IOException {
+    if (sharedReader != null) {
+      // At this point, the shared reader should have done a full sweep of the file:
+      assert nextSharedRead == count;
+      sharedReader.close();
+      sharedReader = null;
+    }
     tempDir.deleteFile(out.getName());
   }
 
diff --git a/lucene/core/src/java/org/apache/lucene/util/bkd/PointWriter.java b/lucene/core/src/java/org/apache/lucene/util/bkd/PointWriter.java
index 2f94967cb8a..d19f6e547b2 100644
--- a/lucene/core/src/java/org/apache/lucene/util/bkd/PointWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/util/bkd/PointWriter.java
@@ -19,6 +19,7 @@ package org.apache.lucene.util.bkd;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.List;
 
 /** Appends many points, and then at the end provides a {@link PointReader} to iterate
  *  those points. This abstracts away whether we write to disk, or use simple arrays
@@ -30,6 +31,9 @@ interface PointWriter extends Closeable {
   /** Returns a {@link PointReader} iterator to step through all previously added points */
   PointReader getReader(long startPoint, long length) throws IOException;
 
+  /** Returns the single shared reader, used at multiple times during the recursion, to read previously added points */
+  PointReader getSharedReader(long startPoint, long length, List<Closeable> toCloseHeroically) throws IOException;
+
   /** Removes any temp files behind this writer */
   void destroy() throws IOException;
 }
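The contract behind getSharedReader is easy to miss in the diff: each PointWriter hands out one lazily created reader, every call must request the slice that starts exactly where the previous one ended (tracked by nextSharedRead), and the recursion never closes that reader itself; it is parked in the caller-owned toCloseHeroically list so the single close, where the end-of-file checksum is verified, happens once after the final sweep (or via closeWhileHandlingException on failure). Below is a minimal, self-contained sketch of that pattern, not Lucene code: SliceWriter, SliceReader and the long[] backing array are invented stand-ins for OfflinePointWriter, OfflinePointReader and the temp file.

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class SharedReaderSketch {

  /** Stand-in reader over a slice of previously written points. */
  static class SliceReader implements Closeable {
    private final long[] values;
    private long next;        // absolute position of the next value to return
    private long remaining;   // values left in the current slice

    SliceReader(long[] values) { this.values = values; }

    /** Advance to the next strictly sequential slice. */
    void nextSlice(long start, long length) {
      assert start == next : "slices must be consumed in order";
      this.remaining = length;
    }

    boolean next() {
      if (remaining == 0) return false;
      remaining--;
      next++;
      return true;
    }

    long value() { return values[(int) (next - 1)]; }

    @Override
    public void close() throws IOException {
      // In the patch this is where the file checksum is verified, which is why
      // the shared reader must only be closed after the final full sweep:
      if (next != values.length) {
        throw new IOException("shared reader closed before reading the whole file");
      }
    }
  }

  /** Stand-in writer that serves one shared, sequential reader. */
  static class SliceWriter {
    private final long[] values;
    private SliceReader sharedReader;
    private long nextSharedRead;

    SliceWriter(long[] values) { this.values = values; }

    SliceReader getSharedReader(long start, long length, List<Closeable> toCloseHeroically) {
      if (sharedReader == null) {
        assert start == 0;
        sharedReader = new SliceReader(values);
        toCloseHeroically.add(sharedReader);  // closed once, by the caller, at the very end
      } else {
        assert start == nextSharedRead : "start=" + start + " nextSharedRead=" + nextSharedRead;
      }
      nextSharedRead += length;
      sharedReader.nextSlice(start, length);
      return sharedReader;
    }
  }

  public static void main(String[] args) throws IOException {
    List<Closeable> toCloseHeroically = new ArrayList<>();
    SliceWriter writer = new SliceWriter(new long[] {10, 20, 30, 40});
    try {
      long sum = 0;
      // Two "recursion steps", each consuming the next sequential slice of the same reader:
      for (long[] slice : new long[][] {{0, 2}, {2, 2}}) {
        SliceReader reader = writer.getSharedReader(slice[0], slice[1], toCloseHeroically);
        while (reader.next()) {
          sum += reader.value();
        }
      }
      System.out.println("sum=" + sum);  // prints sum=100
    } finally {
      // Single close at the end; the patch uses IOUtils.closeWhileHandlingException(toCloseHeroically).
      for (Closeable c : toCloseHeroically) {
        c.close();
      }
    }
  }
}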