From 28a22c0203f8ffb6f71d6a2c0a610eebf9cbfd12 Mon Sep 17 00:00:00 2001
From: Mike McCandless
Date: Wed, 7 Dec 2016 18:59:23 -0500
Subject: [PATCH] LUCENE-7583: buffer small leaf-block writes in BKDWriter

---
 lucene/CHANGES.txt                                 |  4 +
 .../CompressingStoredFieldsWriter.java             | 19 +++--
 .../CompressingTermVectorsWriter.java              | 11 +--
 .../GrowableByteArrayDataOutput.java               | 32 +++++--
 .../org/apache/lucene/util/bkd/BKDWriter.java      | 85 ++++++++++---------
 .../apache/lucene/util/bkd/DocIdsWriter.java       |  4 +-
 .../TestGrowableByteArrayDataOutput.java           | 14 +--
 7 files changed, 101 insertions(+), 68 deletions(-)
 rename lucene/core/src/java/org/apache/lucene/{codecs/compressing => store}/GrowableByteArrayDataOutput.java (83%)
 rename lucene/core/src/test/org/apache/lucene/{codecs/compressing => store}/TestGrowableByteArrayDataOutput.java (89%)

diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index 1bf77357588..e5831683c6e 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -87,6 +87,10 @@ Optimizations
   a compressed format, using substantially less RAM in some cases
   (Adrien Grand, Mike McCandless)
 
+* LUCENE-7583: BKD writing now buffers each leaf block in heap before
+  writing to disk, giving a small speedup in points-heavy use cases.
+  (Mike McCandless)
+
 Other
 
 * LUCENE-7546: Fixed references to benchmark wikipedia data and the Jenkins line-docs file
 
diff --git a/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsWriter.java b/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsWriter.java
index 1956ab70683..cda855defcb 100644
--- a/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsWriter.java
@@ -33,6 +33,7 @@ import org.apache.lucene.index.MergeState;
 import org.apache.lucene.index.SegmentInfo;
 import org.apache.lucene.store.DataOutput;
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.GrowableByteArrayDataOutput;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
@@ -157,7 +158,7 @@ public final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
     }
     this.numStoredFields[numBufferedDocs] = numStoredFieldsInDoc;
     numStoredFieldsInDoc = 0;
-    endOffsets[numBufferedDocs] = bufferedDocs.length;
+    endOffsets[numBufferedDocs] = bufferedDocs.getPosition();
     ++numBufferedDocs;
     if (triggerFlush()) {
       flush();
@@ -210,7 +211,7 @@ public final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
   }
 
   private boolean triggerFlush() {
-    return bufferedDocs.length >= chunkSize || // chunks of at least chunkSize bytes
+    return bufferedDocs.getPosition() >= chunkSize || // chunks of at least chunkSize bytes
         numBufferedDocs >= maxDocsPerChunk;
   }
 
@@ -223,23 +224,23 @@ public final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
       lengths[i] = endOffsets[i] - endOffsets[i - 1];
       assert lengths[i] >= 0;
     }
-    final boolean sliced = bufferedDocs.length >= 2 * chunkSize;
+    final boolean sliced = bufferedDocs.getPosition() >= 2 * chunkSize;
     writeHeader(docBase, numBufferedDocs, numStoredFields, lengths, sliced);
 
     // compress stored fields to fieldsStream
     if (sliced) {
       // big chunk, slice it
-      for (int compressed = 0; compressed < bufferedDocs.length; compressed += chunkSize) {
-        compressor.compress(bufferedDocs.bytes, compressed, Math.min(chunkSize, bufferedDocs.length - compressed), fieldsStream);
+      for (int compressed = 0; compressed < bufferedDocs.getPosition(); compressed += chunkSize) {
+        compressor.compress(bufferedDocs.getBytes(), compressed, Math.min(chunkSize, bufferedDocs.getPosition() - compressed), fieldsStream);
       }
     } else {
-      compressor.compress(bufferedDocs.bytes, 0, bufferedDocs.length, fieldsStream);
+      compressor.compress(bufferedDocs.getBytes(), 0, bufferedDocs.getPosition(), fieldsStream);
     }
 
     // reset
     docBase += numBufferedDocs;
     numBufferedDocs = 0;
-    bufferedDocs.length = 0;
+    bufferedDocs.reset();
     numChunks++;
   }
 
