From e648d601efb8f500c10b9524175bbaa85345142d Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Fri, 25 Oct 2019 08:58:09 +0200 Subject: [PATCH] LUCENE-8932: Move BKDReader's index off-heap when the input is a ByteBufferIndexInput. --- lucene/CHANGES.txt | 3 + .../org/apache/lucene/util/bkd/BKDReader.java | 242 ++++++++++++++---- .../org/apache/lucene/util/bkd/TestBKD.java | 18 +- 3 files changed, 206 insertions(+), 57 deletions(-) diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt index 1d9d4b9eaf8..89406a0174e 100644 --- a/lucene/CHANGES.txt +++ b/lucene/CHANGES.txt @@ -32,6 +32,9 @@ Optimizations * LUCENE-8992: TopFieldCollector and TopScoreDocCollector can now share minimum scores across leaves concurrently. (Adrien Grand, Atri Sharma, Jim Ferenczi) +* LUCENE-8932: BKDReader's index is now stored off-heap when the IndexInput is + an instance of ByteBufferIndexInput. (Jack Conradson via Adrien Grand) + Bug Fixes * LUCENE-9001: Fix race condition in SetOnce. (Przemko Robakowski) diff --git a/lucene/core/src/java/org/apache/lucene/util/bkd/BKDReader.java b/lucene/core/src/java/org/apache/lucene/util/bkd/BKDReader.java index 9b300b92274..f2bb278d01b 100644 --- a/lucene/core/src/java/org/apache/lucene/util/bkd/BKDReader.java +++ b/lucene/core/src/java/org/apache/lucene/util/bkd/BKDReader.java @@ -17,12 +17,15 @@ package org.apache.lucene.util.bkd; import java.io.IOException; +import java.io.UncheckedIOException; import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.PointValues; import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.store.ByteArrayDataInput; +import org.apache.lucene.store.ByteBufferIndexInput; +import org.apache.lucene.store.DataInput; import org.apache.lucene.store.IndexInput; import org.apache.lucene.util.Accountable; import org.apache.lucene.util.BytesRef; @@ -34,6 +37,124 @@ import org.apache.lucene.util.MathUtil; * @lucene.experimental */ public final class BKDReader extends PointValues implements Accountable { + + private static abstract class BKDInput extends DataInput implements Cloneable { + abstract long getMinLeafBlockFP(); + abstract long ramBytesUsed(); + + abstract int getPosition(); + abstract void setPosition(int pos) throws IOException; + + @Override + public BKDInput clone() { + return (BKDInput)super.clone(); + } + } + + private static class BKDOffHeapInput extends BKDInput implements Cloneable { + + private final IndexInput packedIndex; + private final long minLeafBlockFP; + + BKDOffHeapInput(IndexInput packedIndex) throws IOException { + this.packedIndex = packedIndex; + this.minLeafBlockFP = packedIndex.clone().readVLong(); + } + + private BKDOffHeapInput(IndexInput packedIndex, long minLeadBlockFP) { + this.packedIndex = packedIndex; + this.minLeafBlockFP = minLeadBlockFP; + } + + @Override + public BKDOffHeapInput clone() { + return new BKDOffHeapInput(packedIndex.clone(), minLeafBlockFP); + } + + @Override + long getMinLeafBlockFP() { + return minLeafBlockFP; + } + + @Override + long ramBytesUsed() { + return 0; + } + + @Override + int getPosition() { + return (int)packedIndex.getFilePointer(); + } + + @Override + void setPosition(int pos) throws IOException { + packedIndex.seek(pos); + } + + @Override + public byte readByte() throws IOException { + return packedIndex.readByte(); + } + + @Override + public void readBytes(byte[] b, int offset, int len) throws IOException { + packedIndex.readBytes(b, offset, len); + } + } + + private static class BKDOnHeapInput extends BKDInput implements Cloneable { + + private final ByteArrayDataInput packedIndex; + private final long minLeafBlockFP; + + BKDOnHeapInput(IndexInput packedIndex, int numBytes) throws IOException { + byte[] packedBytes = new byte[numBytes]; + packedIndex.readBytes(packedBytes, 0, numBytes); + this.packedIndex = new ByteArrayDataInput(packedBytes); + this.minLeafBlockFP = this.packedIndex.clone().readVLong(); + } + + private BKDOnHeapInput(ByteArrayDataInput packedIndex, long minLeadBlockFP) { + this.packedIndex = packedIndex; + this.minLeafBlockFP = minLeadBlockFP; + } + + @Override + public BKDOnHeapInput clone() { + return new BKDOnHeapInput((ByteArrayDataInput)packedIndex.clone(), minLeafBlockFP); + } + + @Override + long getMinLeafBlockFP() { + return minLeafBlockFP; + } + + @Override + long ramBytesUsed() { + return packedIndex.length(); + } + + @Override + int getPosition() { + return packedIndex.getPosition(); + } + + @Override + void setPosition(int pos) { + packedIndex.setPosition(pos); + } + + @Override + public byte readByte() throws IOException { + return packedIndex.readByte(); + } + + @Override + public void readBytes(byte[] b, int offset, int len) throws IOException { + packedIndex.readBytes(b, offset, len); + } + } + // Packed array of byte[] holding all split values in the full binary tree: final int leafNodeOffset; final int numDataDims; @@ -50,10 +171,18 @@ public final class BKDReader extends PointValues implements Accountable { protected final int packedBytesLength; protected final int packedIndexBytesLength; - final byte[] packedIndex; + final BKDInput packedIndex; /** Caller must pre-seek the provided {@link IndexInput} to the index location that {@link BKDWriter#finish} returned */ public BKDReader(IndexInput in) throws IOException { + this(in, in instanceof ByteBufferIndexInput); + } + + /** + * Caller must pre-seek the provided {@link IndexInput} to the index location that {@link BKDWriter#finish} returned + * and specify {@code true} to store BKD off-heap ({@code false} otherwise) + */ + public BKDReader(IndexInput in, boolean offHeap) throws IOException { version = CodecUtil.checkHeader(in, BKDWriter.CODEC_NAME, BKDWriter.VERSION_START, BKDWriter.VERSION_CURRENT); numDataDims = in.readVInt(); if (version >= BKDWriter.VERSION_SELECTIVE_INDEXING) { @@ -87,14 +216,18 @@ public final class BKDReader extends PointValues implements Accountable { docCount = in.readVInt(); int numBytes = in.readVInt(); - packedIndex = new byte[numBytes]; - in.readBytes(packedIndex, 0, numBytes); + IndexInput slice = in.slice("packedIndex", in.getFilePointer(), numBytes); + if (offHeap) { + packedIndex = new BKDOffHeapInput(slice); + } else { + packedIndex = new BKDOnHeapInput(slice, numBytes); + } this.in = in; } long getMinLeafBlockFP() { - return new ByteArrayDataInput(packedIndex).readVLong(); + return packedIndex.getMinLeafBlockFP(); } /** Used to walk the in-heap index. The format takes advantage of the limited @@ -108,7 +241,7 @@ public final class BKDReader extends PointValues implements Accountable { private int splitDim; private final byte[][] splitPackedValueStack; // used to read the packed byte[] - private final ByteArrayDataInput in; + private final BKDInput in; // holds the minimum (left most) leaf block file pointer for each level we've recursed to: private final long[] leafBlockFPStack; // holds the address, in the packed byte[] index, of the left-node of each level: @@ -139,7 +272,7 @@ public final class BKDReader extends PointValues implements Accountable { splitDims = new int[treeDepth+1]; negativeDeltas = new boolean[numIndexDims*(treeDepth+1)]; - in = new ByteArrayDataInput(packedIndex); + in = packedIndex.clone(); splitValuesStack[0] = new byte[packedIndexBytesLength]; readNodeData(false); scratch = new BytesRef(); @@ -156,7 +289,11 @@ public final class BKDReader extends PointValues implements Accountable { System.arraycopy(negativeDeltas, (level-1)*numIndexDims, negativeDeltas, level*numIndexDims, numIndexDims); assert splitDim != -1; negativeDeltas[level*numIndexDims+splitDim] = true; - in.setPosition(nodePosition); + try { + in.setPosition(nodePosition); + } catch (IOException e) { + throw new UncheckedIOException(e); + } readNodeData(true); } @@ -186,7 +323,11 @@ public final class BKDReader extends PointValues implements Accountable { System.arraycopy(negativeDeltas, (level-1)*numIndexDims, negativeDeltas, level*numIndexDims, numIndexDims); assert splitDim != -1; negativeDeltas[level*numIndexDims+splitDim] = false; - in.setPosition(nodePosition); + try { + in.setPosition(nodePosition); + } catch (IOException e) { + throw new UncheckedIOException(e); + } readNodeData(false); } @@ -271,51 +412,54 @@ public final class BKDReader extends PointValues implements Accountable { } private void readNodeData(boolean isLeft) { + try { + leafBlockFPStack[level] = leafBlockFPStack[level - 1]; - leafBlockFPStack[level] = leafBlockFPStack[level-1]; - - // read leaf block FP delta - if (isLeft == false) { - leafBlockFPStack[level] += in.readVLong(); - } - - if (isLeafNode()) { - splitDim = -1; - } else { - - // read split dim, prefix, firstDiffByteDelta encoded as int: - int code = in.readVInt(); - splitDim = code % numIndexDims; - splitDims[level] = splitDim; - code /= numIndexDims; - int prefix = code % (1+bytesPerDim); - int suffix = bytesPerDim - prefix; - - if (splitValuesStack[level] == null) { - splitValuesStack[level] = new byte[packedIndexBytesLength]; + // read leaf block FP delta + if (isLeft == false) { + leafBlockFPStack[level] += in.readVLong(); } - System.arraycopy(splitValuesStack[level-1], 0, splitValuesStack[level], 0, packedIndexBytesLength); - if (suffix > 0) { - int firstDiffByteDelta = code / (1+bytesPerDim); - if (negativeDeltas[level*numIndexDims + splitDim]) { - firstDiffByteDelta = -firstDiffByteDelta; + + if (isLeafNode()) { + splitDim = -1; + } else { + + // read split dim, prefix, firstDiffByteDelta encoded as int: + int code = in.readVInt(); + splitDim = code % numIndexDims; + splitDims[level] = splitDim; + code /= numIndexDims; + int prefix = code % (1 + bytesPerDim); + int suffix = bytesPerDim - prefix; + + if (splitValuesStack[level] == null) { + splitValuesStack[level] = new byte[packedIndexBytesLength]; + } + System.arraycopy(splitValuesStack[level - 1], 0, splitValuesStack[level], 0, packedIndexBytesLength); + if (suffix > 0) { + int firstDiffByteDelta = code / (1 + bytesPerDim); + if (negativeDeltas[level * numIndexDims + splitDim]) { + firstDiffByteDelta = -firstDiffByteDelta; + } + int oldByte = splitValuesStack[level][splitDim * bytesPerDim + prefix] & 0xFF; + splitValuesStack[level][splitDim * bytesPerDim + prefix] = (byte) (oldByte + firstDiffByteDelta); + in.readBytes(splitValuesStack[level], splitDim * bytesPerDim + prefix + 1, suffix - 1); + } else { + // our split value is == last split value in this dim, which can happen when there are many duplicate values } - int oldByte = splitValuesStack[level][splitDim*bytesPerDim+prefix] & 0xFF; - splitValuesStack[level][splitDim*bytesPerDim+prefix] = (byte) (oldByte + firstDiffByteDelta); - in.readBytes(splitValuesStack[level], splitDim*bytesPerDim+prefix+1, suffix-1); - } else { - // our split value is == last split value in this dim, which can happen when there are many duplicate values - } - int leftNumBytes; - if (nodeID * 2 < leafNodeOffset) { - leftNumBytes = in.readVInt(); - } else { - leftNumBytes = 0; - } + int leftNumBytes; + if (nodeID * 2 < leafNodeOffset) { + leftNumBytes = in.readVInt(); + } else { + leftNumBytes = 0; + } - leftNodePositions[level] = in.getPosition(); - rightNodePositions[level] = leftNodePositions[level] + leftNumBytes; + leftNodePositions[level] = in.getPosition(); + rightNodePositions[level] = leftNodePositions[level] + leftNumBytes; + } + } catch (IOException e) { + throw new UncheckedIOException(e); } } } @@ -738,7 +882,7 @@ public final class BKDReader extends PointValues implements Accountable { @Override public long ramBytesUsed() { - return packedIndex.length; + return packedIndex.ramBytesUsed(); } @Override diff --git a/lucene/core/src/test/org/apache/lucene/util/bkd/TestBKD.java b/lucene/core/src/test/org/apache/lucene/util/bkd/TestBKD.java index 7c537a4758d..1ea2b4607ad 100644 --- a/lucene/core/src/test/org/apache/lucene/util/bkd/TestBKD.java +++ b/lucene/core/src/test/org/apache/lucene/util/bkd/TestBKD.java @@ -46,6 +46,8 @@ import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.NumericUtils; import org.apache.lucene.util.TestUtil; +import static com.carrotsearch.randomizedtesting.RandomizedTest.randomBoolean; + public class TestBKD extends LuceneTestCase { public void testBasicInts1D() throws Exception { @@ -64,7 +66,7 @@ public class TestBKD extends LuceneTestCase { try (IndexInput in = dir.openInput("bkd", IOContext.DEFAULT)) { in.seek(indexFP); - BKDReader r = new BKDReader(in); + BKDReader r = new BKDReader(in, randomBoolean()); // Simple 1D range query: final int queryMin = 42; @@ -166,7 +168,7 @@ public class TestBKD extends LuceneTestCase { try (IndexInput in = dir.openInput("bkd", IOContext.DEFAULT)) { in.seek(indexFP); - BKDReader r = new BKDReader(in); + BKDReader r = new BKDReader(in, randomBoolean()); byte[] minPackedValue = r.getMinPackedValue(); byte[] maxPackedValue = r.getMaxPackedValue(); @@ -294,7 +296,7 @@ public class TestBKD extends LuceneTestCase { try (IndexInput in = dir.openInput("bkd", IOContext.DEFAULT)) { in.seek(indexFP); - BKDReader r = new BKDReader(in); + BKDReader r = new BKDReader(in, randomBoolean()); int iters = atLeast(100); for(int iter=0;iter readers = new ArrayList<>(); for(long fp : toMerge) { in.seek(fp); - readers.add(new BKDReader(in)); + readers.add(new BKDReader(in, randomBoolean())); } out = dir.createOutput("bkd2", IOContext.DEFAULT); indexFP = w.merge(out, docMaps, readers); @@ -800,7 +802,7 @@ public class TestBKD extends LuceneTestCase { } in.seek(indexFP); - BKDReader r = new BKDReader(in); + BKDReader r = new BKDReader(in, randomBoolean()); int iters = atLeast(100); for(int iter=0;iter