LUCENE-6901: speed up dimensional values indexing and merging

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1716189 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael McCandless 2015-11-24 16:06:36 +00:00
parent ee91bffb99
commit 50a1a78be5
9 changed files with 807 additions and 136 deletions

View File

@ -89,6 +89,11 @@ Optimizations
each leaf block in the default codec, to reduce the index
size (Mike McCandless)
* LUCENE-6901: Optimize dimensional values indexing: use faster
IntroSorter instead of InPlaceMergeSorter, and specialize 1D
merging to merge sort the already sorted segments instead of
re-indexing (Mike McCandless)
Changes in Runtime Behavior
* LUCENE-6789: IndexSearcher's default Similarity is changed to BM25Similarity.

View File

@ -37,11 +37,10 @@ public abstract class DimensionalWriter implements Closeable {
/** Write all values contained in the provided reader */
public abstract void writeField(FieldInfo fieldInfo, DimensionalReader values) throws IOException;
/** Default merge implementation to merge incoming dimensional readers by visiting all their points and
* adding to this writer */
public void merge(MergeState mergeState) throws IOException {
for (FieldInfo fieldInfo : mergeState.mergeFieldInfos) {
if (fieldInfo.getDimensionCount() != 0) {
/** Default naive merge implementation for one field: it just re-indexes all the values
* from the incoming segments. The default codec overrides this for 1D fields and uses
* a faster but more complex implementation. */
protected void mergeOneField(MergeState mergeState, FieldInfo fieldInfo) throws IOException {
writeField(fieldInfo,
new DimensionalReader() {
@Override
@ -98,6 +97,14 @@ public abstract class DimensionalWriter implements Closeable {
}
});
}
/**
 * Default merge implementation: walks the merged field infos and hands every field whose
 * dimension count is non-zero to {@link #mergeOneField}, one field at a time.
 *
 * @param mergeState state of the merge in progress; its {@code mergeFieldInfos} are iterated
 * @throws IOException if merging any single field fails
 */
public void merge(MergeState mergeState) throws IOException {
  for (FieldInfo mergedField : mergeState.mergeFieldInfos) {
    // Fields reporting a dimension count of 0 are skipped entirely
    boolean hasDimensions = mergedField.getDimensionCount() != 0;
    if (!hasDimensions) {
      continue;
    }
    mergeOneField(mergeState, mergedField);
  }
}
}

View File

@ -71,7 +71,9 @@ public class Lucene60DimensionalReader extends DimensionalReader implements Clos
int fieldNumber = indexIn.readVInt();
long fp = indexIn.readVLong();
dataIn.seek(fp);
readers.put(fieldNumber, new BKDReader(dataIn));
BKDReader reader = new BKDReader(dataIn);
readers.put(fieldNumber, reader);
//reader.verify(readState.segmentInfo.maxDoc());
}
CodecUtil.checkFooter(indexIn);
success = true;

View File

@ -19,7 +19,9 @@ package org.apache.lucene.codecs.lucene60;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.lucene.codecs.CodecUtil;
@ -28,10 +30,13 @@ import org.apache.lucene.codecs.DimensionalWriter;
import org.apache.lucene.index.DimensionalValues.IntersectVisitor;
import org.apache.lucene.index.DimensionalValues.Relation;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.FieldInfos;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.MergeState;
import org.apache.lucene.index.SegmentWriteState;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.bkd.BKDReader;
import org.apache.lucene.util.bkd.BKDWriter;
/** Writes dimensional values */
@ -104,6 +109,61 @@ public class Lucene60DimensionalWriter extends DimensionalWriter implements Clos
}
}
/**
 * Merges dimensional values from the incoming segments.  When every incoming reader is a
 * {@link Lucene60DimensionalReader}, 1D fields are bulk merged through {@link BKDWriter#merge},
 * and fields with more dimensions go through {@code mergeOneField}.  Otherwise the whole merge
 * is delegated to the superclass implementation.
 *
 * @param mergeState state of the merge in progress
 * @throws IOException if reading the incoming segments or writing the merged data fails
 */