@@ -459,7 +460,7 @@ public final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
       flush();
       numDirtyChunks++; // incomplete: we had to force this flush
     } else {
-      assert bufferedDocs.length == 0;
+      assert bufferedDocs.getPosition() == 0;
     }
     if (docBase != numDocs) {
       throw new RuntimeException("Wrote " + docBase + " docs, finish called with numDocs=" + numDocs);
@@ -468,7 +469,7 @@ public final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
     fieldsStream.writeVLong(numChunks);
     fieldsStream.writeVLong(numDirtyChunks);
     CodecUtil.writeFooter(fieldsStream);
-    assert bufferedDocs.length == 0;
+    assert bufferedDocs.getPosition() == 0;
   }
 
   // bulk merge is scary: its caused corruption bugs in the past.
diff --git a/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingTermVectorsWriter.java b/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingTermVectorsWriter.java
index 46a289a97b5..9bd2483389e 100644
--- a/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingTermVectorsWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingTermVectorsWriter.java
@@ -37,6 +37,7 @@ import org.apache.lucene.index.MergeState;
 import org.apache.lucene.index.SegmentInfo;
 import org.apache.lucene.store.DataInput;
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.GrowableByteArrayDataOutput;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
@@ -269,8 +270,8 @@ public final class CompressingTermVectorsWriter extends TermVectorsWriter {
   @Override
   public void finishDocument() throws IOException {
     // append the payload bytes of the doc after its terms
-    termSuffixes.writeBytes(payloadBytes.bytes, payloadBytes.length);
-    payloadBytes.length = 0;
+    termSuffixes.writeBytes(payloadBytes.getBytes(), payloadBytes.getPosition());
+    payloadBytes.reset();
     ++numDocs;
     if (triggerFlush()) {
       flush();
@@ -316,7 +317,7 @@ public final class CompressingTermVectorsWriter extends TermVectorsWriter {
   }
 
   private boolean triggerFlush() {
-    return termSuffixes.length >= chunkSize
+    return termSuffixes.getPosition() >= chunkSize
         || pendingDocs.size() >= MAX_DOCUMENTS_PER_CHUNK;
   }
 
@@ -355,14 +356,14 @@ public final class CompressingTermVectorsWriter extends TermVectorsWriter {
       flushPayloadLengths();
 
       // compress terms and payloads and write them to the output
-      compressor.compress(termSuffixes.bytes, 0, termSuffixes.length, vectorsStream);
+      compressor.compress(termSuffixes.getBytes(), 0, termSuffixes.getPosition(), vectorsStream);
     }
 
     // reset
     pendingDocs.clear();
     curDoc = null;
     curField = null;
-    termSuffixes.length = 0;
+    termSuffixes.reset();
     numChunks++;
   }
 
diff --git a/lucene/core/src/java/org/apache/lucene/codecs/compressing/GrowableByteArrayDataOutput.java b/lucene/core/src/java/org/apache/lucene/store/GrowableByteArrayDataOutput.java
similarity index 83%
rename from lucene/core/src/java/org/apache/lucene/codecs/compressing/GrowableByteArrayDataOutput.java
rename to lucene/core/src/java/org/apache/lucene/store/GrowableByteArrayDataOutput.java
index ec551d14d1f..5f00d4a6ab0 100644
--- a/lucene/core/src/java/org/apache/lucene/codecs/compressing/GrowableByteArrayDataOutput.java
+++ b/lucene/core/src/java/org/apache/lucene/store/GrowableByteArrayDataOutput.java
@@ -14,8 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.lucene.codecs.compressing;
+package org.apache.lucene.store;
 
 import java.io.IOException;
 
@@ -25,6 +25,7 @@
 import org.apache.lucene.util.UnicodeUtil;
 
 /**
  * A {@link DataOutput} that can be used to build a byte[].
+ *
  * @lucene.internal
  */
@@ -33,12 +34,13 @@ public final class GrowableByteArrayDataOutput extends DataOutput {
   static final int MIN_UTF8_SIZE_TO_ENABLE_DOUBLE_PASS_ENCODING = 65536;
 
   /** The bytes */
-  public byte[] bytes;
+  private byte[] bytes;
+
   /** The length */
-  public int length;
+  private int length;
 
   // scratch for utf8 encoding of small strings
-  byte[] scratchBytes = new byte[16];
+  private byte[] scratchBytes;
 
   /** Create a {@link GrowableByteArrayDataOutput} with the given initial capacity. */
   public GrowableByteArrayDataOutput(int cp) {
@@ -57,7 +59,9 @@ public final class GrowableByteArrayDataOutput extends DataOutput {
   @Override
   public void writeBytes(byte[] b, int off, int len) {
     final int newLength = length + len;
-    bytes = ArrayUtil.grow(bytes, newLength);
+    if (newLength > bytes.length) {
+      bytes = ArrayUtil.grow(bytes, newLength);
+    }
     System.arraycopy(b, off, bytes, length, len);
     length = newLength;
   }
@@ -68,7 +72,11 @@ public final class GrowableByteArrayDataOutput extends DataOutput {
     if (maxLen <= MIN_UTF8_SIZE_TO_ENABLE_DOUBLE_PASS_ENCODING) {
       // string is small enough that we don't need to save memory by falling back to double-pass approach
       // this is just an optimized writeString() that re-uses scratchBytes.
-      scratchBytes = ArrayUtil.grow(scratchBytes, maxLen);
+      if (scratchBytes == null) {
+        scratchBytes = new byte[ArrayUtil.oversize(maxLen, Character.BYTES)];
+      } else {
+        scratchBytes = ArrayUtil.grow(scratchBytes, maxLen);
+      }
       int len = UnicodeUtil.UTF16toUTF8(string, 0, string.length(), scratchBytes);
       writeVInt(len);
       writeBytes(scratchBytes, len);
@@ -80,4 +88,16 @@ public final class GrowableByteArrayDataOutput extends DataOutput {
       length = UnicodeUtil.UTF16toUTF8(string, 0, string.length(), bytes, length);
     }
   }
+
+  public byte[] getBytes() {
+    return bytes;
+  }
+
+  public int getPosition() {
+    return length;
+  }
+
+  public void reset() {
+    length = 0;
+  }
 }
diff --git a/lucene/core/src/java/org/apache/lucene/util/bkd/BKDWriter.java b/lucene/core/src/java/org/apache/lucene/util/bkd/BKDWriter.java
index b9fd37cb042..81eb5a7e274 100644
--- a/lucene/core/src/java/org/apache/lucene/util/bkd/BKDWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/util/bkd/BKDWriter.java
@@ -30,7 +30,9 @@ import org.apache.lucene.index.MergeState;
 import org.apache.lucene.index.PointValues.IntersectVisitor;
 import org.apache.lucene.index.PointValues.Relation;
 import org.apache.lucene.store.ChecksumIndexInput;
+import org.apache.lucene.store.DataOutput;
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.GrowableByteArrayDataOutput;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.store.RAMOutputStream;
@@ -478,8 +480,8 @@ public class BKDWriter implements Closeable {
     }
 
     build(1, numLeaves, reader, 0, Math.toIntExact(pointCount), out,
-          minPackedValue, maxPackedValue, splitPackedValues, leafBlockFPs,
-          new int[maxPointsInLeafNode]);
+        minPackedValue, maxPackedValue, splitPackedValues, leafBlockFPs,
+        new int[maxPointsInLeafNode]);
 
     long indexFP = out.getFilePointer();
     writeIndex(out, leafBlockFPs, splitPackedValues);
@@ -556,6 +558,9 @@ public class BKDWriter implements Closeable {
     return oneDimWriter.finish();
   }
 
+  // reused when writing leaf blocks
+  private final GrowableByteArrayDataOutput scratchOut = new GrowableByteArrayDataOutput(32*1024);
+
   private class OneDimensionBKDWriter {
 
     final IndexOutput out;
@@ -563,8 +568,8 @@ public class BKDWriter implements Closeable {
     final List<byte[]> leafBlockStartValues = new ArrayList<>();
    final byte[] leafValues = new byte[maxPointsInLeafNode * packedBytesLength];
     final int[] leafDocs = new int[maxPointsInLeafNode];
-    long valueCount;
-    int leafCount;
+    private long valueCount;
+    private int leafCount;
 
     OneDimensionBKDWriter(IndexOutput out) {
       if (numDims != 1) {
@@ -589,7 +594,7 @@ public class BKDWriter implements Closeable {
 
     // for asserts
     final byte[] lastPackedValue;
-    int lastDocID;
+    private int lastDocID;
 
     void add(byte[] packedValue, int docID) throws IOException {
       assert valueInOrder(valueCount + leafCount,
@@ -606,8 +611,7 @@ public class BKDWriter implements Closeable {
 
       if (leafCount == maxPointsInLeafNode) {
         // We write a block once we hit exactly the max count ... this is different from
this is different from - // when we flush a new segment, where we write between max/2 and max per leaf block, - // so merged segments will behave differently from newly flushed segments: + // when we write N > 1 dimensional points where we write between max/2 and max per leaf block writeLeafBlock(); leafCount = 0; } @@ -644,7 +648,6 @@ public class BKDWriter implements Closeable { } private void writeLeafBlock() throws IOException { - //System.out.println("writeLeafBlock pos=" + out.getFilePointer()); assert leafCount != 0; if (valueCount == 0) { System.arraycopy(leafValues, 0, minPackedValue, 0, packedBytesLength); @@ -660,42 +663,39 @@ public class BKDWriter implements Closeable { leafBlockFPs.add(out.getFilePointer()); checkMaxLeafNodeCount(leafBlockFPs.size()); - Arrays.fill(commonPrefixLengths, bytesPerDim); // Find per-dim common prefix: - for(int dim=0;dim packedValues = new IntFunction() { - final BytesRef scratch = new BytesRef(); - - { - scratch.length = packedBytesLength; - scratch.bytes = leafValues; - } - @Override public BytesRef apply(int i) { - scratch.offset = packedBytesLength * i; - return scratch; + scratchBytesRef1.offset = packedBytesLength * i; + return scratchBytesRef1; } }; assert valuesInOrderAndBounds(leafCount, 0, Arrays.copyOf(leafValues, packedBytesLength), Arrays.copyOfRange(leafValues, (leafCount - 1) * packedBytesLength, leafCount * packedBytesLength), packedValues, leafDocs, 0); - writeLeafBlockPackedValues(out, commonPrefixLengths, leafCount, 0, packedValues); + writeLeafBlockPackedValues(scratchOut, commonPrefixLengths, leafCount, 0, packedValues); + out.writeBytes(scratchOut.getBytes(), 0, scratchOut.getPosition()); + scratchOut.reset(); } - } // TODO: there must be a simpler way? @@ -1259,13 +1259,13 @@ public class BKDWriter implements Closeable { out.writeBytes(packedIndex, 0, packedIndex.length); } - private void writeLeafBlockDocs(IndexOutput out, int[] docIDs, int start, int count) throws IOException { + private void writeLeafBlockDocs(DataOutput out, int[] docIDs, int start, int count) throws IOException { assert count > 0: "maxPointsInLeafNode=" + maxPointsInLeafNode; out.writeVInt(count); DocIdsWriter.writeDocIds(docIDs, start, count, out); } - private void writeLeafBlockPackedValues(IndexOutput out, int[] commonPrefixLengths, int count, int sortedDim, IntFunction packedValues) throws IOException { + private void writeLeafBlockPackedValues(DataOutput out, int[] commonPrefixLengths, int count, int sortedDim, IntFunction packedValues) throws IOException { int prefixLenSum = Arrays.stream(commonPrefixLengths).sum(); if (prefixLenSum == packedBytesLength) { // all values in this block are equal @@ -1290,7 +1290,7 @@ public class BKDWriter implements Closeable { } } - private void writeLeafBlockPackedValuesRange(IndexOutput out, int[] commonPrefixLengths, int start, int end, IntFunction packedValues) throws IOException { + private void writeLeafBlockPackedValuesRange(DataOutput out, int[] commonPrefixLengths, int start, int end, IntFunction packedValues) throws IOException { for (int i = start; i < end; ++i) { BytesRef ref = packedValues.apply(i); assert ref.length == packedBytesLength; @@ -1316,7 +1316,7 @@ public class BKDWriter implements Closeable { return end - start; } - private void writeCommonPrefixes(IndexOutput out, int[] commonPrefixes, byte[] packedValue) throws IOException { + private void writeCommonPrefixes(DataOutput out, int[] commonPrefixes, byte[] packedValue) throws IOException { for(int dim=0;dim