LUCENE-7583: buffer small leaf-block writes in BKDWriter

This commit is contained in:
Mike McCandless 2016-12-07 18:59:23 -05:00
parent 0c8e8e396a
commit 28a22c0203
7 changed files with 101 additions and 68 deletions

View File

@ -87,6 +87,10 @@ Optimizations
a compressed format, using substantially less RAM in some cases
(Adrien Grand, Mike McCandless)
* LUCENE-7583: BKD writing now buffers each leaf block in heap before
writing to disk, giving a small speedup in points-heavy use cases.
(Mike McCandless)
Other
* LUCENE-7546: Fixed references to benchmark wikipedia data and the Jenkins line-docs file

View File

@ -33,6 +33,7 @@ import org.apache.lucene.index.MergeState;
import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.store.DataOutput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.GrowableByteArrayDataOutput;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
@ -157,7 +158,7 @@ public final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
}
this.numStoredFields[numBufferedDocs] = numStoredFieldsInDoc;
numStoredFieldsInDoc = 0;
endOffsets[numBufferedDocs] = bufferedDocs.length;
endOffsets[numBufferedDocs] = bufferedDocs.getPosition();
++numBufferedDocs;
if (triggerFlush()) {
flush();
@ -210,7 +211,7 @@ public final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
}
private boolean triggerFlush() {
return bufferedDocs.length >= chunkSize || // chunks of at least chunkSize bytes
return bufferedDocs.getPosition() >= chunkSize || // chunks of at least chunkSize bytes
numBufferedDocs >= maxDocsPerChunk;
}
@ -223,23 +224,23 @@ public final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
lengths[i] = endOffsets[i] - endOffsets[i - 1];
assert lengths[i] >= 0;
}
final boolean sliced = bufferedDocs.length >= 2 * chunkSize;
final boolean sliced = bufferedDocs.getPosition() >= 2 * chunkSize;
writeHeader(docBase, numBufferedDocs, numStoredFields, lengths, sliced);
// compress stored fields to fieldsStream
if (sliced) {
// big chunk, slice it
for (int compressed = 0; compressed < bufferedDocs.length; compressed += chunkSize) {
compressor.compress(bufferedDocs.bytes, compressed, Math.min(chunkSize, bufferedDocs.length - compressed), fieldsStream);
for (int compressed = 0; compressed < bufferedDocs.getPosition(); compressed += chunkSize) {
compressor.compress(bufferedDocs.getBytes(), compressed, Math.min(chunkSize, bufferedDocs.getPosition() - compressed), fieldsStream);
}
} else {
compressor.compress(bufferedDocs.bytes, 0, bufferedDocs.length, fieldsStream);
compressor.compress(bufferedDocs.getBytes(), 0, bufferedDocs.getPosition(), fieldsStream);
}
// reset
docBase += numBufferedDocs;
numBufferedDocs = 0;
bufferedDocs.length = 0;
bufferedDocs.reset();
numChunks++;
}
@ -459,7 +460,7 @@ public final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
flush();
numDirtyChunks++; // incomplete: we had to force this flush
} else {
assert bufferedDocs.length == 0;
assert bufferedDocs.getPosition() == 0;
}
if (docBase != numDocs) {
throw new RuntimeException("Wrote " + docBase + " docs, finish called with numDocs=" + numDocs);
@ -468,7 +469,7 @@ public final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
fieldsStream.writeVLong(numChunks);
fieldsStream.writeVLong(numDirtyChunks);
CodecUtil.writeFooter(fieldsStream);
assert bufferedDocs.length == 0;
assert bufferedDocs.getPosition() == 0;
}
// bulk merge is scary: it has caused corruption bugs in the past.

View File