@Override
public void merge(MergeState mergeState) throws IOException {
  // We can only bulk merge when all to-be-merged segments use our format:
  for (DimensionalReader incoming : mergeState.dimensionalReaders) {
    if (!(incoming instanceof Lucene60DimensionalReader)) {
      super.merge(mergeState);
      return;
    }
  }

  for (FieldInfo fieldInfo : mergeState.mergeFieldInfos) {
    int dimensionCount = fieldInfo.getDimensionCount();
    if (dimensionCount == 0) {
      // No dimensional values for this field
      continue;
    }
    if (dimensionCount != 1) {
      mergeOneField(mergeState, fieldInfo);
      continue;
    }

    // Optimize the 1D case to use BKDWriter.merge, which does a single merge sort of the
    // already sorted incoming segments, instead of trying to sort all points again as if
    // we were simply reindexing them:
    try (BKDWriter writer = new BKDWriter(writeState.directory,
                                          writeState.segmentInfo.name,
                                          dimensionCount,
                                          fieldInfo.getDimensionNumBytes(),
                                          maxPointsInLeafNode,
                                          maxMBSortInHeap)) {
      // Three parallel lists: entry k of each describes the same contributing segment
      List<BKDReader> segmentTrees = new ArrayList<>();
      List<MergeState.DocMap> segmentDocMaps = new ArrayList<>();
      List<Integer> segmentDocBases = new ArrayList<>();

      for (int seg = 0; seg < mergeState.dimensionalReaders.length; seg++) {
        Lucene60DimensionalReader reader60 = (Lucene60DimensionalReader) mergeState.dimensionalReaders[seg];
        // NOTE(review): a null entry fails the instanceof test at the top of this method, so
        // this branch looks unreachable here -- confirm before relying on it.
        if (reader60 == null) {
          continue;
        }

        // TODO: I could just use the merged fieldInfo.number instead of resolving to this
        // reader's FieldInfo, right?  Field numbers are always consistent across segments,
        // since when?
        FieldInfo readerFieldInfo = mergeState.fieldInfos[seg].fieldInfo(fieldInfo.name);
        if (readerFieldInfo == null) {
          // This segment does not know the field at all
          continue;
        }

        BKDReader tree = reader60.readers.get(readerFieldInfo.number);
        if (tree == null) {
          // This segment has no tree registered under the field's number
          continue;
        }

        segmentTrees.add(tree);
        segmentDocMaps.add(mergeState.docMaps[seg]);
        segmentDocBases.add(mergeState.docBase[seg]);
      }

      long indexFP = writer.merge(dataOut, segmentDocMaps, segmentTrees, segmentDocBases);
      indexFPs.put(fieldInfo.name, indexFP);
    }
  }
}
@Override
public void close() throws IOException {

View File

@ -415,6 +415,8 @@ class DocumentsWriterPerThread {
return null;
}
long t0 = System.nanoTime();
if (infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", "flush postings as segment " + flushState.segmentInfo.name + " numDocs=" + numDocsInRAM);
}
@ -458,6 +460,9 @@ class DocumentsWriterPerThread {
FlushedSegment fs = new FlushedSegment(segmentInfoPerCommit, flushState.fieldInfos,
segmentDeletes, flushState.liveDocs, flushState.delCountOnFlush);
sealFlushedSegment(fs);
if (infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", "flush time " + ((System.nanoTime() - t0)/1000000.0) + " msec");
}
return fs;
} catch (Throwable th) {

View File

@ -25,7 +25,9 @@ import org.apache.lucene.index.DimensionalValues.IntersectVisitor;
import org.apache.lucene.index.DimensionalValues.Relation;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.RamUsageEstimator;
import org.apache.lucene.util.StringHelper;
/** Handles intersection of a multi-dimensional shape in byte[] space with a block KD-tree previously written with {@link BKDWriter}.
*
@ -34,7 +36,7 @@ import org.apache.lucene.util.RamUsageEstimator;
public class BKDReader implements Accountable {
// Packed array of byte[] holding all split values in the full binary tree:
final private byte[] splitPackedValues;
final private long[] leafBlockFPs;
final long[] leafBlockFPs;
final private int leafNodeOffset;
final int numDims;
final int bytesPerDim;
@ -55,10 +57,12 @@ public class BKDReader implements Accountable {
leafNodeOffset = numLeaves;
splitPackedValues = new byte[(1+bytesPerDim)*numLeaves];
// TODO: don't write split packed values[0]!
in.readBytes(splitPackedValues, 0, splitPackedValues.length);
// Tree is fully balanced binary tree, so number of nodes = numLeaves-1, except our nodeIDs are 1-based (splitPackedValues[0] is unused):
leafBlockFPs = new long[numLeaves];
// Read the file pointers to the start of each leaf block:
long[] leafBlockFPs = new long[numLeaves];
long lastFP = 0;
for(int i=0;i<numLeaves;i++) {
long delta = in.readVLong();
@ -66,6 +70,47 @@ public class BKDReader implements Accountable {
lastFP += delta;
}
// Possibly rotate the leaf block FPs, if the index is not a fully balanced binary tree (this only
// happens if it was created by BKDWriter.merge). In this case the leaf nodes may straddle the two
// bottom levels of the binary tree:
if (numDims == 1 && numLeaves > 1) {
//System.out.println("BKDR: numLeaves=" + numLeaves);
int levelCount = 2;
while (true) {
//System.out.println(" cycle levelCount=" + levelCount);
if (numLeaves >= levelCount && numLeaves <= 2*levelCount) {
int lastLevel = 2*(numLeaves - levelCount);
assert lastLevel >= 0;
/*
System.out.println("BKDR: lastLevel=" + lastLevel + " vs " + levelCount);
System.out.println("FPs before:");
for(int i=0;i<leafBlockFPs.length;i++) {
System.out.println(" " + i + " " + leafBlockFPs[i]);
}
*/
if (lastLevel != 0) {
// Last level is partially filled, so we must rotate the leaf FPs to match. We do this here, after loading
// at read-time, so that we can still delta code them on disk at write:
//System.out.println("BKDR: now rotate index");
long[] newLeafBlockFPs = new long[numLeaves];
System.arraycopy(leafBlockFPs, lastLevel, newLeafBlockFPs, 0, leafBlockFPs.length - lastLevel);
System.arraycopy(leafBlockFPs, 0, newLeafBlockFPs, leafBlockFPs.length - lastLevel, lastLevel);
leafBlockFPs = newLeafBlockFPs;
}
/*
System.out.println("FPs:");
for(int i=0;i<leafBlockFPs.length;i++) {
System.out.println(" " + i + " " + leafBlockFPs[i]);
}
*/
break;
}
levelCount *= 2;
}
}
this.leafBlockFPs = leafBlockFPs;
this.in = in;
}
@ -81,7 +126,120 @@ public class BKDReader implements Accountable {
this.splitPackedValues = splitPackedValues;
}
private static final class IntersectState {
/** Visitor that sanity-checks every visited point: its docID must be within {@code 0..maxDoc},
 *  each dimension of its value must lie inside the current cell's min/max, and when there is
 *  only one dimension the values must arrive in non-decreasing order.  Any violation is
 *  reported with a {@link RuntimeException}. */
private static class VerifyVisitor implements IntersectVisitor {
  /** Lower bound of the cell being checked; must be assigned before values are visited */
  byte[] cellMinPacked;
  /** Upper bound of the cell being checked; must be assigned before values are visited */
  byte[] cellMaxPacked;
  /** Previously visited value; only maintained when numDims == 1 */
  byte[] lastPackedValue;
  final int numDims;
  final int bytesPerDim;
  final int maxDoc;

  public VerifyVisitor(int numDims, int bytesPerDim, int maxDoc) {
    this.numDims = numDims;
    this.bytesPerDim = bytesPerDim;
    this.maxDoc = maxDoc;
  }

  /** Not supported: verification always needs the value alongside the docID. */
  @Override
  public void visit(int docID) {
    throw new UnsupportedOperationException();
  }

  @Override
  public void visit(int docID, byte[] packedValue) {
    boolean docInRange = docID >= 0 && docID < maxDoc;
    if (!docInRange) {
      throw new RuntimeException("docID=" + docID + " is out of bounds of 0.." + maxDoc);
    }

    for (int dim = 0; dim < numDims; dim++) {
      // Byte offset of this dimension inside every packed value
      int offset = dim * bytesPerDim;
      int minVsValue = StringHelper.compare(bytesPerDim, cellMinPacked, offset, packedValue, offset);
      if (minVsValue > 0) {
        throw new RuntimeException("value=" + new BytesRef(packedValue, offset, bytesPerDim) + " for docID=" + docID + " dim=" + dim + " is less than this leaf block's minimum=" + new BytesRef(cellMinPacked, offset, bytesPerDim));
      }
      int maxVsValue = StringHelper.compare(bytesPerDim, cellMaxPacked, offset, packedValue, offset);
      if (maxVsValue < 0) {
        throw new RuntimeException("value=" + new BytesRef(packedValue, offset, bytesPerDim) + " for docID=" + docID + " dim=" + dim + " is greater than this leaf block's maximum=" + new BytesRef(cellMaxPacked, offset, bytesPerDim));
      }
    }

    if (numDims != 1) {
      return;
    }

    // With only 1D, all values should always be in sorted order
    if (lastPackedValue == null) {
      // First value seen: just remember it
      lastPackedValue = Arrays.copyOf(packedValue, packedValue.length);
      return;
    }
    if (BKDUtil.compare(bytesPerDim, lastPackedValue, 0, packedValue, 0) > 0) {
      throw new RuntimeException("value=" + new BytesRef(packedValue) + " for docID=" + docID + " dim=0" + " sorts before last value=" + new BytesRef(lastPackedValue));
    }
    System.arraycopy(packedValue, 0, lastPackedValue, 0, bytesPerDim);
  }

