LUCENE-8932: Move BKDReader's index off-heap when the input is a ByteBufferIndexInput.

This commit is contained in:
Adrien Grand 2019-10-25 08:58:09 +02:00
parent 3524494025
commit a939c08dba
3 changed files with 206 additions and 57 deletions

View File

@ -93,6 +93,9 @@ Optimizations
* LUCENE-8992: TopFieldCollector and TopScoreDocCollector can now share minimum scores across leaves * LUCENE-8992: TopFieldCollector and TopScoreDocCollector can now share minimum scores across leaves
concurrently. (Adrien Grand, Atri Sharma, Jim Ferenczi) concurrently. (Adrien Grand, Atri Sharma, Jim Ferenczi)
* LUCENE-8932: BKDReader's index is now stored off-heap when the IndexInput is
an instance of ByteBufferIndexInput. (Jack Conradson via Adrien Grand)
Bug Fixes Bug Fixes
* LUCENE-9001: Fix race condition in SetOnce. (Przemko Robakowski) * LUCENE-9001: Fix race condition in SetOnce. (Przemko Robakowski)

View File

@ -17,6 +17,7 @@
package org.apache.lucene.util.bkd; package org.apache.lucene.util.bkd;
import java.io.IOException; import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays; import java.util.Arrays;
import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.codecs.CodecUtil;
@ -24,6 +25,8 @@ import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.PointValues; import org.apache.lucene.index.PointValues;
import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.store.ByteArrayDataInput; import org.apache.lucene.store.ByteArrayDataInput;
import org.apache.lucene.store.ByteBufferIndexInput;
import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexInput;
import org.apache.lucene.util.Accountable; import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRef;
@ -34,6 +37,124 @@ import org.apache.lucene.util.MathUtil;
* @lucene.experimental */ * @lucene.experimental */
public final class BKDReader extends PointValues implements Accountable { public final class BKDReader extends PointValues implements Accountable {
private static abstract class BKDInput extends DataInput implements Cloneable {
abstract long getMinLeafBlockFP();
abstract long ramBytesUsed();
abstract int getPosition();
abstract void setPosition(int pos) throws IOException;
@Override
public BKDInput clone() {
return (BKDInput)super.clone();
}
}
private static class BKDOffHeapInput extends BKDInput implements Cloneable {
private final IndexInput packedIndex;
private final long minLeafBlockFP;
BKDOffHeapInput(IndexInput packedIndex) throws IOException {
this.packedIndex = packedIndex;
this.minLeafBlockFP = packedIndex.clone().readVLong();
}
private BKDOffHeapInput(IndexInput packedIndex, long minLeadBlockFP) {
this.packedIndex = packedIndex;
this.minLeafBlockFP = minLeadBlockFP;
}
@Override
public BKDOffHeapInput clone() {
return new BKDOffHeapInput(packedIndex.clone(), minLeafBlockFP);
}
@Override
long getMinLeafBlockFP() {
return minLeafBlockFP;
}
@Override
long ramBytesUsed() {
return 0;
}
@Override
int getPosition() {
return (int)packedIndex.getFilePointer();
}
@Override
void setPosition(int pos) throws IOException {
packedIndex.seek(pos);
}
@Override
public byte readByte() throws IOException {
return packedIndex.readByte();
}
@Override
public void readBytes(byte[] b, int offset, int len) throws IOException {
packedIndex.readBytes(b, offset, len);
}
}
private static class BKDOnHeapInput extends BKDInput implements Cloneable {
private final ByteArrayDataInput packedIndex;
private final long minLeafBlockFP;
BKDOnHeapInput(IndexInput packedIndex, int numBytes) throws IOException {
byte[] packedBytes = new byte[numBytes];
packedIndex.readBytes(packedBytes, 0, numBytes);
this.packedIndex = new ByteArrayDataInput(packedBytes);
this.minLeafBlockFP = this.packedIndex.clone().readVLong();
}
private BKDOnHeapInput(ByteArrayDataInput packedIndex, long minLeadBlockFP) {
this.packedIndex = packedIndex;
this.minLeafBlockFP = minLeadBlockFP;
}
@Override
public BKDOnHeapInput clone() {
return new BKDOnHeapInput((ByteArrayDataInput)packedIndex.clone(), minLeafBlockFP);
}
@Override
long getMinLeafBlockFP() {
return minLeafBlockFP;
}
@Override
long ramBytesUsed() {
return packedIndex.length();
}
@Override
int getPosition() {
return packedIndex.getPosition();
}
@Override
void setPosition(int pos) {
packedIndex.setPosition(pos);
}
@Override
public byte readByte() throws IOException {
return packedIndex.readByte();
}
@Override
public void readBytes(byte[] b, int offset, int len) throws IOException {
packedIndex.readBytes(b, offset, len);
}
}
// Packed array of byte[] holding all split values in the full binary tree: // Packed array of byte[] holding all split values in the full binary tree:
final int leafNodeOffset; final int leafNodeOffset;
final int numDataDims; final int numDataDims;
@ -50,10 +171,18 @@ public final class BKDReader extends PointValues implements Accountable {
protected final int packedBytesLength; protected final int packedBytesLength;
protected final int packedIndexBytesLength; protected final int packedIndexBytesLength;
final byte[] packedIndex; final BKDInput packedIndex;
/** Caller must pre-seek the provided {@link IndexInput} to the index location that {@link BKDWriter#finish} returned */ /** Caller must pre-seek the provided {@link IndexInput} to the index location that {@link BKDWriter#finish} returned */
public BKDReader(IndexInput in) throws IOException { public BKDReader(IndexInput in) throws IOException {
this(in, in instanceof ByteBufferIndexInput);
}
/**
* Caller must pre-seek the provided {@link IndexInput} to the index location that {@link BKDWriter#finish} returned
* and specify {@code true} to store BKD off-heap ({@code false} otherwise)
*/
public BKDReader(IndexInput in, boolean offHeap) throws IOException {
version = CodecUtil.checkHeader(in, BKDWriter.CODEC_NAME, BKDWriter.VERSION_START, BKDWriter.VERSION_CURRENT); version = CodecUtil.checkHeader(in, BKDWriter.CODEC_NAME, BKDWriter.VERSION_START, BKDWriter.VERSION_CURRENT);
numDataDims = in.readVInt(); numDataDims = in.readVInt();
if (version >= BKDWriter.VERSION_SELECTIVE_INDEXING) { if (version >= BKDWriter.VERSION_SELECTIVE_INDEXING) {
@ -87,14 +216,18 @@ public final class BKDReader extends PointValues implements Accountable {
docCount = in.readVInt(); docCount = in.readVInt();
int numBytes = in.readVInt(); int numBytes = in.readVInt();
packedIndex = new byte[numBytes]; IndexInput slice = in.slice("packedIndex", in.getFilePointer(), numBytes);
in.readBytes(packedIndex, 0, numBytes); if (offHeap) {
packedIndex = new BKDOffHeapInput(slice);
} else {
packedIndex = new BKDOnHeapInput(slice, numBytes);
}
this.in = in; this.in = in;
} }
long getMinLeafBlockFP() { long getMinLeafBlockFP() {
return new ByteArrayDataInput(packedIndex).readVLong(); return packedIndex.getMinLeafBlockFP();
} }
/** Used to walk the in-heap index. The format takes advantage of the limited /** Used to walk the in-heap index. The format takes advantage of the limited
@ -108,7 +241,7 @@ public final class BKDReader extends PointValues implements Accountable {
private int splitDim; private int splitDim;
private final byte[][] splitPackedValueStack; private final byte[][] splitPackedValueStack;
// used to read the packed byte[] // used to read the packed byte[]
private final ByteArrayDataInput in; private final BKDInput in;
// holds the minimum (left most) leaf block file pointer for each level we've recursed to: // holds the minimum (left most) leaf block file pointer for each level we've recursed to:
private final long[] leafBlockFPStack; private final long[] leafBlockFPStack;
// holds the address, in the packed byte[] index, of the left-node of each level: // holds the address, in the packed byte[] index, of the left-node of each level:
@ -139,7 +272,7 @@ public final class BKDReader extends PointValues implements Accountable {
splitDims = new int[treeDepth+1]; splitDims = new int[treeDepth+1];
negativeDeltas = new boolean[numIndexDims*(treeDepth+1)]; negativeDeltas = new boolean[numIndexDims*(treeDepth+1)];
in = new ByteArrayDataInput(packedIndex); in = packedIndex.clone();
splitValuesStack[0] = new byte[packedIndexBytesLength]; splitValuesStack[0] = new byte[packedIndexBytesLength];
readNodeData(false); readNodeData(false);
scratch = new BytesRef(); scratch = new BytesRef();
@ -156,7 +289,11 @@ public final class BKDReader extends PointValues implements Accountable {
System.arraycopy(negativeDeltas, (level-1)*numIndexDims, negativeDeltas, level*numIndexDims, numIndexDims); System.arraycopy(negativeDeltas, (level-1)*numIndexDims, negativeDeltas, level*numIndexDims, numIndexDims);
assert splitDim != -1; assert splitDim != -1;
negativeDeltas[level*numIndexDims+splitDim] = true; negativeDeltas[level*numIndexDims+splitDim] = true;
try {
in.setPosition(nodePosition); in.setPosition(nodePosition);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
readNodeData(true); readNodeData(true);
} }
@ -186,7 +323,11 @@ public final class BKDReader extends PointValues implements Accountable {
System.arraycopy(negativeDeltas, (level-1)*numIndexDims, negativeDeltas, level*numIndexDims, numIndexDims); System.arraycopy(negativeDeltas, (level-1)*numIndexDims, negativeDeltas, level*numIndexDims, numIndexDims);
assert splitDim != -1; assert splitDim != -1;
negativeDeltas[level*numIndexDims+splitDim] = false; negativeDeltas[level*numIndexDims+splitDim] = false;
try {
in.setPosition(nodePosition); in.setPosition(nodePosition);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
readNodeData(false); readNodeData(false);
} }
@ -271,7 +412,7 @@ public final class BKDReader extends PointValues implements Accountable {
} }
private void readNodeData(boolean isLeft) { private void readNodeData(boolean isLeft) {
try {
leafBlockFPStack[level] = leafBlockFPStack[level - 1]; leafBlockFPStack[level] = leafBlockFPStack[level - 1];
// read leaf block FP delta // read leaf block FP delta
@ -317,6 +458,9 @@ public final class BKDReader extends PointValues implements Accountable {
leftNodePositions[level] = in.getPosition(); leftNodePositions[level] = in.getPosition();
rightNodePositions[level] = leftNodePositions[level] + leftNumBytes; rightNodePositions[level] = leftNodePositions[level] + leftNumBytes;
} }
} catch (IOException e) {
throw new UncheckedIOException(e);
}
} }
} }
@ -738,7 +882,7 @@ public final class BKDReader extends PointValues implements Accountable {
@Override @Override
public long ramBytesUsed() { public long ramBytesUsed() {
return packedIndex.length; return packedIndex.ramBytesUsed();
} }
@Override @Override

View File

@ -45,6 +45,8 @@ import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.NumericUtils; import org.apache.lucene.util.NumericUtils;
import org.apache.lucene.util.TestUtil; import org.apache.lucene.util.TestUtil;
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomBoolean;
public class TestBKD extends LuceneTestCase { public class TestBKD extends LuceneTestCase {
public void testBasicInts1D() throws Exception { public void testBasicInts1D() throws Exception {
@ -63,7 +65,7 @@ public class TestBKD extends LuceneTestCase {
try (IndexInput in = dir.openInput("bkd", IOContext.DEFAULT)) { try (IndexInput in = dir.openInput("bkd", IOContext.DEFAULT)) {
in.seek(indexFP); in.seek(indexFP);
BKDReader r = new BKDReader(in); BKDReader r = new BKDReader(in, randomBoolean());
// Simple 1D range query: // Simple 1D range query:
final int queryMin = 42; final int queryMin = 42;
@ -165,7 +167,7 @@ public class TestBKD extends LuceneTestCase {
try (IndexInput in = dir.openInput("bkd", IOContext.DEFAULT)) { try (IndexInput in = dir.openInput("bkd", IOContext.DEFAULT)) {
in.seek(indexFP); in.seek(indexFP);
BKDReader r = new BKDReader(in); BKDReader r = new BKDReader(in, randomBoolean());
byte[] minPackedValue = r.getMinPackedValue(); byte[] minPackedValue = r.getMinPackedValue();
byte[] maxPackedValue = r.getMaxPackedValue(); byte[] maxPackedValue = r.getMaxPackedValue();
@ -293,7 +295,7 @@ public class TestBKD extends LuceneTestCase {
try (IndexInput in = dir.openInput("bkd", IOContext.DEFAULT)) { try (IndexInput in = dir.openInput("bkd", IOContext.DEFAULT)) {
in.seek(indexFP); in.seek(indexFP);
BKDReader r = new BKDReader(in); BKDReader r = new BKDReader(in, randomBoolean());
int iters = atLeast(100); int iters = atLeast(100);
for(int iter=0;iter<iters;iter++) { for(int iter=0;iter<iters;iter++) {
@ -785,7 +787,7 @@ public class TestBKD extends LuceneTestCase {
List<BKDReader> readers = new ArrayList<>(); List<BKDReader> readers = new ArrayList<>();
for(long fp : toMerge) { for(long fp : toMerge) {
in.seek(fp); in.seek(fp);
readers.add(new BKDReader(in)); readers.add(new BKDReader(in, randomBoolean()));
} }
out = dir.createOutput("bkd2", IOContext.DEFAULT); out = dir.createOutput("bkd2", IOContext.DEFAULT);
indexFP = w.merge(out, docMaps, readers); indexFP = w.merge(out, docMaps, readers);
@ -799,7 +801,7 @@ public class TestBKD extends LuceneTestCase {
} }
in.seek(indexFP); in.seek(indexFP);
BKDReader r = new BKDReader(in); BKDReader r = new BKDReader(in, randomBoolean());
int iters = atLeast(100); int iters = atLeast(100);
for(int iter=0;iter<iters;iter++) { for(int iter=0;iter<iters;iter++) {
@ -1073,7 +1075,7 @@ public class TestBKD extends LuceneTestCase {
IndexInput in = dir.openInput("bkd", IOContext.DEFAULT); IndexInput in = dir.openInput("bkd", IOContext.DEFAULT);
in.seek(fp); in.seek(fp);
BKDReader r = new BKDReader(in); BKDReader r = new BKDReader(in, randomBoolean());
r.intersect(new IntersectVisitor() { r.intersect(new IntersectVisitor() {
int lastDocID = -1; int lastDocID = -1;
@ -1187,7 +1189,7 @@ public class TestBKD extends LuceneTestCase {
IndexInput in = dir.openInput("bkd", IOContext.DEFAULT); IndexInput in = dir.openInput("bkd", IOContext.DEFAULT);
in.seek(fp); in.seek(fp);
BKDReader r = new BKDReader(in); BKDReader r = new BKDReader(in, randomBoolean());
int[] count = new int[1]; int[] count = new int[1];
r.intersect(new IntersectVisitor() { r.intersect(new IntersectVisitor() {
@ -1242,7 +1244,7 @@ public class TestBKD extends LuceneTestCase {
IndexInput in = dir.openInput("bkd", IOContext.DEFAULT); IndexInput in = dir.openInput("bkd", IOContext.DEFAULT);
in.seek(fp); in.seek(fp);
BKDReader r = new BKDReader(in); BKDReader r = new BKDReader(in, randomBoolean());
int[] count = new int[1]; int[] count = new int[1];
r.intersect(new IntersectVisitor() { r.intersect(new IntersectVisitor() {