optimize offline -> offline partition

This commit is contained in:
Mike McCandless 2016-03-13 08:55:31 -04:00
parent 42d1eb5364
commit 8a9b033349
5 changed files with 111 additions and 28 deletions

View File

@ -1176,24 +1176,7 @@ public class BKDWriter implements Closeable {
PointWriter rightPointWriter = getPointWriter(source.count - leftCount, "right" + dim); PointWriter rightPointWriter = getPointWriter(source.count - leftCount, "right" + dim);
PointReader reader = slices[dim].writer.getReader(slices[dim].start);) { PointReader reader = slices[dim].writer.getReader(slices[dim].start);) {
// Partition this source according to how the splitDim split the values: long nextRightCount = reader.split(source.count, ordBitSet, leftPointWriter, rightPointWriter, dim == dimToClear);
long nextRightCount = 0;
for (long i=0;i<source.count;i++) {
boolean result = reader.next();
assert result;
byte[] packedValue = reader.packedValue();
long ord = reader.ord();
int docID = reader.docID();
if (ordBitSet.get(ord)) {
rightPointWriter.append(packedValue, ord, docID);
nextRightCount++;
if (dim == dimToClear) {
ordBitSet.clear(ord);
}
} else {
leftPointWriter.append(packedValue, ord, docID);
}
}
leftSlices[dim] = new PathSlice(leftPointWriter, 0, leftCount); leftSlices[dim] = new PathSlice(leftPointWriter, 0, leftCount);
rightSlices[dim] = new PathSlice(rightPointWriter, 0, rightCount); rightSlices[dim] = new PathSlice(rightPointWriter, 0, rightCount);

View File

@ -18,9 +18,7 @@ package org.apache.lucene.util.bkd;
import java.util.List; import java.util.List;
import org.apache.lucene.util.PagedBytes; final class HeapPointReader extends PointReader {
final class HeapPointReader implements PointReader {
private int curRead; private int curRead;
final List<byte[]> blocks; final List<byte[]> blocks;
final int valuesPerBlock; final int valuesPerBlock;

View File

@ -22,9 +22,11 @@ import java.io.IOException;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.LongBitSet;
/** Reads points from disk in a fixed-with format, previously written with {@link OfflinePointWriter}. */ /** Reads points from disk in a fixed-with format, previously written with {@link OfflinePointWriter}. */
final class OfflinePointReader implements PointReader { final class OfflinePointReader extends PointReader {
long countLeft; long countLeft;
private final IndexInput in; private final IndexInput in;
private final byte[] packedValue; private final byte[] packedValue;
@ -90,5 +92,78 @@ final class OfflinePointReader implements PointReader {
public void close() throws IOException { public void close() throws IOException {
in.close(); in.close();
} }
@Override
public long split(long count, LongBitSet rightTree, PointWriter left, PointWriter right, boolean doClearBits) throws IOException {
if (left instanceof OfflinePointWriter == false ||
right instanceof OfflinePointWriter == false) {
return super.split(count, rightTree, left, right, doClearBits);
}
// We specialize the offline -> offline split since the default impl
// is somewhat wasteful otherwise (e.g. decoding docID when we don't
// need to)
int packedBytesLength = packedValue.length;
int bytesPerDoc = packedBytesLength + Integer.BYTES;
if (longOrds) {
bytesPerDoc += Long.BYTES;
} else {
bytesPerDoc += Integer.BYTES;
}
long rightCount = 0;
IndexOutput rightOut = ((OfflinePointWriter) right).out;
IndexOutput leftOut = ((OfflinePointWriter) left).out;
((OfflinePointWriter) right).count = count;
((OfflinePointWriter) left).count = count;
assert count <= countLeft: "count=" + count + " countLeft=" + countLeft;
countLeft -= count;
byte[] buffer = new byte[bytesPerDoc];
while (count > 0) {
in.readBytes(buffer, 0, buffer.length);
long ord;
if (longOrds) {
ord = readLong(buffer, packedBytesLength);
} else {
ord = readInt(buffer, packedBytesLength);
}
if (rightTree.get(ord)) {
rightOut.writeBytes(buffer, 0, bytesPerDoc);
if (doClearBits) {
rightTree.clear(ord);
}
rightCount++;
} else {
leftOut.writeBytes(buffer, 0, bytesPerDoc);
}
count--;
}
return rightCount;
}
// Poached from ByteArrayDataInput:
private static long readLong(byte[] bytes, int pos) {
final int i1 = ((bytes[pos++] & 0xff) << 24) | ((bytes[pos++] & 0xff) << 16) |
((bytes[pos++] & 0xff) << 8) | (bytes[pos++] & 0xff);
final int i2 = ((bytes[pos++] & 0xff) << 24) | ((bytes[pos++] & 0xff) << 16) |
((bytes[pos++] & 0xff) << 8) | (bytes[pos++] & 0xff);
return (((long)i1) << 32) | (i2 & 0xFFFFFFFFL);
}
// Poached from ByteArrayDataInput:
private static int readInt(byte[] bytes, int pos) {
return ((bytes[pos++] & 0xFF) << 24) | ((bytes[pos++] & 0xFF) << 16)
| ((bytes[pos++] & 0xFF) << 8) | (bytes[pos++] & 0xFF);
}
} }

View File

@ -28,7 +28,7 @@ final class OfflinePointWriter implements PointWriter {
final Directory tempDir; final Directory tempDir;
final IndexOutput out; final IndexOutput out;
final int packedBytesLength; final int packedBytesLength;
private long count; long count;
private boolean closed; private boolean closed;
// true if ords are written as long (8 bytes), else 4 bytes // true if ords are written as long (8 bytes), else 4 bytes
private boolean longOrds; private boolean longOrds;

View File

@ -20,21 +20,48 @@ package org.apache.lucene.util.bkd;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import org.apache.lucene.util.LongBitSet;
/** One pass iterator through all points previously written with a /** One pass iterator through all points previously written with a
* {@link PointWriter}, abstracting away whether points a read * {@link PointWriter}, abstracting away whether points a read
* from (offline) disk or simple arrays in heap. */ * from (offline) disk or simple arrays in heap. */
interface PointReader extends Closeable { abstract class PointReader implements Closeable {
/** Returns false once iteration is done, else true. */ /** Returns false once iteration is done, else true. */
boolean next() throws IOException; abstract boolean next() throws IOException;
/** Returns the packed byte[] value */ /** Returns the packed byte[] value */
byte[] packedValue(); abstract byte[] packedValue();
/** Point ordinal */ /** Point ordinal */
long ord(); abstract long ord();
/** DocID for this point */ /** DocID for this point */
int docID(); abstract int docID();
/** Splits this reader into left and right partitions */
public long split(long count, LongBitSet rightTree, PointWriter left, PointWriter right, boolean doClearBits) throws IOException {
// Partition this source according to how the splitDim split the values:
long rightCount = 0;
for (long i=0;i<count;i++) {
boolean result = next();
assert result;
byte[] packedValue = packedValue();
long ord = ord();
int docID = docID();
if (rightTree.get(ord)) {
right.append(packedValue, ord, docID);
rightCount++;
if (doClearBits) {
rightTree.clear(ord);
}
} else {
left.append(packedValue, ord, docID);
}
}
return rightCount;
}
} }