diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/FrontCodedIndexedBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/FrontCodedIndexedBenchmark.java index 30656630656..00be9202532 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/FrontCodedIndexedBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/FrontCodedIndexedBenchmark.java @@ -86,19 +86,29 @@ public class FrontCodedIndexedBenchmark @Param({"16"}) public int width; - @Param({"generic", "front-coded-4", "front-coded-16"}) + @Param({ + "generic", + "front-coded-4", + "front-coded-16", + "front-coded-incremental-buckets-4", + "front-coded-incremental-buckets-16" + }) public String indexType; @Param({"10000"}) public int numOperations; private File fileFrontCoded; + private File fileFrontCodedIncrementalBuckets; private File fileGeneric; private File smooshDirFrontCoded; + private File smooshDirFrontCodedIncrementalBuckets; private File smooshDirGeneric; private GenericIndexed genericIndexed; private FrontCodedIndexed frontCodedIndexed; + private FrontCodedIndexed frontCodedIndexedIncrementalBuckets; + private Indexed indexed; private String[] values; @@ -128,31 +138,54 @@ public class FrontCodedIndexedBenchmark FrontCodedIndexedWriter frontCodedIndexedWriter = new FrontCodedIndexedWriter( new OnHeapMemorySegmentWriteOutMedium(), ByteOrder.nativeOrder(), - "front-coded-4".equals(indexType) ? 4 : 16 + "front-coded-4".equals(indexType) ? 4 : 16, + false ); frontCodedIndexedWriter.open(); + FrontCodedIndexedWriter frontCodedIndexedWriterIncrementalBuckets = new FrontCodedIndexedWriter( + new OnHeapMemorySegmentWriteOutMedium(), + ByteOrder.nativeOrder(), + "front-coded-incremental-buckets-4".equals(indexType) ? 4 : 16, + true + ); + frontCodedIndexedWriterIncrementalBuckets.open(); + int count = 0; while (iterator.hasNext()) { final String next = iterator.next(); values[count++] = next; frontCodedIndexedWriter.write(StringUtils.toUtf8Nullable(next)); genericIndexedWriter.write(next); + frontCodedIndexedWriterIncrementalBuckets.write(StringUtils.toUtf8Nullable(next)); } smooshDirFrontCoded = FileUtils.createTempDir(); fileFrontCoded = File.createTempFile("frontCodedIndexedBenchmark", "meta"); + smooshDirGeneric = FileUtils.createTempDir(); fileGeneric = File.createTempFile("genericIndexedBenchmark", "meta"); + smooshDirFrontCodedIncrementalBuckets = FileUtils.createTempDir(); + fileFrontCodedIncrementalBuckets = File.createTempFile("frontCodedIndexedBenchmarkIncrementalBuckets", "meta"); + EncodingSizeProfiler.encodedSize = (int) ("generic".equals(indexType) ? genericIndexedWriter.getSerializedSize() - : frontCodedIndexedWriter.getSerializedSize()); + : indexType.startsWith("front-coded-incremental-buckets") + ? 
frontCodedIndexedWriterIncrementalBuckets.getSerializedSize() + : frontCodedIndexedWriter.getSerializedSize()); try ( FileChannel fileChannelFrontCoded = FileChannel.open( fileFrontCoded.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE ); FileSmoosher fileSmoosherFrontCoded = new FileSmoosher(smooshDirFrontCoded); + + FileChannel fileChannelFrontCodedIncrementalBuckets = FileChannel.open( + fileFrontCodedIncrementalBuckets.toPath(), + StandardOpenOption.CREATE, StandardOpenOption.WRITE + ); + FileSmoosher fileSmoosherFrontCodedIncrementalBuckets = new FileSmoosher(smooshDirFrontCodedIncrementalBuckets); + FileChannel fileChannelGeneric = FileChannel.open( fileGeneric.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE @@ -161,6 +194,10 @@ public class FrontCodedIndexedBenchmark ) { frontCodedIndexedWriter.writeTo(fileChannelFrontCoded, fileSmoosherFrontCoded); genericIndexedWriter.writeTo(fileChannelGeneric, fileSmoosherGeneric); + frontCodedIndexedWriterIncrementalBuckets.writeTo( + fileChannelFrontCodedIncrementalBuckets, + fileSmoosherFrontCodedIncrementalBuckets + ); } FileChannel fileChannelGeneric = FileChannel.open(fileGeneric.toPath()); @@ -172,6 +209,13 @@ public class FrontCodedIndexedBenchmark fileFrontCoded.length() ); + FileChannel fileChannelFrontCodedIncrementalBuckets = FileChannel.open(fileFrontCodedIncrementalBuckets.toPath()); + MappedByteBuffer byteBufferFrontCodedIncrementalBuckets = fileChannelFrontCodedIncrementalBuckets.map( + FileChannel.MapMode.READ_ONLY, + 0, + fileFrontCodedIncrementalBuckets.length() + ); + genericIndexed = GenericIndexed.read( byteBufferGeneric, GenericIndexed.UTF8_STRATEGY, @@ -181,19 +225,29 @@ public class FrontCodedIndexedBenchmark byteBufferFrontCoded.order(ByteOrder.nativeOrder()), ByteOrder.nativeOrder() ).get(); + frontCodedIndexedIncrementalBuckets = FrontCodedIndexed.read( + byteBufferFrontCodedIncrementalBuckets.order(ByteOrder.nativeOrder()), + ByteOrder.nativeOrder() + ).get(); // sanity test for (int i = 0; i < numElements; i++) { final String expected = StringUtils.fromUtf8Nullable(genericIndexed.get(i)); final String actual = StringUtils.fromUtf8Nullable(frontCodedIndexed.get(i)); + final String actual2 = StringUtils.fromUtf8Nullable(frontCodedIndexedIncrementalBuckets.get(i)); Preconditions.checkArgument( Objects.equals(expected, actual), "elements not equal: " + i + " " + expected + " " + actual ); + Preconditions.checkArgument( + Objects.equals(expected, actual2), + "elements not equal (incremental buckets): " + i + " " + expected + " " + actual + ); } Iterator genericIterator = genericIndexed.iterator(); Iterator frontCodedIterator = frontCodedIndexed.iterator(); + Iterator frontCodedIteratorIncrementalBuckets = frontCodedIndexedIncrementalBuckets.iterator(); Iterator frontCodedStringIterator = new StringEncodingStrategies.Utf8ToStringIndexed(frontCodedIndexed).iterator(); @@ -202,6 +256,7 @@ public class FrontCodedIndexedBenchmark final String expected = StringUtils.fromUtf8Nullable(genericIterator.next()); final String actual = StringUtils.fromUtf8Nullable(frontCodedIterator.next()); final String actual2 = frontCodedStringIterator.next(); + final String actual3 = StringUtils.fromUtf8Nullable(frontCodedIteratorIncrementalBuckets.next()); Preconditions.checkArgument( Objects.equals(expected, actual), "elements not equal: " + counter + " " + expected + " " + actual @@ -210,11 +265,16 @@ public class FrontCodedIndexedBenchmark Objects.equals(expected, actual2), "elements not equal: " + counter + " " + 
expected + " " + actual ); + Preconditions.checkArgument( + Objects.equals(expected, actual3), + "elements not equal: " + counter + " " + expected + " " + actual + ); counter++; } Preconditions.checkArgument(counter == numElements); Preconditions.checkArgument(genericIterator.hasNext() == frontCodedIterator.hasNext()); Preconditions.checkArgument(genericIterator.hasNext() == frontCodedStringIterator.hasNext()); + Preconditions.checkArgument(genericIterator.hasNext() == frontCodedIteratorIncrementalBuckets.hasNext()); elementsToSearch = new String[numOperations]; for (int i = 0; i < numOperations; i++) { @@ -226,6 +286,8 @@ public class FrontCodedIndexedBenchmark } if ("generic".equals(indexType)) { indexed = genericIndexed.singleThreaded(); + } else if (indexType.startsWith("front-coded-incremental-buckets")) { + indexed = frontCodedIndexedIncrementalBuckets; } else { indexed = frontCodedIndexed; } diff --git a/processing/src/main/java/org/apache/druid/segment/column/StringEncodingStrategies.java b/processing/src/main/java/org/apache/druid/segment/column/StringEncodingStrategies.java index 2427658b669..0c04f0288c8 100644 --- a/processing/src/main/java/org/apache/druid/segment/column/StringEncodingStrategies.java +++ b/processing/src/main/java/org/apache/druid/segment/column/StringEncodingStrategies.java @@ -53,10 +53,12 @@ public class StringEncodingStrategies // writing the dictionary itself DictionaryWriter writer; if (StringEncodingStrategy.FRONT_CODED.equals(encodingStrategy.getType())) { + StringEncodingStrategy.FrontCoded strategy = (StringEncodingStrategy.FrontCoded) encodingStrategy; writer = new FrontCodedIndexedWriter( writeoutMedium, IndexIO.BYTE_ORDER, - ((StringEncodingStrategy.FrontCoded) encodingStrategy).getBucketSize() + strategy.getBucketSize(), + true ); } else { throw new ISE("Unknown encoding strategy: %s", encodingStrategy.getType()); diff --git a/processing/src/main/java/org/apache/druid/segment/column/StringEncodingStrategy.java b/processing/src/main/java/org/apache/druid/segment/column/StringEncodingStrategy.java index fedabb206a9..e850b0c362a 100644 --- a/processing/src/main/java/org/apache/druid/segment/column/StringEncodingStrategy.java +++ b/processing/src/main/java/org/apache/druid/segment/column/StringEncodingStrategy.java @@ -86,12 +86,6 @@ public interface StringEncodingStrategy { public static final int DEFAULT_BUCKET_SIZE = 4; - @JsonProperty - public int getBucketSize() - { - return bucketSize; - } - @JsonProperty private final int bucketSize; @@ -106,6 +100,12 @@ public interface StringEncodingStrategy } } + @JsonProperty + public int getBucketSize() + { + return bucketSize; + } + @Override public String getType() { diff --git a/processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexed.java b/processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexed.java index 2596f7ec2bf..cc0f280405f 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexed.java +++ b/processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexed.java @@ -21,6 +21,7 @@ package org.apache.druid.segment.data; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; +import org.apache.druid.annotations.SuppressFBWarnings; import org.apache.druid.common.config.NullHandling; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; @@ -38,37 +39,39 @@ import java.util.NoSuchElementException; * sorted and unique, using 'front coding'. 
Front coding is a type of delta encoding for byte arrays, where sorted
* values are grouped into buckets. The first value of the bucket is written entirely, and remaining values are stored
* as a pair of an integer which indicates how much of the first byte array of the bucket to use as a prefix, followed
- * by the remaining bytes after the prefix to complete the value.
- *
+ * by the remaining bytes after the prefix to complete the value. If using 'incremental' buckets, the prefix is
+ * instead computed against the immediately preceding value in the bucket rather than against the first value of the
+ * bucket.
+ *
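To illustrate the difference between the two schemes, the short standalone sketch below (class and method names are illustrative, not part of this patch) prints the prefix length each value of a small sorted bucket would get when measured against the bucket's first value (V0) versus the immediately preceding value (V1); the preceding value usually shares at least as long a prefix, which is what makes the incremental layout smaller.

import java.nio.charset.StandardCharsets;

public class FrontCodingPrefixSketch
{
  // length of the common leading byte run of two UTF-8 encoded values
  static int commonPrefixLength(byte[] a, byte[] b)
  {
    int i = 0;
    while (i < a.length && i < b.length && a[i] == b[i]) {
      i++;
    }
    return i;
  }

  public static void main(String[] args)
  {
    String[] bucket = {"apple", "applesauce", "apply", "applying"};
    byte[][] utf8 = new byte[bucket.length][];
    for (int i = 0; i < bucket.length; i++) {
      utf8[i] = bucket[i].getBytes(StandardCharsets.UTF_8);
    }
    for (int i = 1; i < bucket.length; i++) {
      int v0Prefix = commonPrefixLength(utf8[0], utf8[i]);     // V0: prefix against the first bucket value
      int v1Prefix = commonPrefixLength(utf8[i - 1], utf8[i]); // V1: prefix against the preceding value
      System.out.println(bucket[i] + ": v0 prefix=" + v0Prefix + ", v1 prefix=" + v1Prefix);
    }
  }
}

For "applying", V0 can only reuse the 4 bytes shared with "apple", while V1 reuses all 5 bytes shared with "apply", so the stored fragment shrinks.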

* front coded indexed layout:
* | version | bucket size | has null? | number of values | size of "offsets" + "buckets" | "offsets" | "buckets" |
* | ------- | ----------- | --------- | ---------------- | ----------------------------- | --------- | --------- |
* | byte    | byte        | byte      | vbyte int        | vbyte int                     | int[]     | bucket[]  |
- *
+ *

* "offsets" are the ending offsets of each bucket stored in order, stored as plain integers for easy random access. - * + *

* bucket layout:
* | first value | prefix length | fragment | ... | prefix length | fragment |
* | ----------- | ------------- | -------- | --- | ------------- | -------- |
* | blob        | vbyte int     | blob     | ... | vbyte int     | blob     |
- *
+ *

* blob layout:
* | blob length | blob bytes |
* | ----------- | ---------- |
* | vbyte int   | byte[]     |
- *
- *
+ *

+ *

* Getting a value first picks the appropriate bucket, finds its offset in the underlying buffer, then scans the bucket * values to seek to the correct position of the value within the bucket in order to reconstruct it using the prefix * length. - * + *
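Concretely, because the bucket size is a power of two, locating a value is just shift/mask arithmetic on the (null-adjusted) index before the bucket offset lookup; a small sketch mirroring the div/rem fields of this class (the helper name is illustrative, not part of the patch):

// sketch: which bucket holds adjustedIndex, and where within that bucket it sits
static int[] locate(int adjustedIndex, int bucketSize)
{
  final int div = Integer.numberOfTrailingZeros(bucketSize); // e.g. 2 for bucketSize 4
  final int rem = bucketSize - 1;
  final int bucket = adjustedIndex >> div;          // bucket number, used to read the bucket's ending offset
  final int positionInBucket = adjustedIndex & rem; // relative position scanned to within the bucket
  return new int[]{bucket, positionInBucket};
}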

* Finding the index of a value involves binary searching the first values of each bucket to find the correct bucket, * then a linear scan within the bucket to find the matching value (or negative insertion point -1 for values that * are not present). - * + *

* The value iterator reads an entire bucket at a time, reconstructing the values into an array to iterate within the * bucket before moving onto the next bucket as the iterator is consumed. - * + *
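Reconstructing an incremental bucket is therefore a single forward pass, since each entry only depends on the value decoded immediately before it (random access via get() instead records the prefix lengths and buffer positions it skips over and unwinds them backwards, as getFromBucketV1 below does). A minimal sketch of the forward decode, assuming the VByte prefix lengths and fragments have already been parsed out of the buffer (names are illustrative):

import java.util.ArrayList;
import java.util.List;

class IncrementalBucketDecoder
{
  // rebuild all values of a V1 bucket from the whole first value plus (prefixLength, fragment) pairs
  static List<byte[]> decode(byte[] firstValue, int[] prefixLengths, byte[][] fragments)
  {
    final List<byte[]> values = new ArrayList<>();
    values.add(firstValue);
    byte[] previous = firstValue;
    for (int i = 0; i < fragments.length; i++) {
      final byte[] value = new byte[prefixLengths[i] + fragments[i].length];
      // shared prefix is copied from the value decoded immediately before this one
      System.arraycopy(previous, 0, value, 0, prefixLengths[i]);
      // remaining bytes come from the stored fragment
      System.arraycopy(fragments[i], 0, value, prefixLengths[i], fragments[i].length);
      values.add(value);
      previous = value;
    }
    return values;
  }
}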

* This class is not thread-safe since during operation modifies positions of a shared buffer. */ public final class FrontCodedIndexed implements Indexed @@ -77,7 +80,7 @@ public final class FrontCodedIndexed implements Indexed { final ByteBuffer orderedBuffer = buffer.asReadOnlyBuffer().order(ordering); final byte version = orderedBuffer.get(); - Preconditions.checkArgument(version == 0, "only V0 exists, encountered " + version); + Preconditions.checkArgument(version == 0 || version == 1, "only V0 and V1 exist, encountered " + version); final int bucketSize = Byte.toUnsignedInt(orderedBuffer.get()); final boolean hasNull = NullHandling.IS_NULL_BYTE == orderedBuffer.get(); final int numValues = VByte.readInt(orderedBuffer); @@ -93,7 +96,8 @@ public final class FrontCodedIndexed implements Indexed bucketSize, numValues, hasNull, - offsetsPosition + offsetsPosition, + version ); } @@ -101,6 +105,8 @@ public final class FrontCodedIndexed implements Indexed private final int adjustedNumValues; private final int adjustIndex; private final int bucketSize; + private final int[] unwindPrefixLength; + private final int[] unwindBufferPosition; private final int numBuckets; private final int div; private final int rem; @@ -109,13 +115,19 @@ public final class FrontCodedIndexed implements Indexed private final boolean hasNull; private final int lastBucketNumValues; + private final GetBucketValue getBucketValueFn; + private final ReadBucket readBucketFn; + private final FindInBucket findInBucketFn; + + @SuppressFBWarnings(value = "NP_STORE_INTO_NONNULL_FIELD", justification = "V0 does not use unwindPrefixLength or unwindBufferPosition") private FrontCodedIndexed( ByteBuffer buffer, ByteOrder order, int bucketSize, int numValues, boolean hasNull, - int offsetsPosition + int offsetsPosition, + byte version ) { if (Integer.bitCount(bucketSize) != 1) { @@ -133,6 +145,23 @@ public final class FrontCodedIndexed implements Indexed this.lastBucketNumValues = (numValues & rem) == 0 ? 
bucketSize : numValues & rem; this.offsetsPosition = offsetsPosition; this.bucketsPosition = offsetsPosition + ((numBuckets - 1) * Integer.BYTES); + if (version == 0) { + // version zero, all prefixes are computed against the first value in the bucket + this.getBucketValueFn = FrontCodedIndexed::getFromBucketV0; + this.readBucketFn = FrontCodedIndexed::readBucketV0; + this.findInBucketFn = this::findValueInBucketV0; + //noinspection DataFlowIssue + this.unwindPrefixLength = null; + //noinspection DataFlowIssue + this.unwindBufferPosition = null; + } else { + // version one uses 'incremental' buckets, where the prefix is computed against the previous value + this.unwindPrefixLength = new int[bucketSize]; + this.unwindBufferPosition = new int[bucketSize]; + this.getBucketValueFn = this::getFromBucketV1; + this.readBucketFn = this::readBucketV1; + this.findInBucketFn = this::findValueInBucketV1; + } } @Override @@ -158,7 +187,7 @@ public final class FrontCodedIndexed implements Indexed final int bucketIndex = adjustedIndex & rem; final int offset = getBucketOffset(bucket); buffer.position(offset); - return getFromBucket(buffer, bucketIndex); + return getBucketValueFn.get(buffer, bucketIndex); } @Override @@ -216,7 +245,7 @@ public final class FrontCodedIndexed implements Indexed // find the value in the bucket (or where it would be if it were present) buffer.position(firstOffset + firstLength); - return findValueInBucket(value, currBucketFirstValueIndex, bucketSize, sharedPrefix); + return findInBucketFn.find(value, currBucketFirstValueIndex, bucketSize, sharedPrefix); } else if (comparison < 0) { minBucketIndex = currentBucket + 1; } else { @@ -251,12 +280,12 @@ public final class FrontCodedIndexed implements Indexed if (comparison > 0) { // value preceedes bucket, so bail out - return -(bucketIndexBase + adjustIndex) - 1; + return ~(bucketIndexBase + adjustIndex); } buffer.position(firstOffset + firstLength); - return findValueInBucket(value, bucketIndexBase, numValuesInBucket, sharedPrefix); + return findInBucketFn.find(value, bucketIndexBase, numValuesInBucket, sharedPrefix); } @Override @@ -274,7 +303,7 @@ public final class FrontCodedIndexed implements Indexed } ByteBuffer copy = buffer.asReadOnlyBuffer().order(buffer.order()); copy.position(bucketsPosition); - final ByteBuffer[] firstBucket = readBucket(copy, numBuckets > 1 ? bucketSize : lastBucketNumValues); + final ByteBuffer[] firstBucket = readBucketFn.readBucket(copy, numBuckets > 1 ? bucketSize : lastBucketNumValues); // iterator decodes and buffers a bucket at a time, paging through buckets as the iterator is consumed return new Iterator() { @@ -305,7 +334,7 @@ public final class FrontCodedIndexed implements Indexed if (bucketNum != currentBucketIndex) { final int offset = copy.getInt(offsetsPosition + ((bucketNum - 1) * Integer.BYTES)); copy.position(bucketsPosition + offset); - currentBucket = readBucket( + currentBucket = readBucketFn.readBucket( copy, bucketNum < (numBuckets - 1) ? bucketSize : lastBucketNumValues ); @@ -345,7 +374,7 @@ public final class FrontCodedIndexed implements Indexed * MUST be prepared before calling, as it expects the length of the first value to have already been read externally, * and the buffer position to be at the start of the first bucket value. The final buffer position will be the * 'shared prefix length' of the first value in the bucket and the value to compare. - * + *

* Bytes are compared using {@link StringUtils#compareUtf8UsingJavaStringOrdering(byte, byte)}. Therefore, when the * values are UTF-8 encoded strings, the ordering is compatible with {@link String#compareTo(String)}. */ @@ -372,17 +401,17 @@ public final class FrontCodedIndexed implements Indexed * Finds a value in a bucket among the fragments. The first value is assumed to have been already compared against * and be smaller than the value we are looking for. This comparison is the source of the 'shared prefix', which is * the length which the value has in common with the first value of the bucket. - * + *

* This method uses this shared prefix length to skip more expensive byte by byte full value comparisons when * possible by comparing the shared prefix length with the prefix length of the fragment. Since the bucket is always * sorted, prefix lengths shrink as you progress to higher indexes, and we can use this to reason that a fragment * with a longer prefix length than the shared prefix will always sort before the value we are looking for, and values * which have a shorter prefix will always be greater than the value we are looking for, so we only need to do a * full comparison if the prefix length is the same - * + *

* this method modifies the position of {@link #buffer} */ - private int findValueInBucket( + private int findValueInBucketV0( ByteBuffer value, int currBucketFirstValueIndex, int bucketSize, @@ -396,6 +425,7 @@ public final class FrontCodedIndexed implements Indexed while (++relativePosition < bucketSize) { prefixLength = VByte.readInt(buffer); if (prefixLength > sharedPrefix) { + // this value shares more in common with the first value, so the value we are looking for comes after final int skip = VByte.readInt(buffer); buffer.position(buffer.position() + skip); insertionPoint++; @@ -430,20 +460,20 @@ public final class FrontCodedIndexed implements Indexed } } // (-(insertion point) - 1) - return -(currBucketFirstValueIndex + adjustIndex) + (-(insertionPoint) - 1); + return -(currBucketFirstValueIndex + adjustIndex) + (~insertionPoint); } /** * Get a value from a bucket at a relative position. - * + *

* This method modifies the position of the buffer. */ - static ByteBuffer getFromBucket(ByteBuffer buffer, int offset) + static ByteBuffer getFromBucketV0(ByteBuffer buffer, int offset) { int prefixPosition; if (offset == 0) { final int length = VByte.readInt(buffer); - final ByteBuffer firstValue = buffer.asReadOnlyBuffer().order(buffer.order()); + final ByteBuffer firstValue = buffer.asReadOnlyBuffer(); firstValue.limit(firstValue.position() + length); return firstValue; } else { @@ -485,10 +515,10 @@ public final class FrontCodedIndexed implements Indexed /** * Read an entire bucket from a {@link ByteBuffer}, returning an array of reconstructed value bytes. - * + *

* This method modifies the position of the buffer. */ - private static ByteBuffer[] readBucket(ByteBuffer bucket, int numValues) + private static ByteBuffer[] readBucketV0(ByteBuffer bucket, int numValues) { final int length = VByte.readInt(bucket); final byte[] prefixBytes = new byte[length]; @@ -509,4 +539,177 @@ public final class FrontCodedIndexed implements Indexed } return bucketBuffers; } + + /** + * Finds a value in a bucket among the fragments. The first value is assumed to have been already compared against + * and be smaller than the value we are looking for. This comparison is the source of the 'shared prefix', which is + * the length which the value has in common with the previous value of the bucket. + *

+ * This method uses this shared prefix length to skip more expensive byte by byte full value comparisons when + * possible by comparing the shared prefix length with the prefix length of the fragment. Since the bucket is always + * sorted, prefix lengths shrink as you progress to higher indexes, and we can use this to reason that a fragment + * with a longer prefix length than the shared prefix will always sort before the value we are looking for, and values + * which have a shorter prefix will always be greater than the value we are looking for, so we only need to do a + * full comparison if the prefix length is the same + *

+ * this method modifies the position of {@link #buffer} + */ + private int findValueInBucketV1( + ByteBuffer value, + int currBucketFirstValueIndex, + int bucketSize, + int sharedPrefixLength + ) + { + int relativePosition = 0; + int prefixLength; + // scan through bucket values until we find match or compare numValues + int insertionPoint = 1; + while (++relativePosition < bucketSize) { + prefixLength = VByte.readInt(buffer); + if (prefixLength > sharedPrefixLength) { + // bucket value shares more in common with the preceding value, so the value we are looking for comes after + final int skip = VByte.readInt(buffer); + buffer.position(buffer.position() + skip); + insertionPoint++; + } else if (prefixLength < sharedPrefixLength) { + // bucket value prefix is smaller, that means the value we are looking for sorts ahead of it + break; + } else { + // value has the same shared prefix, so compare additional values to find + final int fragmentLength = VByte.readInt(buffer); + final int common = Math.min(fragmentLength, value.remaining() - prefixLength); + int fragmentComparison = 0; + boolean shortCircuit = false; + for (int i = 0; i < common; i++) { + fragmentComparison = StringUtils.compareUtf8UsingJavaStringOrdering( + buffer.get(buffer.position() + i), + value.get(prefixLength + i) + ); + if (fragmentComparison != 0) { + sharedPrefixLength = prefixLength + i; + shortCircuit = true; + break; + } + } + if (fragmentComparison == 0) { + fragmentComparison = Integer.compare(prefixLength + fragmentLength, value.remaining()); + } + + if (fragmentComparison == 0) { + return (currBucketFirstValueIndex + adjustIndex) + relativePosition; + } else if (fragmentComparison < 0) { + // value we are looking for is longer than the current bucket value, continue on + if (!shortCircuit) { + sharedPrefixLength = prefixLength + common; + } + buffer.position(buffer.position() + fragmentLength); + insertionPoint++; + } else { + break; + } + } + } + // (-(insertion point) - 1) + return -(currBucketFirstValueIndex + adjustIndex) + (~insertionPoint); + } + + private ByteBuffer getFromBucketV1(ByteBuffer buffer, int offset) + { + // first value is written whole + final int length = VByte.readInt(buffer); + if (offset == 0) { + // return first value directly from underlying buffer since it is stored whole + final ByteBuffer value = buffer.asReadOnlyBuffer(); + value.limit(value.position() + length); + return value; + } + int pos = 0; + int prefixLength; + int fragmentLength; + unwindPrefixLength[pos] = 0; + unwindBufferPosition[pos] = buffer.position(); + + buffer.position(buffer.position() + length); + do { + prefixLength = VByte.readInt(buffer); + if (++pos < offset) { + // not there yet, no need to read anything other than the length to skip ahead + final int skipLength = VByte.readInt(buffer); + unwindPrefixLength[pos] = prefixLength; + unwindBufferPosition[pos] = buffer.position(); + buffer.position(buffer.position() + skipLength); + } else { + // we've reached our destination + fragmentLength = VByte.readInt(buffer); + if (prefixLength == 0) { + // no prefix, return it directly from the underlying buffer + final ByteBuffer value = buffer.asReadOnlyBuffer(); + value.limit(value.position() + fragmentLength); + return value; + } + break; + } + } while (true); + final int valueLength = prefixLength + fragmentLength; + final byte[] valueBytes = new byte[valueLength]; + buffer.get(valueBytes, prefixLength, fragmentLength); + for (int i = prefixLength; i > 0;) { + // previous value had a larger prefix than or the same 
as the value we are looking for + // skip it since the fragment doesn't have anything we need + if (unwindPrefixLength[--pos] >= i) { + continue; + } + buffer.position(unwindBufferPosition[pos]); + buffer.get(valueBytes, unwindPrefixLength[pos], i - unwindPrefixLength[pos]); + i = unwindPrefixLength[pos]; + } + return ByteBuffer.wrap(valueBytes); + } + + /** + * Read an entire bucket from a {@link ByteBuffer}, returning an array of reconstructed value bytes. + *

+ * This method modifies the position of the buffer. + */ + private ByteBuffer[] readBucketV1(ByteBuffer bucket, int numValues) + { + final ByteBuffer[] bucketBuffers = new ByteBuffer[numValues]; + + // first value is written whole + final int length = VByte.readInt(bucket); + byte[] prefixBytes = new byte[length]; + bucket.get(prefixBytes, 0, length); + bucketBuffers[0] = ByteBuffer.wrap(prefixBytes); + int pos = 1; + while (pos < numValues) { + final int prefixLength = VByte.readInt(bucket); + final int fragmentLength = VByte.readInt(bucket); + byte[] nextValueBytes = new byte[prefixLength + fragmentLength]; + System.arraycopy(prefixBytes, 0, nextValueBytes, 0, prefixLength); + bucket.get(nextValueBytes, prefixLength, fragmentLength); + final ByteBuffer value = ByteBuffer.wrap(nextValueBytes); + prefixBytes = nextValueBytes; + bucketBuffers[pos++] = value; + } + return bucketBuffers; + } + + @FunctionalInterface + interface GetBucketValue + { + ByteBuffer get(ByteBuffer buffer, int offset); + } + + @FunctionalInterface + interface ReadBucket + { + ByteBuffer[] readBucket(ByteBuffer buffer, int bucketSize); + } + + @FunctionalInterface + interface FindInBucket + { + int find(ByteBuffer value, int currBucketFirstValueIndex, int bucketSize, int sharedPrefixLength); + } } diff --git a/processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexedWriter.java b/processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexedWriter.java index bcbe47db624..c5a26f3a597 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexedWriter.java +++ b/processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexedWriter.java @@ -42,7 +42,8 @@ import java.nio.channels.WritableByteChannel; * * Front coding is a type of delta encoding for byte arrays, where values are grouped into buckets. The first value of * the bucket is written entirely, and remaining values are stored as pairs of an integer which indicates how much - * of the first byte array of the bucket to use as a prefix, followed by the remaining value bytes after the prefix. + * of the first byte array of the bucket to use as a prefix, (or the preceding value of the bucket if using + * 'incremental' buckets) followed by the remaining value bytes after the prefix. * * This writer is designed for use with UTF-8 encoded strings that are written in an order compatible with * {@link String#compareTo(String)}. 
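For anyone wanting to exercise the new flag outside the benchmark, a condensed sketch of the write path using the constructor signature introduced in this patch; the channel and smoosher are assumed to be set up as in the benchmark above, imports follow that file, and the values are placeholders:

// sketch: write a small sorted dictionary as a version 1 (incremental) front coded index
static long writeIncrementalDictionary(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
{
  final FrontCodedIndexedWriter writer = new FrontCodedIndexedWriter(
      new OnHeapMemorySegmentWriteOutMedium(),
      ByteOrder.nativeOrder(),
      4,    // bucket size: power of two between 1 and 128
      true  // useIncrementalBuckets: write V1, prefixes computed against the preceding value
  );
  writer.open();
  for (String value : new String[]{"apple", "applesauce", "apply", "applying"}) {
    writer.write(StringUtils.toUtf8Nullable(value)); // input must already be sorted and unique
  }
  final long serializedSize = writer.getSerializedSize();
  writer.writeTo(channel, smoosher);
  return serializedSize;
}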
@@ -58,6 +59,7 @@ public class FrontCodedIndexedWriter implements DictionaryWriter private final byte[][] bucketBuffer; private final ByteBuffer getOffsetBuffer; private final int div; + private final boolean useIncrementalBuckets; @Nullable private byte[] prevObject = null; @@ -71,10 +73,12 @@ public class FrontCodedIndexedWriter implements DictionaryWriter private boolean isClosed = false; private boolean hasNulls = false; + public FrontCodedIndexedWriter( SegmentWriteOutMedium segmentWriteOutMedium, ByteOrder byteOrder, - int bucketSize + int bucketSize, + boolean useIncrementalBuckets ) { if (Integer.bitCount(bucketSize) != 1 || bucketSize < 1 || bucketSize > 128) { @@ -87,6 +91,7 @@ public class FrontCodedIndexedWriter implements DictionaryWriter this.bucketBuffer = new byte[bucketSize][]; this.getOffsetBuffer = ByteBuffer.allocate(Integer.BYTES).order(byteOrder); this.div = Integer.numberOfTrailingZeros(bucketSize); + this.useIncrementalBuckets = useIncrementalBuckets; } @Override @@ -119,7 +124,9 @@ public class FrontCodedIndexedWriter implements DictionaryWriter int written; // write the bucket, growing scratch buffer as necessary do { - written = writeBucket(scratch, bucketBuffer, bucketSize); + written = useIncrementalBuckets + ? writeIncrementalBucket(scratch, bucketBuffer, bucketSize) + : writeBucket(scratch, bucketBuffer, bucketSize); if (written < 0) { growScratch(); } @@ -163,8 +170,14 @@ public class FrontCodedIndexedWriter implements DictionaryWriter flush(); } resetScratch(); - // version 0 - scratch.put((byte) 0); + + if (useIncrementalBuckets) { + // version 1 is incremental buckets + scratch.put((byte) 1); + } else { + // version 0 all values are prefixed on first bucket value + scratch.put((byte) 0); + } scratch.put((byte) bucketSize); scratch.put(hasNulls ? NullHandling.IS_NULL_BYTE : NullHandling.IS_NOT_NULL_BYTE); VByte.writeInt(scratch, numWritten); @@ -202,14 +215,16 @@ public class FrontCodedIndexedWriter implements DictionaryWriter startOffset = getBucketOffset(bucket - 1); } long endOffset = getBucketOffset(bucket); - int bucketSize = Ints.checkedCast(endOffset - startOffset); - if (bucketSize == 0) { + int bucketBytesSize = Ints.checkedCast(endOffset - startOffset); + if (bucketBytesSize == 0) { return null; } - final ByteBuffer bucketBuffer = ByteBuffer.allocate(bucketSize).order(byteOrder); + final ByteBuffer bucketBuffer = ByteBuffer.allocate(bucketBytesSize).order(byteOrder); valuesOut.readFully(startOffset, bucketBuffer); bucketBuffer.clear(); - final ByteBuffer valueBuffer = FrontCodedIndexed.getFromBucket(bucketBuffer, relativeIndex); + final ByteBuffer valueBuffer = useIncrementalBuckets + ? getFromBucketV1(bucketBuffer, relativeIndex, bucketSize) + : FrontCodedIndexed.getFromBucketV0(bucketBuffer, relativeIndex); final byte[] valueBytes = new byte[valueBuffer.limit() - valueBuffer.position()]; valueBuffer.get(valueBytes); return valueBytes; @@ -232,7 +247,10 @@ public class FrontCodedIndexedWriter implements DictionaryWriter resetScratch(); int written; do { - written = writeBucket(scratch, bucketBuffer, remainder == 0 ? bucketSize : remainder); + int flushSize = remainder == 0 ? bucketSize : remainder; + written = useIncrementalBuckets + ? writeIncrementalBucket(scratch, bucketBuffer, flushSize) + : writeBucket(scratch, bucketBuffer, flushSize); if (written < 0) { growScratch(); } @@ -308,6 +326,57 @@ public class FrontCodedIndexedWriter implements DictionaryWriter return written; } + /** + * Write bucket of values to a {@link ByteBuffer}. 
The first value is written completely, subsequent values are + * written with an integer to indicate how much of the preceding value in the bucket is a prefix of the value, + * followed by the remaining bytes of the value. + * + * Uses {@link VByte} encoded integers to indicate prefix length and value length. + */ + public static int writeIncrementalBucket(ByteBuffer buffer, byte[][] values, int numValues) + { + int written = 0; + byte[] prev = null; + while (written < numValues) { + byte[] next = values[written]; + if (written == 0) { + prev = next; + // the first value in the bucket is written completely as it is + int rem = writeValue(buffer, prev); + // wasn't enough room, bail out + if (rem < 0) { + return rem; + } + } else { + // all other values must be partitioned into a prefix length and suffix bytes + int prefixLength = 0; + for (; prefixLength < prev.length; prefixLength++) { + final int cmp = StringUtils.compareUtf8UsingJavaStringOrdering(prev[prefixLength], next[prefixLength]); + if (cmp != 0) { + break; + } + } + // convert to bytes because not every char is a single byte + final byte[] suffix = new byte[next.length - prefixLength]; + System.arraycopy(next, prefixLength, suffix, 0, suffix.length); + int rem = buffer.remaining() - VByte.computeIntSize(prefixLength); + // wasn't enough room, bail out + if (rem < 0) { + return rem; + } + VByte.writeInt(buffer, prefixLength); + rem = writeValue(buffer, suffix); + prev = next; + // wasn't enough room, bail out + if (rem < 0) { + return rem; + } + } + written++; + } + return written; + } + /** * Write a variable length byte[] value to a {@link ByteBuffer}, storing the length as a {@link VByte} encoded * integer followed by the value itself. Returns the number of bytes written to the buffer. This method returns a @@ -344,4 +413,71 @@ public class FrontCodedIndexedWriter implements DictionaryWriter return StringUtils.compareUtf8UsingJavaStringOrdering(b1, b2); } + + + /** + * same as {@link FrontCodedIndexed#getFromBucketV1(ByteBuffer, int)} but without re-using prefixLength and buffer position + * arrays so has more overhead/garbage creation than the instance method. + * + * Note: adding the unwindPrefixLength and unwindBufferPosition arrays as arguments and having + * {@link FrontCodedIndexed#getFromBucketV1(ByteBuffer, int)} call this static method added 5-10ns of overhead + * compared to having its own copy of the code, presumably due to the overhead of an additional method call and extra + * arguments. + * + * As such, since the writer is the only user of this method, it has been copied here... 
+ */ + static ByteBuffer getFromBucketV1(ByteBuffer buffer, int offset, int bucketSize) + { + final int[] unwindPrefixLength = new int[bucketSize]; + final int[] unwindBufferPosition = new int[bucketSize]; + // first value is written whole + final int length = VByte.readInt(buffer); + if (offset == 0) { + // return first value directly from underlying buffer since it is stored whole + final ByteBuffer value = buffer.asReadOnlyBuffer(); + value.limit(value.position() + length); + return value; + } + int pos = 0; + int prefixLength; + int fragmentLength; + unwindPrefixLength[pos] = 0; + unwindBufferPosition[pos] = buffer.position(); + + buffer.position(buffer.position() + length); + do { + prefixLength = VByte.readInt(buffer); + if (++pos < offset) { + // not there yet, no need to read anything other than the length to skip ahead + final int skipLength = VByte.readInt(buffer); + unwindPrefixLength[pos] = prefixLength; + unwindBufferPosition[pos] = buffer.position(); + buffer.position(buffer.position() + skipLength); + } else { + // we've reached our destination + fragmentLength = VByte.readInt(buffer); + if (prefixLength == 0) { + // no prefix, return it directly from the underlying buffer + final ByteBuffer value = buffer.asReadOnlyBuffer(); + value.limit(value.position() + fragmentLength); + return value; + } + break; + } + } while (true); + final int valueLength = prefixLength + fragmentLength; + final byte[] valueBytes = new byte[valueLength]; + buffer.get(valueBytes, prefixLength, fragmentLength); + for (int i = prefixLength; i > 0;) { + // previous value had a larger prefix than or the same as the value we are looking for + // skip it since the fragment doesn't have anything we need + if (unwindPrefixLength[--pos] >= i) { + continue; + } + buffer.position(unwindBufferPosition[pos]); + buffer.get(valueBytes, unwindPrefixLength[pos], i - unwindPrefixLength[pos]); + i = unwindPrefixLength[pos]; + } + return ByteBuffer.wrap(valueBytes); + } } diff --git a/processing/src/test/java/org/apache/druid/segment/data/FrontCodedIndexedTest.java b/processing/src/test/java/org/apache/druid/segment/data/FrontCodedIndexedTest.java index c9e56135365..88a4a5d3acf 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/FrontCodedIndexedTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/FrontCodedIndexedTest.java @@ -43,17 +43,24 @@ import java.util.TreeSet; @RunWith(Parameterized.class) public class FrontCodedIndexedTest extends InitializedNullHandlingTest { - @Parameterized.Parameters(name = "{0}") + @Parameterized.Parameters(name = "byteOrder: {0} useIncrementalBuckets: {1}") public static Collection constructorFeeder() { - return ImmutableList.of(new Object[]{ByteOrder.LITTLE_ENDIAN}, new Object[]{ByteOrder.BIG_ENDIAN}); + return ImmutableList.of( + new Object[]{ByteOrder.LITTLE_ENDIAN, true}, + new Object[]{ByteOrder.LITTLE_ENDIAN, false}, + new Object[]{ByteOrder.BIG_ENDIAN, true}, + new Object[]{ByteOrder.BIG_ENDIAN, false} + ); } private final ByteOrder order; + private final boolean useIncrementalBuckets; - public FrontCodedIndexedTest(ByteOrder byteOrder) + public FrontCodedIndexedTest(ByteOrder byteOrder, boolean useIncrementalBuckets) { this.order = byteOrder; + this.useIncrementalBuckets = useIncrementalBuckets; } @Test @@ -61,7 +68,7 @@ public class FrontCodedIndexedTest extends InitializedNullHandlingTest { ByteBuffer buffer = ByteBuffer.allocate(1 << 12).order(order); List theList = ImmutableList.of("hello", "helloo", "hellooo", "hellooz", "helloozy"); - 
fillBuffer(buffer, theList, 4); + fillBuffer(buffer, theList, 4, useIncrementalBuckets); buffer.position(0); FrontCodedIndexed codedUtf8Indexed = FrontCodedIndexed.read( @@ -92,13 +99,16 @@ public class FrontCodedIndexedTest extends InitializedNullHandlingTest { ByteBuffer buffer = ByteBuffer.allocate(1 << 12).order(order); List theList = ImmutableList.of("hello", "helloo", "hellooo", "hellooz", "helloozy"); - fillBuffer(buffer, theList, 16); + fillBuffer(buffer, theList, 16, useIncrementalBuckets); FrontCodedIndexed codedUtf8Indexed = FrontCodedIndexed.read( buffer, buffer.order() ).get(); + Assert.assertEquals("hello", StringUtils.fromUtf8(codedUtf8Indexed.get(0))); Assert.assertEquals("helloo", StringUtils.fromUtf8(codedUtf8Indexed.get(1))); + Assert.assertEquals("hellooo", StringUtils.fromUtf8(codedUtf8Indexed.get(2))); + Assert.assertEquals("hellooz", StringUtils.fromUtf8(codedUtf8Indexed.get(3))); Assert.assertEquals("helloozy", StringUtils.fromUtf8(codedUtf8Indexed.get(4))); Iterator newListIterator = theList.iterator(); @@ -127,7 +137,7 @@ public class FrontCodedIndexedTest extends InitializedNullHandlingTest for (int i = 0; i < sizeBase + sizeAdjust; i++) { values.add(IdUtils.getRandomId() + IdUtils.getRandomId() + IdUtils.getRandomId() + IdUtils.getRandomId()); } - fillBuffer(buffer, values, bucketSize); + fillBuffer(buffer, values, bucketSize, useIncrementalBuckets); FrontCodedIndexed codedUtf8Indexed = FrontCodedIndexed.read( buffer, @@ -163,7 +173,7 @@ public class FrontCodedIndexedTest extends InitializedNullHandlingTest for (int i = 0; i < sizeBase + sizeAdjust; i++) { values.add(IdUtils.getRandomId() + IdUtils.getRandomId() + IdUtils.getRandomId() + IdUtils.getRandomId()); } - fillBuffer(buffer, values, bucketSize); + fillBuffer(buffer, values, bucketSize, useIncrementalBuckets); FrontCodedIndexed codedUtf8Indexed = FrontCodedIndexed.read( buffer, @@ -197,7 +207,7 @@ public class FrontCodedIndexedTest extends InitializedNullHandlingTest ByteBuffer buffer = ByteBuffer.allocate(1 << 12).order(order); List theList = ImmutableList.of("hello", "helloo", "hellooo", "hellooz", "helloozy"); - fillBuffer(buffer, theList, 4); + fillBuffer(buffer, theList, 4, useIncrementalBuckets); FrontCodedIndexed codedUtf8Indexed = FrontCodedIndexed.read( buffer, @@ -221,7 +231,7 @@ public class FrontCodedIndexedTest extends InitializedNullHandlingTest TreeSet values = new TreeSet<>(GenericIndexed.STRING_STRATEGY); values.add(null); values.addAll(theList); - fillBuffer(buffer, values, 4); + fillBuffer(buffer, values, 4, useIncrementalBuckets); FrontCodedIndexed codedUtf8Indexed = FrontCodedIndexed.read( buffer, @@ -244,7 +254,7 @@ public class FrontCodedIndexedTest extends InitializedNullHandlingTest // "\uD83D\uDCA9" and "(請參見已被刪除版本)" are a regression test for https://github.com/apache/druid/pull/13364 List theList = ImmutableList.of("Győ-Moson-Sopron", "Győr", "\uD83D\uDCA9", "(請參見已被刪除版本)"); - fillBuffer(buffer, theList, 4); + fillBuffer(buffer, theList, 4, useIncrementalBuckets); buffer.position(0); FrontCodedIndexed codedUtf8Indexed = FrontCodedIndexed.read( @@ -260,7 +270,7 @@ public class FrontCodedIndexedTest extends InitializedNullHandlingTest final ByteBuffer nextUtf8 = utf8Iterator.next(); Assert.assertEquals(next, StringUtils.fromUtf8(nextUtf8)); nextUtf8.position(0); - Assert.assertEquals(next, StringUtils.fromUtf8(codedUtf8Indexed.get(ctr))); + Assert.assertEquals("mismatch row " + ctr, next, StringUtils.fromUtf8(codedUtf8Indexed.get(ctr))); Assert.assertEquals(ctr, 
codedUtf8Indexed.indexOf(nextUtf8)); ctr++; } @@ -272,7 +282,7 @@ public class FrontCodedIndexedTest extends InitializedNullHandlingTest { ByteBuffer buffer = ByteBuffer.allocate(1 << 12).order(order); List theList = Collections.singletonList(null); - fillBuffer(buffer, theList, 4); + fillBuffer(buffer, theList, 4, useIncrementalBuckets); buffer.position(0); FrontCodedIndexed codedUtf8Indexed = FrontCodedIndexed.read( @@ -315,7 +325,7 @@ public class FrontCodedIndexedTest extends InitializedNullHandlingTest values.add(IdUtils.getRandomId() + IdUtils.getRandomId() + IdUtils.getRandomId() + IdUtils.getRandomId()); } for (int bucketSize : bucketSizes) { - fillBuffer(buffer, values, bucketSize); + fillBuffer(buffer, values, bucketSize, useIncrementalBuckets); FrontCodedIndexed codedUtf8Indexed = FrontCodedIndexed.read( buffer, buffer.order() @@ -352,7 +362,8 @@ public class FrontCodedIndexedTest extends InitializedNullHandlingTest () -> new FrontCodedIndexedWriter( medium, ByteOrder.nativeOrder(), - 0 + 0, + useIncrementalBuckets ) ); @@ -361,7 +372,8 @@ public class FrontCodedIndexedTest extends InitializedNullHandlingTest () -> new FrontCodedIndexedWriter( medium, ByteOrder.nativeOrder(), - 15 + 15, + useIncrementalBuckets ) ); @@ -370,20 +382,23 @@ public class FrontCodedIndexedTest extends InitializedNullHandlingTest () -> new FrontCodedIndexedWriter( medium, ByteOrder.nativeOrder(), - 256 + 256, + useIncrementalBuckets ) ); } - private static long fillBuffer(ByteBuffer buffer, Iterable sortedIterable, int bucketSize) throws IOException + private static long fillBuffer(ByteBuffer buffer, Iterable sortedIterable, int bucketSize, boolean useIncrementalBuckets) throws IOException { Iterator sortedStrings = sortedIterable.iterator(); buffer.position(0); OnHeapMemorySegmentWriteOutMedium medium = new OnHeapMemorySegmentWriteOutMedium(); - FrontCodedIndexedWriter writer = new FrontCodedIndexedWriter( + DictionaryWriter writer; + writer = new FrontCodedIndexedWriter( medium, buffer.order(), - bucketSize + bucketSize, + useIncrementalBuckets ); writer.open(); int index = 0;
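Putting the test changes together, the round trip the new parameterization covers looks roughly like this; it follows the test's fillBuffer helper and the read path shown earlier, with the test file's imports assumed and an arbitrary buffer size:

// sketch: write with incremental buckets enabled, then read the dictionary back as V1
ByteBuffer buffer = ByteBuffer.allocate(1 << 12).order(ByteOrder.nativeOrder());
List<String> values = ImmutableList.of("hello", "helloo", "hellooo", "hellooz", "helloozy");
fillBuffer(buffer, values, 4, true); // bucketSize 4, useIncrementalBuckets = true

buffer.position(0);
FrontCodedIndexed indexed = FrontCodedIndexed.read(buffer, buffer.order()).get();
for (int i = 0; i < values.size(); i++) {
  // values come back as UTF-8 ByteBuffers in the same sorted order they were written
  Assert.assertEquals(values.get(i), StringUtils.fromUtf8(indexed.get(i)));
  Assert.assertEquals(i, indexed.indexOf(ByteBuffer.wrap(StringUtils.toUtf8(values.get(i)))));
}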