@ -37,6 +37,7 @@ import org.apache.lucene.index.MergeState;
import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.GrowableByteArrayDataOutput;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
@ -269,8 +270,8 @@ public final class CompressingTermVectorsWriter extends TermVectorsWriter {
@Override
public void finishDocument() throws IOException {
// append the payload bytes of the doc after its terms
termSuffixes.writeBytes(payloadBytes.bytes, payloadBytes.length);
payloadBytes.length = 0;
termSuffixes.writeBytes(payloadBytes.getBytes(), payloadBytes.getPosition());
payloadBytes.reset();
++numDocs;
if (triggerFlush()) {
flush();
@ -316,7 +317,7 @@ public final class CompressingTermVectorsWriter extends TermVectorsWriter {
}
private boolean triggerFlush() {
return termSuffixes.length >= chunkSize
return termSuffixes.getPosition() >= chunkSize
|| pendingDocs.size() >= MAX_DOCUMENTS_PER_CHUNK;
}
@ -355,14 +356,14 @@ public final class CompressingTermVectorsWriter extends TermVectorsWriter {
flushPayloadLengths();
// compress terms and payloads and write them to the output
compressor.compress(termSuffixes.bytes, 0, termSuffixes.length, vectorsStream);
compressor.compress(termSuffixes.getBytes(), 0, termSuffixes.getPosition(), vectorsStream);
}
// reset
pendingDocs.clear();
curDoc = null;
curField = null;
termSuffixes.length = 0;
termSuffixes.reset();
numChunks++;
}

View File

@ -14,8 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.codecs.compressing;
package org.apache.lucene.store;
import java.io.IOException;
@ -25,6 +25,7 @@ import org.apache.lucene.util.UnicodeUtil;
/**
* A {@link DataOutput} that can be used to build a byte[].
*
* @lucene.internal
*/
public final class GrowableByteArrayDataOutput extends DataOutput {
@ -33,12 +34,13 @@ public final class GrowableByteArrayDataOutput extends DataOutput {
static final int MIN_UTF8_SIZE_TO_ENABLE_DOUBLE_PASS_ENCODING = 65536;
/** The bytes */
public byte[] bytes;
private byte[] bytes;
/** The length */
public int length;
private int length;
// scratch for utf8 encoding of small strings
byte[] scratchBytes = new byte[16];
private byte[] scratchBytes;
/** Create a {@link GrowableByteArrayDataOutput} with the given initial capacity. */
public GrowableByteArrayDataOutput(int cp) {
@ -57,7 +59,9 @@ public final class GrowableByteArrayDataOutput extends DataOutput {
/**
 * Appends {@code len} bytes taken from {@code b}, beginning at index {@code off}, to the end
 * of this buffer and advances the write position by {@code len}.
 */
@Override
public void writeBytes(byte[] b, int off, int len) {
  final int end = length + len;
  if (bytes.length < end) {
    // only reallocate when the current backing array cannot hold the incoming bytes
    bytes = ArrayUtil.grow(bytes, end);
  }
  System.arraycopy(b, off, bytes, length, len);
  length = end;
}
@ -68,7 +72,11 @@ public final class GrowableByteArrayDataOutput extends DataOutput {
if (maxLen <= MIN_UTF8_SIZE_TO_ENABLE_DOUBLE_PASS_ENCODING) {
// string is small enough that we don't need to save memory by falling back to double-pass approach
// this is just an optimized writeString() that re-uses scratchBytes.
if (scratchBytes == null) {
scratchBytes = new byte[ArrayUtil.oversize(maxLen, Character.BYTES)];
} else {
scratchBytes = ArrayUtil.grow(scratchBytes, maxLen);
}
int len = UnicodeUtil.UTF16toUTF8(string, 0, string.length(), scratchBytes);
writeVInt(len);
writeBytes(scratchBytes, len);
@ -80,4 +88,16 @@ public final class GrowableByteArrayDataOutput extends DataOutput {
length = UnicodeUtil.UTF16toUTF8(string, 0, string.length(), bytes, length);
}
}
/**
 * Returns the backing array itself, not a copy. Only the first {@link #getPosition()} bytes
 * hold written data. A later write may replace the backing array with a larger one, so the
 * returned reference should be re-fetched after writing.
 */
public byte[] getBytes() {
  return this.bytes;
}
/**
 * Returns the current write position: the offset into the backing array at which the next
 * written byte will be stored.
 */
public int getPosition() {
  return this.length;
}
/**
 * Rewinds the write position to zero so this instance can be reused. The backing array is
 * kept as-is; it is neither cleared nor shrunk.
 */
public void reset() {
  this.length = 0;
}
}

View File

@ -30,7 +30,9 @@ import org.apache.lucene.index.MergeState;
import org.apache.lucene.index.PointValues.IntersectVisitor;
import org.apache.lucene.index.PointValues.Relation;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.DataOutput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.GrowableByteArrayDataOutput;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.RAMOutputStream;
@ -556,6 +558,9 @@ public class BKDWriter implements Closeable {
return oneDimWriter.finish();
}
// reused when writing leaf blocks
private final GrowableByteArrayDataOutput scratchOut = new GrowableByteArrayDataOutput(32*1024);
private class OneDimensionBKDWriter {
final IndexOutput out;
@ -563,8 +568,8 @@ public class BKDWriter implements Closeable {
final List<byte[]> leafBlockStartValues = new ArrayList<>();
final byte[] leafValues = new byte[maxPointsInLeafNode * packedBytesLength];
final int[] leafDocs = new int[maxPointsInLeafNode];
long valueCount;
int leafCount;
private long valueCount;
private int leafCount;
OneDimensionBKDWriter(IndexOutput out) {
if (numDims != 1) {
@ -589,7 +594,7 @@ public class BKDWriter implements Closeable {
// for asserts
final byte[] lastPackedValue;
int lastDocID;
private int lastDocID;
void add(byte[] packedValue, int docID) throws IOException {
assert valueInOrder(valueCount + leafCount,
@ -606,8 +611,7 @@ public class BKDWriter implements Closeable {
if (leafCount == maxPointsInLeafNode) {
// We write a block once we hit exactly the max count ... this is different from
// when we flush a new segment, where we write between max/2 and max per leaf block,
// so merged segments will behave differently from newly flushed segments:
// when we write N > 1 dimensional points where we write between max/2 and max per leaf block
writeLeafBlock();
leafCount = 0;
}
@ -644,7 +648,6 @@ public class BKDWriter implements Closeable {
}
private void writeLeafBlock() throws IOException {
//System.out.println("writeLeafBlock pos=" + out.getFilePointer());
assert leafCount != 0;
if (valueCount == 0) {
System.arraycopy(leafValues, 0, minPackedValue, 0, packedBytesLength);
@ -660,42 +663,39 @@ public class BKDWriter implements Closeable {
leafBlockFPs.add(out.getFilePointer());
checkMaxLeafNodeCount(leafBlockFPs.size());
Arrays.fill(commonPrefixLengths, bytesPerDim);
// Find per-dim common prefix:
for(int dim=0;dim<numDims;dim++) {
int offset1 = dim * bytesPerDim;
int offset2 = (leafCount - 1) * packedBytesLength + offset1;
for(int j=0;j<commonPrefixLengths[dim];j++) {
if (leafValues[offset1+j] != leafValues[offset2+j]) {
commonPrefixLengths[dim] = j;
int prefix = bytesPerDim;
int offset = (leafCount - 1) * packedBytesLength;
for(int j=0;j<bytesPerDim;j++) {
if (leafValues[j] != leafValues[offset+j]) {
prefix = j;
break;
}
}
}
writeLeafBlockDocs(out, leafDocs, 0, leafCount);
writeCommonPrefixes(out, commonPrefixLengths, leafValues);
commonPrefixLengths[0] = prefix;
assert scratchOut.getPosition() == 0;
writeLeafBlockDocs(scratchOut, leafDocs, 0, leafCount);
writeCommonPrefixes(scratchOut, commonPrefixLengths, leafValues);
scratchBytesRef1.length = packedBytesLength;
scratchBytesRef1.bytes = leafValues;
final IntFunction<BytesRef> packedValues = new IntFunction<BytesRef>() {
final BytesRef scratch = new BytesRef();
{
scratch.length = packedBytesLength;
scratch.bytes = leafValues;
}
@Override
public BytesRef apply(int i) {
scratch.offset = packedBytesLength * i;
return scratch;
scratchBytesRef1.offset = packedBytesLength * i;
return scratchBytesRef1;
}
};
assert valuesInOrderAndBounds(leafCount, 0, Arrays.copyOf(leafValues, packedBytesLength),
Arrays.copyOfRange(leafValues, (leafCount - 1) * packedBytesLength, leafCount * packedBytesLength),
packedValues, leafDocs, 0);
writeLeafBlockPackedValues(out, commonPrefixLengths, leafCount, 0, packedValues);
writeLeafBlockPackedValues(scratchOut, commonPrefixLengths, leafCount, 0, packedValues);
out.writeBytes(scratchOut.getBytes(), 0, scratchOut.getPosition());
scratchOut.reset();
}
}
// TODO: there must be a simpler way?
@ -1259,13 +1259,13 @@ public class BKDWriter implements Closeable {
out.writeBytes(packedIndex, 0, packedIndex.length);
}
private void writeLeafBlockDocs(IndexOutput out, int[] docIDs, int start, int count) throws IOException {
private void writeLeafBlockDocs(DataOutput out, int[] docIDs, int start, int count) throws IOException {
assert count > 0: "maxPointsInLeafNode=" + maxPointsInLeafNode;
out.writeVInt(count);
DocIdsWriter.writeDocIds(docIDs, start, count, out);
}
private void writeLeafBlockPackedValues(IndexOutput out, int[] commonPrefixLengths, int count, int sortedDim, IntFunction<BytesRef> packedValues) throws IOException {
private void writeLeafBlockPackedValues(DataOutput out, int[] commonPrefixLengths, int count, int sortedDim, IntFunction<BytesRef> packedValues) throws IOException {
int prefixLenSum = Arrays.stream(commonPrefixLengths).sum();
if (prefixLenSum == packedBytesLength) {
// all values in this block are equal
@ -1290,7 +1290,7 @@ public class BKDWriter implements Closeable {
}
}
private void writeLeafBlockPackedValuesRange(IndexOutput out, int[] commonPrefixLengths, int start, int end, IntFunction<BytesRef> packedValues) throws IOException {
private void writeLeafBlockPackedValuesRange(DataOutput out, int[] commonPrefixLengths, int start, int end, IntFunction<BytesRef> packedValues) throws IOException {
for (int i = start; i < end; ++i) {
BytesRef ref = packedValues.apply(i);
assert ref.length == packedBytesLength;
@ -1316,7 +1316,7 @@ public class BKDWriter implements Closeable {
return end - start;
}
private void writeCommonPrefixes(IndexOutput out, int[] commonPrefixes, byte[] packedValue) throws IOException {
private void writeCommonPrefixes(DataOutput out, int[] commonPrefixes, byte[] packedValue) throws IOException {
for(int dim=0;dim<numDims;dim++) {
out.writeVInt(commonPrefixes[dim]);
//System.out.println(commonPrefixes[dim] + " of " + bytesPerDim);
@ -1449,7 +1449,8 @@ public class BKDWriter implements Closeable {
}
}
/* Recursively reorders the provided reader and writes the bkd-tree on the fly. */
/* Recursively reorders the provided reader and writes the bkd-tree on the fly; this method is used
* when we are writing a new segment directly from IndexWriter's indexing buffer (MutablePointsReader). */
private void build(int nodeID, int leafNodeOffset,
MutablePointsReader reader, int from, int to,
IndexOutput out,
@ -1513,18 +1514,20 @@ public class BKDWriter implements Closeable {
// Save the block file pointer:
leafBlockFPs[nodeID - leafNodeOffset] = out.getFilePointer();
assert scratchOut.getPosition() == 0;
// Write doc IDs
int[] docIDs = spareDocIds;
for (int i = from; i < to; ++i) {
docIDs[i - from] = reader.getDocID(i);
}
//System.out.println("writeLeafBlock pos=" + out.getFilePointer());
writeLeafBlockDocs(out, docIDs, 0, count);
writeLeafBlockDocs(scratchOut, docIDs, 0, count);
// Write the common prefixes:
reader.getValue(from, scratchBytesRef1);
System.arraycopy(scratchBytesRef1.bytes, scratchBytesRef1.offset, scratch1, 0, packedBytesLength);
writeCommonPrefixes(out, commonPrefixLengths, scratch1);
writeCommonPrefixes(scratchOut, commonPrefixLengths, scratch1);
// Write the full values:
IntFunction<BytesRef> packedValues = new IntFunction<BytesRef>() {
@ -1536,7 +1539,10 @@ public class BKDWriter implements Closeable {
};
assert valuesInOrderAndBounds(count, sortedDim, minPackedValue, maxPackedValue, packedValues,
docIDs, 0);
writeLeafBlockPackedValues(out, commonPrefixLengths, count, sortedDim, packedValues);
writeLeafBlockPackedValues(scratchOut, commonPrefixLengths, count, sortedDim, packedValues);
out.writeBytes(scratchOut.getBytes(), 0, scratchOut.getPosition());
scratchOut.reset();
} else {
// inner node
@ -1577,7 +1583,8 @@ public class BKDWriter implements Closeable {
}
}
/** The array (sized numDims) of PathSlice describe the cell we have currently recursed to. */
/** The array (sized numDims) of PathSlice describe the cell we have currently recursed to.
* This method is used when we are merging previously written segments, in the numDims > 1 case. */
private void build(int nodeID, int leafNodeOffset,
PathSlice[] slices,
LongBitSet ordBitSet,

View File

@ -19,14 +19,14 @@ package org.apache.lucene.util.bkd;
import java.io.IOException;
import org.apache.lucene.index.PointValues.IntersectVisitor;
import org.apache.lucene.store.DataOutput;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
class DocIdsWriter {
private DocIdsWriter() {}
static void writeDocIds(int[] docIds, int start, int count, IndexOutput out) throws IOException {
static void writeDocIds(int[] docIds, int start, int count, DataOutput out) throws IOException {
// docs can be sorted either when all docs in a block have the same value
// or when a segment is sorted
boolean sorted = true;

View File

@ -14,8 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.codecs.compressing;
package org.apache.lucene.store;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.TestUtil;
@ -43,13 +43,13 @@ public class TestGrowableByteArrayDataOutput extends LuceneTestCase {
GrowableByteArrayDataOutput dataOutput = new GrowableByteArrayDataOutput(1 << 8);
//explicitly write utf8 len so that we know how many bytes it occupies
dataOutput.writeVInt(len);
int vintLen = dataOutput.length;
int vintLen = dataOutput.getPosition();
// now write the string which will internally write number of bytes as a vint and then utf8 bytes
dataOutput.writeString(unicode);
assertEquals("GrowableByteArrayDataOutput wrote the wrong length after encode", len + vintLen * 2, dataOutput.length);
assertEquals("GrowableByteArrayDataOutput wrote the wrong length after encode", len + vintLen * 2, dataOutput.getPosition());
for (int j = 0, k = vintLen * 2; j < len; j++, k++) {
assertEquals(utf8[j], dataOutput.bytes[k]);
assertEquals(utf8[j], dataOutput.getBytes()[k]);
}
}
}
@ -67,13 +67,13 @@ public class TestGrowableByteArrayDataOutput extends LuceneTestCase {
GrowableByteArrayDataOutput dataOutput = new GrowableByteArrayDataOutput(1 << 8);
//explicitly write utf8 len so that we know how many bytes it occupies
dataOutput.writeVInt(len);
int vintLen = dataOutput.length;
int vintLen = dataOutput.getPosition();
// now write the string which will internally write number of bytes as a vint and then utf8 bytes
dataOutput.writeString(unicode);
assertEquals("GrowableByteArrayDataOutput wrote the wrong length after encode", len + vintLen * 2, dataOutput.length);
assertEquals("GrowableByteArrayDataOutput wrote the wrong length after encode", len + vintLen * 2, dataOutput.getPosition());
for (int j = 0, k = vintLen * 2; j < len; j++, k++) {
assertEquals(utf8[j], dataOutput.bytes[k]);
assertEquals(utf8[j], dataOutput.getBytes()[k]);
}
}
}