  /** Not supported: verification visits every leaf itself and never asks for cell relations. */
  @Override
  public Relation compare(byte[] minPackedValue, byte[] maxPackedValue) {
    throw new UnsupportedOperationException();
  }
}
/** Only used for debugging: visits every doc in every leaf block and confirms that its value
 *  falls within the range the index claims for that block.
 *
 *  @param maxDoc exclusive upper bound that every visited docID is checked against
 *  @throws IOException if reading the leaf blocks fails */
// TODO: maybe we can get this into CheckIndex?
public void verify(int maxDoc) throws IOException {
  // The root cell covers everything: all-zero bytes as minimum, all-0xff bytes as maximum
  byte[] wholeSpaceMin = new byte[packedBytesLength];
  byte[] wholeSpaceMax = new byte[packedBytesLength];
  Arrays.fill(wholeSpaceMax, (byte) 0xff);

  VerifyVisitor checker = new VerifyVisitor(numDims, bytesPerDim, maxDoc);

  // Read through a clone of the input rather than the shared instance
  IntersectState state = new IntersectState(in.clone(),
                                            numDims,
                                            packedBytesLength,
                                            maxPointsInLeafNode,
                                            checker);

  // Node 1 is the root of the tree
  verify(state, 1, wholeSpaceMin, wholeSpaceMax);
}
/** Recursive worker for the debugging verification: descends the tree from {@code nodeID}, narrowing
 *  the cell bounds at each inner node, and at each leaf visits every value with the state's
 *  {@link VerifyVisitor} so it can be checked against that leaf's cell.
 *
 *  @param state per-traversal scratch state; its visitor must be a {@link VerifyVisitor}
 *  @param nodeID node to verify; IDs at or above {@code leafNodeOffset} are leaves
 *  @param cellMinPacked minimum packed value of the cell covered by this node
 *  @param cellMaxPacked maximum packed value of the cell covered by this node
 *  @throws IOException if reading a leaf block fails */
private void verify(IntersectState state, int nodeID, byte[] cellMinPacked, byte[] cellMaxPacked) throws IOException {
if (nodeID >= leafNodeOffset) {
// Leaf IDs are numbered from 0 once the leaf offset is subtracted:
int leafID = nodeID - leafNodeOffset;
// In the unbalanced case it's possible the left most node only has one child:
if (leafID < leafBlockFPs.length) {
//System.out.println("CHECK nodeID=" + nodeID + " leaf=" + (nodeID-leafNodeOffset) + " offset=" + leafNodeOffset + " fp=" + leafBlockFPs[leafID]);
//System.out.println("BKDR.verify leafID=" + leafID + " nodeID=" + nodeID + " fp=" + leafBlockFPs[leafID] + " min=" + new BytesRef(cellMinPacked) + " max=" + new BytesRef(cellMaxPacked));
// Leaf node: check that all values are in fact in bounds:
VerifyVisitor visitor = (VerifyVisitor) state.visitor;
// Tell the visitor which cell the values about to be visited must fall into:
visitor.cellMinPacked = cellMinPacked;
visitor.cellMaxPacked = cellMaxPacked;
// Load this block's docIDs first, then visit each docID together with its value:
int count = readDocIDs(state.in, leafBlockFPs[leafID], state.scratchDocIDs);
visitDocValues(state.commonPrefixLengths, state.scratchPackedValue, state.in, state.scratchDocIDs, count, state.visitor);
} else {
//System.out.println("BKDR.verify skip leafID=" + leafID);
}
} else {
// Non-leaf node:
// Each node's entry in splitPackedValues is 1 byte of split dimension followed by
// bytesPerDim bytes of split value:
int address = nodeID * (bytesPerDim+1);
int splitDim = splitPackedValues[address] & 0xff;
assert splitDim < numDims;
// One scratch array serves both children; it is overwritten for the right child, so the
// left recursion must fully complete first:
byte[] splitPackedValue = new byte[packedBytesLength];
// Recurse on left sub-tree:
// (its maximum is the parent's maximum with the split dimension replaced by the split value)
System.arraycopy(cellMaxPacked, 0, splitPackedValue, 0, packedBytesLength);
System.arraycopy(splitPackedValues, address+1, splitPackedValue, splitDim*bytesPerDim, bytesPerDim);
verify(state,
2*nodeID,
cellMinPacked, splitPackedValue);
// Recurse on right sub-tree:
// (its minimum is the parent's minimum with the split dimension replaced by the split value)
System.arraycopy(cellMinPacked, 0, splitPackedValue, 0, packedBytesLength);
System.arraycopy(splitPackedValues, address+1, splitPackedValue, splitDim*bytesPerDim, bytesPerDim);
verify(state,
2*nodeID+1,
splitPackedValue, cellMaxPacked);
}
}
static final class IntersectState {
final IndexInput in;
final int[] scratchDocIDs;
final byte[] scratchPackedValue;
@ -119,6 +277,7 @@ public class BKDReader implements Accountable {
if (nodeID >= leafNodeOffset) {
//System.out.println("ADDALL");
visitDocIDs(state.in, leafBlockFPs[nodeID-leafNodeOffset], state.visitor);
// TODO: we can assert that the first value here in fact matches what the index claimed?
} else {
addAll(state, 2*nodeID);
addAll(state, 2*nodeID+1);
@ -196,38 +355,43 @@ public class BKDReader implements Accountable {
}
if (nodeID >= leafNodeOffset) {
//System.out.println("FILTER");
// TODO: we can assert that the first value here in fact matches what the index claimed?
int leafID = nodeID - leafNodeOffset;
// In the unbalanced case it's possible the left most node only has one child:
if (leafID < leafBlockFPs.length) {
// Leaf node; scan and filter all points in this block:
int count = readDocIDs(state.in, leafBlockFPs[nodeID-leafNodeOffset], state.scratchDocIDs);
int count = readDocIDs(state.in, leafBlockFPs[leafID], state.scratchDocIDs);
// Again, this time reading values and checking with the visitor
visitDocValues(state.commonPrefixLengths, state.scratchPackedValue, state.in, state.scratchDocIDs, count, state.visitor);
}
} else {
// Non-leaf node: recurse on the split left and right nodes
// TODO: save the unused 1 byte prefix (it's always 0) in the 1d case here:
int address = nodeID * (bytesPerDim+1);
int splitDim = splitPackedValues[address] & 0xff;
assert splitDim < numDims;
// TODO: can we alloc & reuse this up front?
byte[] splitValue = new byte[bytesPerDim];
System.arraycopy(splitPackedValues, address+1, splitValue, 0, bytesPerDim);
// TODO: can we alloc & reuse this up front?
byte[] splitPackedValue = new byte[packedBytesLength];
// Recurse on left sub-tree:
System.arraycopy(cellMaxPacked, 0, splitPackedValue, 0, packedBytesLength);
System.arraycopy(splitValue, 0, splitPackedValue, splitDim*bytesPerDim, bytesPerDim);
System.arraycopy(splitPackedValues, address+1, splitPackedValue, splitDim*bytesPerDim, bytesPerDim);
intersect(state,
2*nodeID,
cellMinPacked, splitPackedValue);
// Recurse on right sub-tree:
System.arraycopy(cellMinPacked, 0, splitPackedValue, 0, packedBytesLength);
System.arraycopy(splitValue, 0, splitPackedValue, splitDim*bytesPerDim, bytesPerDim);
System.arraycopy(splitPackedValues, address+1, splitPackedValue, splitDim*bytesPerDim, bytesPerDim);
intersect(state,
2*nodeID+1,
splitPackedValue, cellMaxPacked);

View File

@ -69,8 +69,12 @@ public final class BKDUtil {
/** Returns positive int if a &gt; b, negative int if a &lt; b and 0 if a == b */
public static int compare(int bytesPerDim, byte[] a, int aIndex, byte[] b, int bIndex) {
assert aIndex >= 0;
assert bIndex >= 0;
int aOffset = aIndex*bytesPerDim;
int bOffset = bIndex*bytesPerDim;
for(int i=0;i<bytesPerDim;i++) {
int cmp = (a[aIndex*bytesPerDim+i]&0xff) - (b[bIndex*bytesPerDim+i]&0xff);
int cmp = (a[aOffset+i]&0xff) - (b[bOffset+i]&0xff);
if (cmp != 0) {
return cmp;
}

View File

@ -20,10 +20,13 @@ package org.apache.lucene.util.bkd;
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.MergeState;
import org.apache.lucene.store.ByteArrayDataInput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexInput;
@ -33,11 +36,13 @@ import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.InPlaceMergeSorter;
import org.apache.lucene.util.IntroSorter;
import org.apache.lucene.util.LongBitSet;
import org.apache.lucene.util.OfflineSorter;
import org.apache.lucene.util.OfflineSorter.ByteSequencesWriter;
import org.apache.lucene.util.OfflineSorter;
import org.apache.lucene.util.PriorityQueue;
import org.apache.lucene.util.RamUsageEstimator;
import org.apache.lucene.util.StringHelper;
// TODO
// - the compression is somewhat stupid now (delta vInt for 1024 docIDs, no compression for the byte[] values even though they have high locality)
@ -210,16 +215,339 @@ public class BKDWriter implements Closeable {
pointCount++;
}
/** Cursor over all points of one incoming {@link BKDReader}, read leaf block after leaf block.
 *  After each successful {@link #next}, the current (mapped) doc ID is in {@link #docID} and the
 *  current value is in {@code state.scratchPackedValue}. */
private static class MergeReader {
final BKDReader bkd;
final BKDReader.IntersectState state;
// May be null, in which case doc IDs are passed through unmapped:
final MergeState.DocMap docMap;
/** Base offset for all our docIDs */
final int docIDBase;
/** Current doc ID */
public int docID;
/** Which doc in this block we are up to */
private int docBlockUpto;
/** How many docs in the current block */
private int docsInBlock;
/** Which leaf block we are up to */
private int blockID;
/** Creates the cursor over a clone of the reader's input, positioned at the smallest leaf block
 *  file pointer. */
public MergeReader(BKDReader bkd, MergeState.DocMap docMap, int docIDBase) throws IOException {
this.bkd = bkd;
// No visitor is needed (null): only the scratch buffers and the cloned input are used here
state = new BKDReader.IntersectState(bkd.in.clone(),
bkd.numDims,
bkd.packedBytesLength,
bkd.maxPointsInLeafNode,
null);
this.docMap = docMap;
this.docIDBase = docIDBase;
// Scan for the minimum rather than taking leafBlockFPs[0]
// NOTE(review): presumably because the reader may have rotated this array, so the first
// entry need not be the first block on disk -- confirm
long minFP = Long.MAX_VALUE;
//System.out.println("MR.init " + this + " bkdreader=" + bkd + " leafBlockFPs.length=" + bkd.leafBlockFPs.length);
for(long fp : bkd.leafBlockFPs) {
minFP = Math.min(minFP, fp);
//System.out.println(" leaf fp=" + fp);
}
state.in.seek(minFP);
}
/** Advances to the next point whose doc ID does not map to -1.
 *
 *  @return true if positioned on a point; false once every leaf block has been consumed */
public boolean next() throws IOException {
//System.out.println("MR.next this=" + this);
while (true) {
if (docBlockUpto == docsInBlock) {
// Current block is used up (or nothing has been read yet): move to the next block, if any
if (blockID == bkd.leafBlockFPs.length) {
//System.out.println(" done!");
return false;
}
//System.out.println(" new block @ fp=" + state.in.getFilePointer());
// The next block is read from wherever the input currently is
// NOTE(review): this assumes leaf blocks are laid out back to back on disk -- confirm
docsInBlock = bkd.readDocIDs(state.in, state.in.getFilePointer(), state.scratchDocIDs);
docBlockUpto = 0;
// Read each dimension's shared prefix once per block, directly into the scratch value:
for(int dim=0;dim<bkd.numDims;dim++) {
int prefix = state.in.readVInt();
state.commonPrefixLengths[dim] = prefix;
if (prefix > 0) {
state.in.readBytes(state.scratchPackedValue, dim*bkd.bytesPerDim, prefix);
}
}
blockID++;
}
int oldDocID = state.scratchDocIDs[docBlockUpto++];
int mappedDocID;
if (docMap == null) {
mappedDocID = oldDocID;
} else {
mappedDocID = docMap.get(oldDocID);
}
// The value's suffix bytes are consumed before the deleted check below, so the input stays
// positioned on the next value even when this point is skipped:
for(int dim=0;dim<bkd.numDims;dim++) {
int prefix = state.commonPrefixLengths[dim];
state.in.readBytes(state.scratchPackedValue, dim*bkd.bytesPerDim + prefix, bkd.bytesPerDim - prefix);
}
if (mappedDocID != -1) {
// Not deleted!
docID = mappedDocID;
return true;
}
}
}
}
/** Priority queue of {@link MergeReader}s ordered by each reader's current value (the first
 *  {@code bytesPerDim} bytes of its scratch packed value), with ties broken by the reader's
 *  doc ID base. */
private static class BKDMergeQueue extends PriorityQueue<MergeReader> {
  private final int bytesPerDim;

  public BKDMergeQueue(int bytesPerDim, int maxSize) {
    super(maxSize);
    this.bytesPerDim = bytesPerDim;
  }

  @Override
  public boolean lessThan(MergeReader a, MergeReader b) {
    assert a != b;

    byte[] valueA = a.state.scratchPackedValue;
    byte[] valueB = b.state.scratchPackedValue;
    int cmp = StringHelper.compare(bytesPerDim, valueA, 0, valueB, 0);
    if (cmp != 0) {
      // The values differ, so they alone decide the order
      return cmp < 0;
    }

    // Tie break by sorting smaller docIDs earlier:
    return a.docIDBase < b.docIDBase;
  }
}
/** More efficient bulk-add for incoming {@link BKDReader}s.  This does a merge sort of the already
 *  sorted values and currently only works when numDims==1.
 *
 *  @param out output that the leaf blocks, and then the index, are written to
 *  @param docMaps per-reader doc ID maps, parallel to {@code readers}; may be null, in which case
 *         doc IDs are not remapped
 *  @param readers the incoming trees to merge
 *  @param docIDBases per-reader offset added to every (mapped) doc ID, parallel to {@code readers}
 *  @return file pointer in {@code out} at which the index was written
 *  @throws UnsupportedOperationException if this writer's numDims is not 1
 *  @throws IllegalStateException if points were already added to this writer, or it was already finished
 *  @throws IOException if reading the incoming trees or writing to {@code out} fails */
public long merge(IndexOutput out, List<MergeState.DocMap> docMaps, List<BKDReader> readers, List<Integer> docIDBases) throws IOException {
  if (numDims != 1) {
    throw new UnsupportedOperationException("numDims must be 1 but got " + numDims);
  }
  if (pointCount != 0) {
    throw new IllegalStateException("cannot mix add and merge");
  }

  // Catch user silliness:
  if (heapPointWriter == null && tempInput == null) {
    throw new IllegalStateException("already finished");
  }

  // Mark that we already finished:
  heapPointWriter = null;

  assert docMaps == null || readers.size() == docMaps.size();

  // Seed the queue with every reader that has at least one live point:
  BKDMergeQueue queue = new BKDMergeQueue(bytesPerDim, readers.size());

  for(int i=0;i<readers.size();i++) {
    BKDReader bkd = readers.get(i);
    MergeState.DocMap docMap;
    if (docMaps == null) {
      docMap = null;
    } else {
      docMap = docMaps.get(i);
    }
    MergeReader reader = new MergeReader(bkd, docMap, docIDBases.get(i));
    if (reader.next()) {
      queue.add(reader);
    }
  }

  // Number of points buffered so far for the leaf block being built:
  int leafCount = 0;
  List<Long> leafBlockFPs = new ArrayList<>();
  List<byte[]> leafBlockStartValues = new ArrayList<>();

  // Target 75% of the maximum leaf size.  Clamp to at least 1: the cast truncates, so with
  // maxPointsInLeafNode == 1 the target would otherwise be 0, the flush test below would never
  // match, and the per-leaf buffers (sized maxPointsInLeafNode) would overflow on the second point.
  int pointsPerLeafBlock = Math.max(1, (int) (0.75 * maxPointsInLeafNode));

  byte[] lastPackedValue = new byte[bytesPerDim];
  byte[] firstPackedValue = new byte[bytesPerDim];
  long valueCount = 0;

  // Buffer up each leaf block's docs and values
  int[] leafBlockDocIDs = new int[maxPointsInLeafNode];
  byte[][] leafBlockPackedValues = new byte[maxPointsInLeafNode][];
  for(int i=0;i<maxPointsInLeafNode;i++) {
    leafBlockPackedValues[i] = new byte[packedBytesLength];
  }
  Arrays.fill(commonPrefixLengths, bytesPerDim);

  while (queue.size() != 0) {
    // The queue's top is the reader holding the smallest current value:
    MergeReader reader = queue.top();

    // NOTE: doesn't work with subclasses (e.g. SimpleText!)
    leafBlockDocIDs[leafCount] = reader.docIDBase + reader.docID;
    System.arraycopy(reader.state.scratchPackedValue, 0, leafBlockPackedValues[leafCount], 0, packedBytesLength);

    assert numDims > 1 || valueInOrder(valueCount++, lastPackedValue, reader.state.scratchPackedValue);

    if (leafCount == 0) {
      if (leafBlockFPs.size() > 0) {
        // Save the first (minimum) value in each leaf block except the first, to build the split value index in the end:
        leafBlockStartValues.add(Arrays.copyOf(reader.state.scratchPackedValue, bytesPerDim));
      }
      // First point of a new block: restart prefix tracking from "everything is shared"
      Arrays.fill(commonPrefixLengths, bytesPerDim);
      System.arraycopy(reader.state.scratchPackedValue, 0, firstPackedValue, 0, bytesPerDim);
    } else {
      // Find per-dim common prefix:
      for(int dim=0;dim<numDims;dim++) {
        int offset = dim * bytesPerDim;
        for(int j=0;j<commonPrefixLengths[dim];j++) {
          if (firstPackedValue[offset+j] != reader.state.scratchPackedValue[offset+j]) {
            commonPrefixLengths[dim] = j;
            break;
          }
        }
      }
    }

    leafCount++;

    if (reader.next()) {
      // Reader moved to its next point; restore heap order
      queue.updateTop();
    } else {
      // This segment was exhausted
      queue.pop();
    }

    // We write a block once we hit exactly the target count (or run out of points) ... this is
    // different from when we flush a new segment, where we write between max/2 and max per leaf
    // block, so merged segments will behave differently from newly flushed segments:
    if (leafCount == pointsPerLeafBlock || queue.size() == 0) {
      leafBlockFPs.add(out.getFilePointer());
      checkMaxLeafNodeCount(leafBlockFPs.size());

      writeLeafBlockDocs(out, leafBlockDocIDs, 0, leafCount);
      writeCommonPrefixes(out, commonPrefixLengths, firstPackedValue);

      // Write the full values:
      for (int i=0;i<leafCount;i++) {
        writeLeafBlockPackedValue(out, commonPrefixLengths, leafBlockPackedValues[i]);
      }

      leafCount = 0;
    }
  }

  // All leaf blocks are written; the index starts here:
  long indexFP = out.getFilePointer();

  // One start value was saved per leaf block except the first:
  int numInnerNodes = leafBlockStartValues.size();

  // Slot 0 is left unused because rotateToTree numbers nodes starting at 1:
  byte[] index = new byte[(1+numInnerNodes) * (1+bytesPerDim)];
  rotateToTree(1, 0, numInnerNodes, index, leafBlockStartValues);

  // Unbox the leaf file pointers for writeIndex:
  long[] arr = new long[leafBlockFPs.size()];
  for(int i=0;i<leafBlockFPs.size();i++) {
    arr[i] = leafBlockFPs.get(i);
  }

  writeIndex(out, arr, index);
  return indexFP;
}
// TODO: there must be a simpler way?
/** Recursively copies {@code count} consecutive entries of {@code leafBlockStartValues}, starting
 *  at {@code offset}, into {@code index} laid out as a binary tree: the value chosen as this
 *  sub-tree's root goes into slot {@code nodeID}, and the entries before and after it go to the
 *  children {@code 2*nodeID} and {@code 2*nodeID+1}.
 *
 *  @param nodeID 1-based slot in {@code index} that receives this sub-tree's root
 *  @param offset index into {@code leafBlockStartValues} of the first entry of this sub-tree
 *  @param count how many entries belong to this sub-tree; 0 is a no-op
 *  @param index destination array; each slot is {@code 1+bytesPerDim} bytes, and only the
 *         trailing {@code bytesPerDim} bytes of a slot are written here
 *  @param leafBlockStartValues the values to place, in order */
private void rotateToTree(int nodeID, int offset, int count, byte[] index, List<byte[]> leafBlockStartValues) {
//System.out.println("ROTATE: nodeID=" + nodeID + " offset=" + offset + " count=" + count + " bpd=" + bytesPerDim + " index.length=" + index.length);
if (count == 1) {
// Leaf index node
//System.out.println(" leaf index node");
//System.out.println(" index[" + nodeID + "] = blockStartValues[" + offset + "]");
// The +1 skips the first byte of the slot, which this method leaves untouched:
System.arraycopy(leafBlockStartValues.get(offset), 0, index, nodeID*(1+bytesPerDim)+1, bytesPerDim);
} else if (count > 1) {
// Internal index node: binary partition of count
// Walk down the levels of a binary tree holding count nodes: countAtLevel is the capacity of
// the current level (1, 2, 4, ...) and totalCount is the number of nodes in the levels above it
int countAtLevel = 1;
int totalCount = 0;
while (true) {
int countLeft = count - totalCount;
//System.out.println(" cycle countLeft=" + countLeft + " coutAtLevel=" + countAtLevel);
if (countLeft <= countAtLevel) {
// This is the last level, possibly partially filled:
// The left sub-tree owns the first half of this level's slots, so it takes at most
// countAtLevel/2 of the remaining nodes:
int lastLeftCount = Math.min(countAtLevel/2, countLeft);
assert lastLeftCount >= 0;
// Size of the left sub-tree: half of the full levels above (excluding the root itself)
// plus its share of the last level:
int leftHalf = (totalCount-1)/2 + lastLeftCount;
// The root is the entry that follows everything in the left sub-tree:
int rootOffset = offset + leftHalf;
/*
System.out.println(" last left count " + lastLeftCount);
System.out.println(" leftHalf " + leftHalf + " rightHalf=" + (count-leftHalf-1));
System.out.println(" rootOffset=" + rootOffset);
*/
System.arraycopy(leafBlockStartValues.get(rootOffset), 0, index, nodeID*(1+bytesPerDim)+1, bytesPerDim);
//System.out.println(" index[" + nodeID + "] = blockStartValues[" + rootOffset + "]");
// TODO: we could optimize/specialize, when we know it's simply fully balanced binary tree
// under here, to save this while loop on each recursion
// Recurse left
rotateToTree(2*nodeID, offset, leftHalf, index, leafBlockStartValues);
// Recurse right
rotateToTree(2*nodeID+1, rootOffset+1, count-leftHalf-1, index, leafBlockStartValues);
return;
}
totalCount += countAtLevel;
countAtLevel *= 2;
}
} else {
assert count == 0;
}
}
// TODO: if we fixed each partition step to just record the file offset at the "split point", we could probably handle variable length
// encoding and not have our own ByteSequencesReader/Writer
/** If dim=-1 we sort by docID, else by that dim. */
/** Sort the heap writer by the specified dim */
private void sortHeapPointWriter(final HeapPointWriter writer, int start, int length, int dim) {
assert pointCount < Integer.MAX_VALUE;
//int[] swapCount = new int[1];
//int[] cmpCount = new int[1];
//System.out.println("SORT length=" + length);
// All buffered points are still in heap; just do in-place sort:
new InPlaceMergeSorter() {
new IntroSorter() {
private final byte[] pivotPackedValue = new byte[bytesPerDim];
private int pivotDocID;
private long pivotOrd;
@Override
protected void setPivot(int i) {
pivotDocID = writer.docIDs[i];
pivotOrd = writer.ords[i];
int block = i / writer.valuesPerBlock;
int index = i % writer.valuesPerBlock;
System.arraycopy(writer.blocks.get(block), index*packedBytesLength+dim*bytesPerDim, pivotPackedValue, 0, bytesPerDim);
}
@Override
protected int comparePivot(int j) {
//cmpCount[0]++;
int block = j / writer.valuesPerBlock;
int index = j % writer.valuesPerBlock;
assert index >= 0: "index=" + index + " j=" + j;
int cmp = BKDUtil.compare(bytesPerDim, pivotPackedValue, 0, writer.blocks.get(block), index*numDims+dim);
if (cmp != 0) {
return cmp;
}
// Tie-break
cmp = Integer.compare(pivotDocID, writer.docIDs[j]);
if (cmp != 0) {
return cmp;
}
return Long.compare(pivotOrd, writer.ords[j]);
}
@Override
protected void swap(int i, int j) {
int docID = writer.docIDs[i];
@ -230,21 +558,27 @@ public class BKDWriter implements Closeable {
writer.ords[i] = writer.ords[j];
writer.ords[j] = ord;
byte[] blockI = writer.blocks.get(i / writer.valuesPerBlock);
int indexI = (i % writer.valuesPerBlock) * packedBytesLength;
byte[] blockJ = writer.blocks.get(j / writer.valuesPerBlock);
int indexJ = (j % writer.valuesPerBlock) * packedBytesLength;
// scratch1 = values[i]
writer.readPackedValue(i, scratch1);
// scratch2 = values[j]
writer.readPackedValue(j, scratch2);
// values[i] = scratch2
writer.writePackedValue(i, scratch2);
System.arraycopy(blockI, indexI, scratch1, 0, packedBytesLength);
// values[i] = values[j]
System.arraycopy(blockJ, indexJ, blockI, indexI, packedBytesLength);
// values[j] = scratch1
writer.writePackedValue(j, scratch1);
System.arraycopy(scratch1, 0, blockJ, indexJ, packedBytesLength);
}
@Override
protected int compare(int i, int j) {
writer.readPackedValue(i, scratch1);
writer.readPackedValue(j, scratch2);
int cmp = BKDUtil.compare(bytesPerDim, scratch1, dim, scratch2, dim);
//cmpCount[0]++;
int blockI = i / writer.valuesPerBlock;
int dimI = i % writer.valuesPerBlock;
int blockJ = j / writer.valuesPerBlock;
int dimJ = j % writer.valuesPerBlock;
int cmp = BKDUtil.compare(bytesPerDim, writer.blocks.get(blockI), dimI*numDims+dim, writer.blocks.get(blockJ), dimJ*numDims+dim);
if (cmp != 0) {
return cmp;
}
@ -258,6 +592,7 @@ public class BKDWriter implements Closeable {
return Long.compare(writer.ords[i], writer.ords[j]);
}
}.sort(start, start+length);
//System.out.println("LEN=" + length + " SWAP=" + swapCount[0] + " CMP=" + cmpCount[0]);
}
private PointWriter sort(int dim) throws IOException {
@ -278,7 +613,10 @@ public class BKDWriter implements Closeable {
sorted.copyFrom(heapPointWriter);
}
//long t0 = System.nanoTime();
sortHeapPointWriter(sorted, 0, (int) pointCount, dim);
//long t1 = System.nanoTime();
//System.out.println("BKD: sort took " + ((t1-t0)/1000000.0) + " msec");
sorted.close();
return sorted;
@ -366,6 +704,12 @@ public class BKDWriter implements Closeable {
}
}
/** Rejects a leaf count for which {@code (1+bytesPerDim) * numLeaves} bytes would exceed the
 *  maximum array length.
 *
 *  @param numLeaves number of leaf nodes to check
 *  @throws IllegalStateException if that size exceeds {@link ArrayUtil#MAX_ARRAY_LENGTH} */
private void checkMaxLeafNodeCount(int numLeaves) {
  // Widen to long before multiplying so the product cannot overflow int
  long requiredBytes = (1+bytesPerDim) * (long) numLeaves;
  boolean fitsInArray = requiredBytes <= ArrayUtil.MAX_ARRAY_LENGTH;
  if (!fitsInArray) {
    throw new IllegalStateException("too many nodes; increase maxPointsInLeafNode (currently " + maxPointsInLeafNode + ") and reindex");
  }
}
/** Writes the BKD tree to the provided {@link IndexOutput} and returns the file offset where index was written. */
public long finish(IndexOutput out) throws IOException {
//System.out.println("\nBKDTreeWriter.finish pointCount=" + pointCount + " out=" + out + " heapWriter=" + heapWriter);
@ -381,7 +725,12 @@ public class BKDWriter implements Closeable {
offlinePointWriter.close();
}
LongBitSet ordBitSet = new LongBitSet(pointCount);
LongBitSet ordBitSet;
if (numDims > 1) {
ordBitSet = new LongBitSet(pointCount);
} else {
ordBitSet = null;
}
long countPerLeaf = pointCount;
long innerNodeCount = 1;
@ -391,16 +740,9 @@ public class BKDWriter implements Closeable {
innerNodeCount *= 2;
}
//System.out.println("innerNodeCount=" + innerNodeCount);
int numLeaves = (int) innerNodeCount;
if (1+2*innerNodeCount >= Integer.MAX_VALUE) {
throw new IllegalStateException("too many nodes; increase maxPointsInLeafNode (currently " + maxPointsInLeafNode + ") and reindex");
}
innerNodeCount--;
int numLeaves = (int) (innerNodeCount+1);
//System.out.println("LEAVES: " + numLeaves);
checkMaxLeafNodeCount(numLeaves);
// NOTE: we could save the 1+ here, to use a bit less heap at search time, but then we'd need a somewhat costly check at each
// step of the recursion to recompute the split dim:
@ -548,7 +890,7 @@ public class BKDWriter implements Closeable {
private byte[] markRightTree(long rightCount, int splitDim, PathSlice source, LongBitSet ordBitSet) throws IOException {
// Now we mark ords that fall into the right half, so we can partition on all other dims that are not the split dim:
assert ordBitSet.cardinality() == 0: "cardinality=" + ordBitSet.cardinality();
assert numDims == 1 || ordBitSet.cardinality() == 0: "cardinality=" + ordBitSet.cardinality();
// Read the split value, then mark all ords in the right tree (larger than the split value):
try (PointReader reader = source.writer.getReader(source.start + source.count - rightCount)) {
@ -556,6 +898,7 @@ public class BKDWriter implements Closeable {
assert result;
System.arraycopy(reader.packedValue(), splitDim*bytesPerDim, scratch1, 0, bytesPerDim);
if (numDims > 1) {
ordBitSet.set(reader.ord());
@ -568,6 +911,7 @@ public class BKDWriter implements Closeable {
assert rightCount == ordBitSet.cardinality(): "rightCount=" + rightCount + " cardinality=" + ordBitSet.cardinality();
}
}
return scratch1;
}
@ -643,7 +987,8 @@ public class BKDWriter implements Closeable {
PathSlice source = slices[0];
if (source.writer instanceof HeapPointWriter == false) {
// Adversarial cases can cause this, e.g. very lopsided data, all equal points
// Adversarial cases can cause this, e.g. very lopsided data, all equal points, such that we started
// offline, but then kept splitting only in one dimension, and so never had to rewrite into heap writer
source = switchToHeap(source);
}
@ -652,17 +997,19 @@ public class BKDWriter implements Closeable {
// Save the block file pointer:
leafBlockFPs[nodeID - leafNodeOffset] = out.getFilePointer();
//System.out.println(" write leaf block @ fp=" + out.getFilePointer());
// Write docIDs first, as their own chunk, so that at intersect time we can add all docIDs w/o
// loading the values:
writeLeafBlockDocs(out, heapSource.docIDs, Math.toIntExact(source.start), Math.toIntExact(source.count));
int count = Math.toIntExact(source.count);
writeLeafBlockDocs(out, heapSource.docIDs, Math.toIntExact(source.start), count);
// TODO: we should delta compress / only write suffix bytes, like terms dict (the values will all be "close together" since we are at
// a leaf cell):
// First pass: find the per-dim common prefix for all values in this block:
Arrays.fill(commonPrefixLengths, bytesPerDim);
for (int i=0;i<source.count;i++) {
for (int i=0;i<count;i++) {
if (i == 0) {
heapSource.readPackedValue(Math.toIntExact(source.start + i), scratch1);
} else {
@ -682,9 +1029,11 @@ public class BKDWriter implements Closeable {
writeCommonPrefixes(out, commonPrefixLengths, scratch1);
// Second pass: write the full values:
byte[] lastPackedValue = new byte[bytesPerDim];
for (int i=0;i<source.count;i++) {
// TODO: we could do bulk copying here, avoiding the intermediate copy:
heapSource.readPackedValue(Math.toIntExact(source.start + i), scratchPackedValue);
assert numDims != 1 || valueInOrder(i, lastPackedValue, scratchPackedValue);
// Make sure this value does in fact fall within this leaf cell:
assert valueInBounds(scratchPackedValue, minPackedValue, maxPackedValue);
@ -694,7 +1043,12 @@ public class BKDWriter implements Closeable {
} else {
// Inner node: partition/recurse
int splitDim = split(minPackedValue, maxPackedValue);
int splitDim;
if (numDims > 1) {
splitDim = split(minPackedValue, maxPackedValue);
} else {
splitDim = 0;
}
PathSlice source = slices[splitDim];
@ -758,7 +1112,9 @@ public class BKDWriter implements Closeable {
}
}
if (numDims > 1) {
ordBitSet.clear(0, pointCount);
}
// Recurse on left tree:
build(2*nodeID, leafNodeOffset, leftSlices,
@ -787,6 +1143,15 @@ public class BKDWriter implements Closeable {
}
}
/**
 * Sanity check, only called from assert: verifies that {@code packedValue} does not
 * compare lower than the previously seen value, then remembers {@code packedValue}
 * as the new previous value by copying its first {@code bytesPerDim} bytes into
 * {@code lastPackedValue}.
 *
 * @param ord position of the current value; the comparison is skipped when this is 0
 * @param lastPackedValue the previously seen value; overwritten with the current value on return
 * @param packedValue the current value
 * @return always {@code true}, so the call can sit inside an {@code assert}
 * @throws AssertionError if the previous value compares greater than the current one
 */
private boolean valueInOrder(long ord, byte[] lastPackedValue, byte[] packedValue) {
  // The very first value (ord == 0) has no predecessor to compare against:
  if (ord > 0) {
    int cmp = BKDUtil.compare(bytesPerDim, lastPackedValue, 0, packedValue, 0);
    if (cmp > 0) {
      throw new AssertionError("values out of order: last value=" + new BytesRef(lastPackedValue) + " current value=" + new BytesRef(packedValue) + " ord=" + ord);
    }
  }

  // Save the current value so the next call can compare against it:
  System.arraycopy(packedValue, 0, lastPackedValue, 0, bytesPerDim);
  return true;
}
PointWriter getPointWriter(long count) throws IOException {
if (count <= maxPointsSortInHeap) {
int size = Math.toIntExact(count);

View File

@ -114,7 +114,7 @@ public class TestBKD extends LuceneTestCase {
try (Directory dir = getDirectory(numDocs)) {
int numDims = TestUtil.nextInt(random(), 1, 5);
int maxPointsInLeafNode = TestUtil.nextInt(random(), 50, 100);
float maxMB = (float) 0.1 + (3*random().nextFloat());
float maxMB = (float) 3.0 + (3*random().nextFloat());
BKDWriter w = new BKDWriter(dir, "tmp", numDims, 4, maxPointsInLeafNode, maxMB);
if (VERBOSE) {
@ -238,7 +238,7 @@ public class TestBKD extends LuceneTestCase {
int numBytesPerDim = TestUtil.nextInt(random(), 2, 30);
int numDims = TestUtil.nextInt(random(), 1, 5);
int maxPointsInLeafNode = TestUtil.nextInt(random(), 50, 100);
float maxMB = (float) 0.1 + (3*random().nextFloat());
float maxMB = (float) 3.0 + (3*random().nextFloat());
BKDWriter w = new BKDWriter(dir, "tmp", numDims, numBytesPerDim, maxPointsInLeafNode, maxMB);
BigInteger[][] docs = new BigInteger[numDocs][];
@ -425,6 +425,7 @@ public class TestBKD extends LuceneTestCase {
private void doTestRandomBinary(int count) throws Exception {
int numDocs = TestUtil.nextInt(random(), count, count*2);
int numBytesPerDim = TestUtil.nextInt(random(), 2, 30);
int numDims = TestUtil.nextInt(random(), 1, 5);
byte[][][] docValues = new byte[numDocs][][];
@ -601,17 +602,9 @@ public class TestBKD extends LuceneTestCase {
/** docIDs can be null, for the single valued case, else it maps value to docID */
private void verify(byte[][][] docValues, int[] docIDs, int numDims, int numBytesPerDim) throws Exception {
try (Directory dir = getDirectory(docValues.length)) {
while (true) {
int maxPointsInLeafNode = TestUtil.nextInt(random(), 50, 100);
double maxMB = (float) 0.1 + (3*random().nextDouble());
try {
int maxPointsInLeafNode = TestUtil.nextInt(random(), 50, 1000);
double maxMB = (float) 3.0 + (3*random().nextDouble());
verify(dir, docValues, docIDs, numDims, numBytesPerDim, maxPointsInLeafNode, maxMB);
return;
} catch (IllegalArgumentException iae) {
// This just means we got a too-small maxMB for the maxPointsInLeafNode; just retry
assertTrue(iae.getMessage().contains("either increase maxMBSortInHeap or decrease maxPointsInLeafNode"));
}
}
}
}
@ -620,10 +613,32 @@ public class TestBKD extends LuceneTestCase {
if (VERBOSE) {
System.out.println("TEST: numValues=" + numValues + " numDims=" + numDims + " numBytesPerDim=" + numBytesPerDim + " maxPointsInLeafNode=" + maxPointsInLeafNode + " maxMB=" + maxMB);
}
long indexFP;
try (BKDWriter w = new BKDWriter(dir, "tmp", numDims, numBytesPerDim, maxPointsInLeafNode, maxMB)) {
List<Long> toMerge = null;
List<Integer> docIDBases = null;
int seg = 0;
BKDWriter w = new BKDWriter(dir, "_" + seg, numDims, numBytesPerDim, maxPointsInLeafNode, maxMB);
IndexOutput out = dir.createOutput("bkd", IOContext.DEFAULT);
IndexInput in = null;
boolean success = false;
try {
byte[] scratch = new byte[numBytesPerDim*numDims];
int lastDocIDBase = 0;
boolean useMerge = numDims == 1 && numValues >= 10 && random().nextBoolean();
int valuesInThisSeg;
if (useMerge) {
// Sometimes we will call merge with a single segment:
valuesInThisSeg = TestUtil.nextInt(random(), numValues/10, numValues);
} else {
valuesInThisSeg = 0;
}
int segCount = 0;
for(int ord=0;ord<numValues;ord++) {
int docID;
if (docIDs == null) {
@ -632,7 +647,7 @@ public class TestBKD extends LuceneTestCase {
docID = docIDs[ord];
}
if (VERBOSE) {
System.out.println(" ord=" + ord + " docID=" + docID);
System.out.println(" ord=" + ord + " docID=" + docID + " lastDocIDBase=" + lastDocIDBase);
}
for(int dim=0;dim<numDims;dim++) {
if (VERBOSE) {
@ -640,21 +655,56 @@ public class TestBKD extends LuceneTestCase {
}
System.arraycopy(docValues[ord][dim], 0, scratch, dim*numBytesPerDim, numBytesPerDim);
}
w.add(scratch, docID);
w.add(scratch, docID-lastDocIDBase);
segCount++;
if (useMerge && segCount == valuesInThisSeg) {
if (toMerge == null) {
toMerge = new ArrayList<>();
docIDBases = new ArrayList<>();
}
docIDBases.add(lastDocIDBase);
toMerge.add(w.finish(out));
valuesInThisSeg = TestUtil.nextInt(random(), numValues/10, numValues/2);
segCount = 0;
seg++;
maxPointsInLeafNode = TestUtil.nextInt(random(), 50, 1000);
maxMB = (float) 3.0 + (3*random().nextDouble());
w = new BKDWriter(dir, "_" + seg, numDims, numBytesPerDim, maxPointsInLeafNode, maxMB);
lastDocIDBase = docID;
}
}
boolean success = false;
try (IndexOutput out = dir.createOutput("bkd", IOContext.DEFAULT)) {
long indexFP;
if (toMerge != null) {
System.out.println("merge " + toMerge.size());
if (segCount > 0) {
docIDBases.add(lastDocIDBase);
toMerge.add(w.finish(out));
}
out.close();
in = dir.openInput("bkd", IOContext.DEFAULT);
seg++;
w = new BKDWriter(dir, "_" + seg, numDims, numBytesPerDim, maxPointsInLeafNode, maxMB);
List<BKDReader> readers = new ArrayList<>();
for(long fp : toMerge) {
in.seek(fp);
readers.add(new BKDReader(in));
}
out = dir.createOutput("bkd2", IOContext.DEFAULT);
indexFP = w.merge(out, null, readers, docIDBases);
out.close();
in.close();
in = dir.openInput("bkd2", IOContext.DEFAULT);
} else {
indexFP = w.finish(out);
success = true;
} finally {
if (success == false) {
IOUtils.deleteFilesIgnoringExceptions(dir, "bkd");
}
}
out.close();
in = dir.openInput("bkd", IOContext.DEFAULT);
}
try (IndexInput in = dir.openInput("bkd", IOContext.DEFAULT)) {
in.seek(indexFP);
BKDReader r = new BKDReader(in);
@ -751,8 +801,17 @@ public class TestBKD extends LuceneTestCase {
assertEquals("docID=" + docID, expected.get(docID), hits.get(docID));
}
}
} finally {
in.close();
dir.deleteFile("bkd");
if (toMerge != null) {
dir.deleteFile("bkd2");
}
success = true;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(w, in, out);
IOUtils.deleteFilesIgnoringExceptions(dir, "bkd", "bkd2");
}
}
}