better FrontCodedIndexed (#13854)

* Adds new implementation of 'frontCoded' string encoding strategy, which writes out a v1 FrontCodedIndexed which stores buckets on a prefix of the previous value instead of the first value in the bucket
This commit is contained in:
Clint Wylie 2023-03-14 18:14:11 -07:00 committed by GitHub
parent 7ce3371730
commit ed57c5c853
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 486 additions and 68 deletions

View File

@ -86,19 +86,29 @@ public class FrontCodedIndexedBenchmark
@Param({"16"})
public int width;
@Param({"generic", "front-coded-4", "front-coded-16"})
@Param({
"generic",
"front-coded-4",
"front-coded-16",
"front-coded-incremental-buckets-4",
"front-coded-incremental-buckets-16"
})
public String indexType;
@Param({"10000"})
public int numOperations;
private File fileFrontCoded;
private File fileFrontCodedIncrementalBuckets;
private File fileGeneric;
private File smooshDirFrontCoded;
private File smooshDirFrontCodedIncrementalBuckets;
private File smooshDirGeneric;
private GenericIndexed<ByteBuffer> genericIndexed;
private FrontCodedIndexed frontCodedIndexed;
private FrontCodedIndexed frontCodedIndexedIncrementalBuckets;
private Indexed<ByteBuffer> indexed;
private String[] values;
@ -128,31 +138,54 @@ public class FrontCodedIndexedBenchmark
FrontCodedIndexedWriter frontCodedIndexedWriter = new FrontCodedIndexedWriter(
new OnHeapMemorySegmentWriteOutMedium(),
ByteOrder.nativeOrder(),
"front-coded-4".equals(indexType) ? 4 : 16
"front-coded-4".equals(indexType) ? 4 : 16,
false
);
frontCodedIndexedWriter.open();
FrontCodedIndexedWriter frontCodedIndexedWriterIncrementalBuckets = new FrontCodedIndexedWriter(
new OnHeapMemorySegmentWriteOutMedium(),
ByteOrder.nativeOrder(),
"front-coded-incremental-buckets-4".equals(indexType) ? 4 : 16,
true
);
frontCodedIndexedWriterIncrementalBuckets.open();
int count = 0;
while (iterator.hasNext()) {
final String next = iterator.next();
values[count++] = next;
frontCodedIndexedWriter.write(StringUtils.toUtf8Nullable(next));
genericIndexedWriter.write(next);
frontCodedIndexedWriterIncrementalBuckets.write(StringUtils.toUtf8Nullable(next));
}
smooshDirFrontCoded = FileUtils.createTempDir();
fileFrontCoded = File.createTempFile("frontCodedIndexedBenchmark", "meta");
smooshDirGeneric = FileUtils.createTempDir();
fileGeneric = File.createTempFile("genericIndexedBenchmark", "meta");
smooshDirFrontCodedIncrementalBuckets = FileUtils.createTempDir();
fileFrontCodedIncrementalBuckets = File.createTempFile("frontCodedIndexedBenchmarkIncrementalBuckets", "meta");
EncodingSizeProfiler.encodedSize = (int) ("generic".equals(indexType)
? genericIndexedWriter.getSerializedSize()
: frontCodedIndexedWriter.getSerializedSize());
: indexType.startsWith("front-coded-incremental-buckets")
? frontCodedIndexedWriterIncrementalBuckets.getSerializedSize()
: frontCodedIndexedWriter.getSerializedSize());
try (
FileChannel fileChannelFrontCoded = FileChannel.open(
fileFrontCoded.toPath(),
StandardOpenOption.CREATE, StandardOpenOption.WRITE
);
FileSmoosher fileSmoosherFrontCoded = new FileSmoosher(smooshDirFrontCoded);
FileChannel fileChannelFrontCodedIncrementalBuckets = FileChannel.open(
fileFrontCodedIncrementalBuckets.toPath(),
StandardOpenOption.CREATE, StandardOpenOption.WRITE
);
FileSmoosher fileSmoosherFrontCodedIncrementalBuckets = new FileSmoosher(smooshDirFrontCodedIncrementalBuckets);
FileChannel fileChannelGeneric = FileChannel.open(
fileGeneric.toPath(),
StandardOpenOption.CREATE, StandardOpenOption.WRITE
@ -161,6 +194,10 @@ public class FrontCodedIndexedBenchmark
) {
frontCodedIndexedWriter.writeTo(fileChannelFrontCoded, fileSmoosherFrontCoded);
genericIndexedWriter.writeTo(fileChannelGeneric, fileSmoosherGeneric);
frontCodedIndexedWriterIncrementalBuckets.writeTo(
fileChannelFrontCodedIncrementalBuckets,
fileSmoosherFrontCodedIncrementalBuckets
);
}
FileChannel fileChannelGeneric = FileChannel.open(fileGeneric.toPath());
@ -172,6 +209,13 @@ public class FrontCodedIndexedBenchmark
fileFrontCoded.length()
);
FileChannel fileChannelFrontCodedIncrementalBuckets = FileChannel.open(fileFrontCodedIncrementalBuckets.toPath());
MappedByteBuffer byteBufferFrontCodedIncrementalBuckets = fileChannelFrontCodedIncrementalBuckets.map(
FileChannel.MapMode.READ_ONLY,
0,
fileFrontCodedIncrementalBuckets.length()
);
genericIndexed = GenericIndexed.read(
byteBufferGeneric,
GenericIndexed.UTF8_STRATEGY,
@ -181,19 +225,29 @@ public class FrontCodedIndexedBenchmark
byteBufferFrontCoded.order(ByteOrder.nativeOrder()),
ByteOrder.nativeOrder()
).get();
frontCodedIndexedIncrementalBuckets = FrontCodedIndexed.read(
byteBufferFrontCodedIncrementalBuckets.order(ByteOrder.nativeOrder()),
ByteOrder.nativeOrder()
).get();
// sanity test
for (int i = 0; i < numElements; i++) {
final String expected = StringUtils.fromUtf8Nullable(genericIndexed.get(i));
final String actual = StringUtils.fromUtf8Nullable(frontCodedIndexed.get(i));
final String actual2 = StringUtils.fromUtf8Nullable(frontCodedIndexedIncrementalBuckets.get(i));
Preconditions.checkArgument(
Objects.equals(expected, actual),
"elements not equal: " + i + " " + expected + " " + actual
);
Preconditions.checkArgument(
Objects.equals(expected, actual2),
"elements not equal (incremental buckets): " + i + " " + expected + " " + actual
);
}
Iterator<ByteBuffer> genericIterator = genericIndexed.iterator();
Iterator<ByteBuffer> frontCodedIterator = frontCodedIndexed.iterator();
Iterator<ByteBuffer> frontCodedIteratorIncrementalBuckets = frontCodedIndexedIncrementalBuckets.iterator();
Iterator<String> frontCodedStringIterator =
new StringEncodingStrategies.Utf8ToStringIndexed(frontCodedIndexed).iterator();
@ -202,6 +256,7 @@ public class FrontCodedIndexedBenchmark
final String expected = StringUtils.fromUtf8Nullable(genericIterator.next());
final String actual = StringUtils.fromUtf8Nullable(frontCodedIterator.next());
final String actual2 = frontCodedStringIterator.next();
final String actual3 = StringUtils.fromUtf8Nullable(frontCodedIteratorIncrementalBuckets.next());
Preconditions.checkArgument(
Objects.equals(expected, actual),
"elements not equal: " + counter + " " + expected + " " + actual
@ -210,11 +265,16 @@ public class FrontCodedIndexedBenchmark
Objects.equals(expected, actual2),
"elements not equal: " + counter + " " + expected + " " + actual
);
Preconditions.checkArgument(
Objects.equals(expected, actual3),
"elements not equal: " + counter + " " + expected + " " + actual
);
counter++;
}
Preconditions.checkArgument(counter == numElements);
Preconditions.checkArgument(genericIterator.hasNext() == frontCodedIterator.hasNext());
Preconditions.checkArgument(genericIterator.hasNext() == frontCodedStringIterator.hasNext());
Preconditions.checkArgument(genericIterator.hasNext() == frontCodedIteratorIncrementalBuckets.hasNext());
elementsToSearch = new String[numOperations];
for (int i = 0; i < numOperations; i++) {
@ -226,6 +286,8 @@ public class FrontCodedIndexedBenchmark
}
if ("generic".equals(indexType)) {
indexed = genericIndexed.singleThreaded();
} else if (indexType.startsWith("front-coded-incremental-buckets")) {
indexed = frontCodedIndexedIncrementalBuckets;
} else {
indexed = frontCodedIndexed;
}

View File

@ -53,10 +53,12 @@ public class StringEncodingStrategies
// writing the dictionary itself
DictionaryWriter<byte[]> writer;
if (StringEncodingStrategy.FRONT_CODED.equals(encodingStrategy.getType())) {
StringEncodingStrategy.FrontCoded strategy = (StringEncodingStrategy.FrontCoded) encodingStrategy;
writer = new FrontCodedIndexedWriter(
writeoutMedium,
IndexIO.BYTE_ORDER,
((StringEncodingStrategy.FrontCoded) encodingStrategy).getBucketSize()
strategy.getBucketSize(),
true
);
} else {
throw new ISE("Unknown encoding strategy: %s", encodingStrategy.getType());

View File

@ -86,12 +86,6 @@ public interface StringEncodingStrategy
{
public static final int DEFAULT_BUCKET_SIZE = 4;
@JsonProperty
public int getBucketSize()
{
return bucketSize;
}
@JsonProperty
private final int bucketSize;
@ -106,6 +100,12 @@ public interface StringEncodingStrategy
}
}
@JsonProperty
public int getBucketSize()
{
return bucketSize;
}
@Override
public String getType()
{

View File

@ -21,6 +21,7 @@ package org.apache.druid.segment.data;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import org.apache.druid.annotations.SuppressFBWarnings;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
@ -38,37 +39,39 @@ import java.util.NoSuchElementException;
* sorted and unique, using 'front coding'. Front coding is a type of delta encoding for byte arrays, where sorted
* values are grouped into buckets. The first value of the bucket is written entirely, and remaining values are stored
* as a pair of an integer which indicates how much of the first byte array of the bucket to use as a prefix, followed
* by the remaining bytes after the prefix to complete the value.
*
* by the remaining bytes after the prefix to complete the value. If using 'incremental' buckets, instead of using the
* prefix of the first bucket value, instead the prefix is computed against the immediately preceding value in the
* bucket.
* <p>
* front coded indexed layout:
* | version | bucket size | has null? | number of values | size of "offsets" + "buckets" | "offsets" | "buckets" |
* | ------- | ----------- | --------- | ---------------- | ----------------------------- | --------- | --------- |
* | byte | byte | byte | vbyte int | vbyte int | int[] | bucket[] |
*
* <p>
* "offsets" are the ending offsets of each bucket stored in order, stored as plain integers for easy random access.
*
* <p>
* bucket layout:
* | first value | prefix length | fragment | ... | prefix length | fragment |
* | ----------- | ------------- | -------- | --- | ------------- | -------- |
* | blob | vbyte int | blob | ... | vbyte int | blob |
*
* <p>
* blob layout:
* | blob length | blob bytes |
* | ----------- | ---------- |
* | vbyte int | byte[] |
*
*
* <p>
* <p>
* Getting a value first picks the appropriate bucket, finds its offset in the underlying buffer, then scans the bucket
* values to seek to the correct position of the value within the bucket in order to reconstruct it using the prefix
* length.
*
* <p>
* Finding the index of a value involves binary searching the first values of each bucket to find the correct bucket,
* then a linear scan within the bucket to find the matching value (or negative insertion point -1 for values that
* are not present).
*
* <p>
* The value iterator reads an entire bucket at a time, reconstructing the values into an array to iterate within the
* bucket before moving onto the next bucket as the iterator is consumed.
*
* <p>
* This class is not thread-safe since during operation modifies positions of a shared buffer.
*/
public final class FrontCodedIndexed implements Indexed<ByteBuffer>
@ -77,7 +80,7 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
{
final ByteBuffer orderedBuffer = buffer.asReadOnlyBuffer().order(ordering);
final byte version = orderedBuffer.get();
Preconditions.checkArgument(version == 0, "only V0 exists, encountered " + version);
Preconditions.checkArgument(version == 0 || version == 1, "only V0 and V1 exist, encountered " + version);
final int bucketSize = Byte.toUnsignedInt(orderedBuffer.get());
final boolean hasNull = NullHandling.IS_NULL_BYTE == orderedBuffer.get();
final int numValues = VByte.readInt(orderedBuffer);
@ -93,7 +96,8 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
bucketSize,
numValues,
hasNull,
offsetsPosition
offsetsPosition,
version
);
}
@ -101,6 +105,8 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
private final int adjustedNumValues;
private final int adjustIndex;
private final int bucketSize;
private final int[] unwindPrefixLength;
private final int[] unwindBufferPosition;
private final int numBuckets;
private final int div;
private final int rem;
@ -109,13 +115,19 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
private final boolean hasNull;
private final int lastBucketNumValues;
private final GetBucketValue getBucketValueFn;
private final ReadBucket readBucketFn;
private final FindInBucket findInBucketFn;
@SuppressFBWarnings(value = "NP_STORE_INTO_NONNULL_FIELD", justification = "V0 does not use unwindPrefixLength or unwindBufferPosition")
private FrontCodedIndexed(
ByteBuffer buffer,
ByteOrder order,
int bucketSize,
int numValues,
boolean hasNull,
int offsetsPosition
int offsetsPosition,
byte version
)
{
if (Integer.bitCount(bucketSize) != 1) {
@ -133,6 +145,23 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
this.lastBucketNumValues = (numValues & rem) == 0 ? bucketSize : numValues & rem;
this.offsetsPosition = offsetsPosition;
this.bucketsPosition = offsetsPosition + ((numBuckets - 1) * Integer.BYTES);
if (version == 0) {
// version zero, all prefixes are computed against the first value in the bucket
this.getBucketValueFn = FrontCodedIndexed::getFromBucketV0;
this.readBucketFn = FrontCodedIndexed::readBucketV0;
this.findInBucketFn = this::findValueInBucketV0;
//noinspection DataFlowIssue
this.unwindPrefixLength = null;
//noinspection DataFlowIssue
this.unwindBufferPosition = null;
} else {
// version one uses 'incremental' buckets, where the prefix is computed against the previous value
this.unwindPrefixLength = new int[bucketSize];
this.unwindBufferPosition = new int[bucketSize];
this.getBucketValueFn = this::getFromBucketV1;
this.readBucketFn = this::readBucketV1;
this.findInBucketFn = this::findValueInBucketV1;
}
}
@Override
@ -158,7 +187,7 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
final int bucketIndex = adjustedIndex & rem;
final int offset = getBucketOffset(bucket);
buffer.position(offset);
return getFromBucket(buffer, bucketIndex);
return getBucketValueFn.get(buffer, bucketIndex);
}
@Override
@ -216,7 +245,7 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
// find the value in the bucket (or where it would be if it were present)
buffer.position(firstOffset + firstLength);
return findValueInBucket(value, currBucketFirstValueIndex, bucketSize, sharedPrefix);
return findInBucketFn.find(value, currBucketFirstValueIndex, bucketSize, sharedPrefix);
} else if (comparison < 0) {
minBucketIndex = currentBucket + 1;
} else {
@ -251,12 +280,12 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
if (comparison > 0) {
// value preceedes bucket, so bail out
return -(bucketIndexBase + adjustIndex) - 1;
return ~(bucketIndexBase + adjustIndex);
}
buffer.position(firstOffset + firstLength);
return findValueInBucket(value, bucketIndexBase, numValuesInBucket, sharedPrefix);
return findInBucketFn.find(value, bucketIndexBase, numValuesInBucket, sharedPrefix);
}
@Override
@ -274,7 +303,7 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
}
ByteBuffer copy = buffer.asReadOnlyBuffer().order(buffer.order());
copy.position(bucketsPosition);
final ByteBuffer[] firstBucket = readBucket(copy, numBuckets > 1 ? bucketSize : lastBucketNumValues);
final ByteBuffer[] firstBucket = readBucketFn.readBucket(copy, numBuckets > 1 ? bucketSize : lastBucketNumValues);
// iterator decodes and buffers a bucket at a time, paging through buckets as the iterator is consumed
return new Iterator<ByteBuffer>()
{
@ -305,7 +334,7 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
if (bucketNum != currentBucketIndex) {
final int offset = copy.getInt(offsetsPosition + ((bucketNum - 1) * Integer.BYTES));
copy.position(bucketsPosition + offset);
currentBucket = readBucket(
currentBucket = readBucketFn.readBucket(
copy,
bucketNum < (numBuckets - 1) ? bucketSize : lastBucketNumValues
);
@ -345,7 +374,7 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
* MUST be prepared before calling, as it expects the length of the first value to have already been read externally,
* and the buffer position to be at the start of the first bucket value. The final buffer position will be the
* 'shared prefix length' of the first value in the bucket and the value to compare.
*
* <p>
* Bytes are compared using {@link StringUtils#compareUtf8UsingJavaStringOrdering(byte, byte)}. Therefore, when the
* values are UTF-8 encoded strings, the ordering is compatible with {@link String#compareTo(String)}.
*/
@ -372,17 +401,17 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
* Finds a value in a bucket among the fragments. The first value is assumed to have been already compared against
* and be smaller than the value we are looking for. This comparison is the source of the 'shared prefix', which is
* the length which the value has in common with the first value of the bucket.
*
* <p>
* This method uses this shared prefix length to skip more expensive byte by byte full value comparisons when
* possible by comparing the shared prefix length with the prefix length of the fragment. Since the bucket is always
* sorted, prefix lengths shrink as you progress to higher indexes, and we can use this to reason that a fragment
* with a longer prefix length than the shared prefix will always sort before the value we are looking for, and values
* which have a shorter prefix will always be greater than the value we are looking for, so we only need to do a
* full comparison if the prefix length is the same
*
* <p>
* this method modifies the position of {@link #buffer}
*/
private int findValueInBucket(
private int findValueInBucketV0(
ByteBuffer value,
int currBucketFirstValueIndex,
int bucketSize,
@ -396,6 +425,7 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
while (++relativePosition < bucketSize) {
prefixLength = VByte.readInt(buffer);
if (prefixLength > sharedPrefix) {
// this value shares more in common with the first value, so the value we are looking for comes after
final int skip = VByte.readInt(buffer);
buffer.position(buffer.position() + skip);
insertionPoint++;
@ -430,20 +460,20 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
}
}
// (-(insertion point) - 1)
return -(currBucketFirstValueIndex + adjustIndex) + (-(insertionPoint) - 1);
return -(currBucketFirstValueIndex + adjustIndex) + (~insertionPoint);
}
/**
* Get a value from a bucket at a relative position.
*
* <p>
* This method modifies the position of the buffer.
*/
static ByteBuffer getFromBucket(ByteBuffer buffer, int offset)
static ByteBuffer getFromBucketV0(ByteBuffer buffer, int offset)
{
int prefixPosition;
if (offset == 0) {
final int length = VByte.readInt(buffer);
final ByteBuffer firstValue = buffer.asReadOnlyBuffer().order(buffer.order());
final ByteBuffer firstValue = buffer.asReadOnlyBuffer();
firstValue.limit(firstValue.position() + length);
return firstValue;
} else {
@ -485,10 +515,10 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
/**
* Read an entire bucket from a {@link ByteBuffer}, returning an array of reconstructed value bytes.
*
* <p>
* This method modifies the position of the buffer.
*/
private static ByteBuffer[] readBucket(ByteBuffer bucket, int numValues)
private static ByteBuffer[] readBucketV0(ByteBuffer bucket, int numValues)
{
final int length = VByte.readInt(bucket);
final byte[] prefixBytes = new byte[length];
@ -509,4 +539,177 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
}
return bucketBuffers;
}
/**
* Finds a value in a bucket among the fragments. The first value is assumed to have been already compared against
* and be smaller than the value we are looking for. This comparison is the source of the 'shared prefix', which is
* the length which the value has in common with the previous value of the bucket.
* <p>
* This method uses this shared prefix length to skip more expensive byte by byte full value comparisons when
* possible by comparing the shared prefix length with the prefix length of the fragment. Since the bucket is always
* sorted, prefix lengths shrink as you progress to higher indexes, and we can use this to reason that a fragment
* with a longer prefix length than the shared prefix will always sort before the value we are looking for, and values
* which have a shorter prefix will always be greater than the value we are looking for, so we only need to do a
* full comparison if the prefix length is the same
* <p>
* this method modifies the position of {@link #buffer}
*/
private int findValueInBucketV1(
ByteBuffer value,
int currBucketFirstValueIndex,
int bucketSize,
int sharedPrefixLength
)
{
int relativePosition = 0;
int prefixLength;
// scan through bucket values until we find match or compare numValues
int insertionPoint = 1;
while (++relativePosition < bucketSize) {
prefixLength = VByte.readInt(buffer);
if (prefixLength > sharedPrefixLength) {
// bucket value shares more in common with the preceding value, so the value we are looking for comes after
final int skip = VByte.readInt(buffer);
buffer.position(buffer.position() + skip);
insertionPoint++;
} else if (prefixLength < sharedPrefixLength) {
// bucket value prefix is smaller, that means the value we are looking for sorts ahead of it
break;
} else {
// value has the same shared prefix, so compare additional values to find
final int fragmentLength = VByte.readInt(buffer);
final int common = Math.min(fragmentLength, value.remaining() - prefixLength);
int fragmentComparison = 0;
boolean shortCircuit = false;
for (int i = 0; i < common; i++) {
fragmentComparison = StringUtils.compareUtf8UsingJavaStringOrdering(
buffer.get(buffer.position() + i),
value.get(prefixLength + i)
);
if (fragmentComparison != 0) {
sharedPrefixLength = prefixLength + i;
shortCircuit = true;
break;
}
}
if (fragmentComparison == 0) {
fragmentComparison = Integer.compare(prefixLength + fragmentLength, value.remaining());
}
if (fragmentComparison == 0) {
return (currBucketFirstValueIndex + adjustIndex) + relativePosition;
} else if (fragmentComparison < 0) {
// value we are looking for is longer than the current bucket value, continue on
if (!shortCircuit) {
sharedPrefixLength = prefixLength + common;
}
buffer.position(buffer.position() + fragmentLength);
insertionPoint++;
} else {
break;
}
}
}
// (-(insertion point) - 1)
return -(currBucketFirstValueIndex + adjustIndex) + (~insertionPoint);
}
private ByteBuffer getFromBucketV1(ByteBuffer buffer, int offset)
{
// first value is written whole
final int length = VByte.readInt(buffer);
if (offset == 0) {
// return first value directly from underlying buffer since it is stored whole
final ByteBuffer value = buffer.asReadOnlyBuffer();
value.limit(value.position() + length);
return value;
}
int pos = 0;
int prefixLength;
int fragmentLength;
unwindPrefixLength[pos] = 0;
unwindBufferPosition[pos] = buffer.position();
buffer.position(buffer.position() + length);
do {
prefixLength = VByte.readInt(buffer);
if (++pos < offset) {
// not there yet, no need to read anything other than the length to skip ahead
final int skipLength = VByte.readInt(buffer);
unwindPrefixLength[pos] = prefixLength;
unwindBufferPosition[pos] = buffer.position();
buffer.position(buffer.position() + skipLength);
} else {
// we've reached our destination
fragmentLength = VByte.readInt(buffer);
if (prefixLength == 0) {
// no prefix, return it directly from the underlying buffer
final ByteBuffer value = buffer.asReadOnlyBuffer();
value.limit(value.position() + fragmentLength);
return value;
}
break;
}
} while (true);
final int valueLength = prefixLength + fragmentLength;
final byte[] valueBytes = new byte[valueLength];
buffer.get(valueBytes, prefixLength, fragmentLength);
for (int i = prefixLength; i > 0;) {
// previous value had a larger prefix than or the same as the value we are looking for
// skip it since the fragment doesn't have anything we need
if (unwindPrefixLength[--pos] >= i) {
continue;
}
buffer.position(unwindBufferPosition[pos]);
buffer.get(valueBytes, unwindPrefixLength[pos], i - unwindPrefixLength[pos]);
i = unwindPrefixLength[pos];
}
return ByteBuffer.wrap(valueBytes);
}
/**
* Read an entire bucket from a {@link ByteBuffer}, returning an array of reconstructed value bytes.
* <p>
* This method modifies the position of the buffer.
*/
private ByteBuffer[] readBucketV1(ByteBuffer bucket, int numValues)
{
final ByteBuffer[] bucketBuffers = new ByteBuffer[numValues];
// first value is written whole
final int length = VByte.readInt(bucket);
byte[] prefixBytes = new byte[length];
bucket.get(prefixBytes, 0, length);
bucketBuffers[0] = ByteBuffer.wrap(prefixBytes);
int pos = 1;
while (pos < numValues) {
final int prefixLength = VByte.readInt(bucket);
final int fragmentLength = VByte.readInt(bucket);
byte[] nextValueBytes = new byte[prefixLength + fragmentLength];
System.arraycopy(prefixBytes, 0, nextValueBytes, 0, prefixLength);
bucket.get(nextValueBytes, prefixLength, fragmentLength);
final ByteBuffer value = ByteBuffer.wrap(nextValueBytes);
prefixBytes = nextValueBytes;
bucketBuffers[pos++] = value;
}
return bucketBuffers;
}
@FunctionalInterface
interface GetBucketValue
{
ByteBuffer get(ByteBuffer buffer, int offset);
}
@FunctionalInterface
interface ReadBucket
{
ByteBuffer[] readBucket(ByteBuffer buffer, int bucketSize);
}
@FunctionalInterface
interface FindInBucket
{
int find(ByteBuffer value, int currBucketFirstValueIndex, int bucketSize, int sharedPrefixLength);
}
}

View File

@ -42,7 +42,8 @@ import java.nio.channels.WritableByteChannel;
*
* Front coding is a type of delta encoding for byte arrays, where values are grouped into buckets. The first value of
* the bucket is written entirely, and remaining values are stored as pairs of an integer which indicates how much
* of the first byte array of the bucket to use as a prefix, followed by the remaining value bytes after the prefix.
* of the first byte array of the bucket to use as a prefix, (or the preceding value of the bucket if using
* 'incremental' buckets) followed by the remaining value bytes after the prefix.
*
* This writer is designed for use with UTF-8 encoded strings that are written in an order compatible with
* {@link String#compareTo(String)}.
@ -58,6 +59,7 @@ public class FrontCodedIndexedWriter implements DictionaryWriter<byte[]>
private final byte[][] bucketBuffer;
private final ByteBuffer getOffsetBuffer;
private final int div;
private final boolean useIncrementalBuckets;
@Nullable
private byte[] prevObject = null;
@ -71,10 +73,12 @@ public class FrontCodedIndexedWriter implements DictionaryWriter<byte[]>
private boolean isClosed = false;
private boolean hasNulls = false;
public FrontCodedIndexedWriter(
SegmentWriteOutMedium segmentWriteOutMedium,
ByteOrder byteOrder,
int bucketSize
int bucketSize,
boolean useIncrementalBuckets
)
{
if (Integer.bitCount(bucketSize) != 1 || bucketSize < 1 || bucketSize > 128) {
@ -87,6 +91,7 @@ public class FrontCodedIndexedWriter implements DictionaryWriter<byte[]>
this.bucketBuffer = new byte[bucketSize][];
this.getOffsetBuffer = ByteBuffer.allocate(Integer.BYTES).order(byteOrder);
this.div = Integer.numberOfTrailingZeros(bucketSize);
this.useIncrementalBuckets = useIncrementalBuckets;
}
@Override
@ -119,7 +124,9 @@ public class FrontCodedIndexedWriter implements DictionaryWriter<byte[]>
int written;
// write the bucket, growing scratch buffer as necessary
do {
written = writeBucket(scratch, bucketBuffer, bucketSize);
written = useIncrementalBuckets
? writeIncrementalBucket(scratch, bucketBuffer, bucketSize)
: writeBucket(scratch, bucketBuffer, bucketSize);
if (written < 0) {
growScratch();
}
@ -163,8 +170,14 @@ public class FrontCodedIndexedWriter implements DictionaryWriter<byte[]>
flush();
}
resetScratch();
// version 0
scratch.put((byte) 0);
if (useIncrementalBuckets) {
// version 1 is incremental buckets
scratch.put((byte) 1);
} else {
// version 0 all values are prefixed on first bucket value
scratch.put((byte) 0);
}
scratch.put((byte) bucketSize);
scratch.put(hasNulls ? NullHandling.IS_NULL_BYTE : NullHandling.IS_NOT_NULL_BYTE);
VByte.writeInt(scratch, numWritten);
@ -202,14 +215,16 @@ public class FrontCodedIndexedWriter implements DictionaryWriter<byte[]>
startOffset = getBucketOffset(bucket - 1);
}
long endOffset = getBucketOffset(bucket);
int bucketSize = Ints.checkedCast(endOffset - startOffset);
if (bucketSize == 0) {
int bucketBytesSize = Ints.checkedCast(endOffset - startOffset);
if (bucketBytesSize == 0) {
return null;
}
final ByteBuffer bucketBuffer = ByteBuffer.allocate(bucketSize).order(byteOrder);
final ByteBuffer bucketBuffer = ByteBuffer.allocate(bucketBytesSize).order(byteOrder);
valuesOut.readFully(startOffset, bucketBuffer);
bucketBuffer.clear();
final ByteBuffer valueBuffer = FrontCodedIndexed.getFromBucket(bucketBuffer, relativeIndex);
final ByteBuffer valueBuffer = useIncrementalBuckets
? getFromBucketV1(bucketBuffer, relativeIndex, bucketSize)
: FrontCodedIndexed.getFromBucketV0(bucketBuffer, relativeIndex);
final byte[] valueBytes = new byte[valueBuffer.limit() - valueBuffer.position()];
valueBuffer.get(valueBytes);
return valueBytes;
@ -232,7 +247,10 @@ public class FrontCodedIndexedWriter implements DictionaryWriter<byte[]>
resetScratch();
int written;
do {
written = writeBucket(scratch, bucketBuffer, remainder == 0 ? bucketSize : remainder);
int flushSize = remainder == 0 ? bucketSize : remainder;
written = useIncrementalBuckets
? writeIncrementalBucket(scratch, bucketBuffer, flushSize)
: writeBucket(scratch, bucketBuffer, flushSize);
if (written < 0) {
growScratch();
}
@ -308,6 +326,57 @@ public class FrontCodedIndexedWriter implements DictionaryWriter<byte[]>
return written;
}
/**
* Write bucket of values to a {@link ByteBuffer}. The first value is written completely, subsequent values are
* written with an integer to indicate how much of the preceding value in the bucket is a prefix of the value,
* followed by the remaining bytes of the value.
*
* Uses {@link VByte} encoded integers to indicate prefix length and value length.
*/
public static int writeIncrementalBucket(ByteBuffer buffer, byte[][] values, int numValues)
{
int written = 0;
byte[] prev = null;
while (written < numValues) {
byte[] next = values[written];
if (written == 0) {
prev = next;
// the first value in the bucket is written completely as it is
int rem = writeValue(buffer, prev);
// wasn't enough room, bail out
if (rem < 0) {
return rem;
}
} else {
// all other values must be partitioned into a prefix length and suffix bytes
int prefixLength = 0;
for (; prefixLength < prev.length; prefixLength++) {
final int cmp = StringUtils.compareUtf8UsingJavaStringOrdering(prev[prefixLength], next[prefixLength]);
if (cmp != 0) {
break;
}
}
// convert to bytes because not every char is a single byte
final byte[] suffix = new byte[next.length - prefixLength];
System.arraycopy(next, prefixLength, suffix, 0, suffix.length);
int rem = buffer.remaining() - VByte.computeIntSize(prefixLength);
// wasn't enough room, bail out
if (rem < 0) {
return rem;
}
VByte.writeInt(buffer, prefixLength);
rem = writeValue(buffer, suffix);
prev = next;
// wasn't enough room, bail out
if (rem < 0) {
return rem;
}
}
written++;
}
return written;
}
/**
* Write a variable length byte[] value to a {@link ByteBuffer}, storing the length as a {@link VByte} encoded
* integer followed by the value itself. Returns the number of bytes written to the buffer. This method returns a
@ -344,4 +413,71 @@ public class FrontCodedIndexedWriter implements DictionaryWriter<byte[]>
return StringUtils.compareUtf8UsingJavaStringOrdering(b1, b2);
}
/**
* same as {@link FrontCodedIndexed#getFromBucketV1(ByteBuffer, int)} but without re-using prefixLength and buffer position
* arrays so has more overhead/garbage creation than the instance method.
*
* Note: adding the unwindPrefixLength and unwindBufferPosition arrays as arguments and having
* {@link FrontCodedIndexed#getFromBucketV1(ByteBuffer, int)} call this static method added 5-10ns of overhead
* compared to having its own copy of the code, presumably due to the overhead of an additional method call and extra
* arguments.
*
* As such, since the writer is the only user of this method, it has been copied here...
*/
static ByteBuffer getFromBucketV1(ByteBuffer buffer, int offset, int bucketSize)
{
final int[] unwindPrefixLength = new int[bucketSize];
final int[] unwindBufferPosition = new int[bucketSize];
// first value is written whole
final int length = VByte.readInt(buffer);
if (offset == 0) {
// return first value directly from underlying buffer since it is stored whole
final ByteBuffer value = buffer.asReadOnlyBuffer();
value.limit(value.position() + length);
return value;
}
int pos = 0;
int prefixLength;
int fragmentLength;
unwindPrefixLength[pos] = 0;
unwindBufferPosition[pos] = buffer.position();
buffer.position(buffer.position() + length);
do {
prefixLength = VByte.readInt(buffer);
if (++pos < offset) {
// not there yet, no need to read anything other than the length to skip ahead
final int skipLength = VByte.readInt(buffer);
unwindPrefixLength[pos] = prefixLength;
unwindBufferPosition[pos] = buffer.position();
buffer.position(buffer.position() + skipLength);
} else {
// we've reached our destination
fragmentLength = VByte.readInt(buffer);
if (prefixLength == 0) {
// no prefix, return it directly from the underlying buffer
final ByteBuffer value = buffer.asReadOnlyBuffer();
value.limit(value.position() + fragmentLength);
return value;
}
break;
}
} while (true);
final int valueLength = prefixLength + fragmentLength;
final byte[] valueBytes = new byte[valueLength];
buffer.get(valueBytes, prefixLength, fragmentLength);
for (int i = prefixLength; i > 0;) {
// previous value had a larger prefix than or the same as the value we are looking for
// skip it since the fragment doesn't have anything we need
if (unwindPrefixLength[--pos] >= i) {
continue;
}
buffer.position(unwindBufferPosition[pos]);
buffer.get(valueBytes, unwindPrefixLength[pos], i - unwindPrefixLength[pos]);
i = unwindPrefixLength[pos];
}
return ByteBuffer.wrap(valueBytes);
}
}

View File

@ -43,17 +43,24 @@ import java.util.TreeSet;
@RunWith(Parameterized.class)
public class FrontCodedIndexedTest extends InitializedNullHandlingTest
{
@Parameterized.Parameters(name = "{0}")
@Parameterized.Parameters(name = "byteOrder: {0} useIncrementalBuckets: {1}")
public static Collection<Object[]> constructorFeeder()
{
return ImmutableList.of(new Object[]{ByteOrder.LITTLE_ENDIAN}, new Object[]{ByteOrder.BIG_ENDIAN});
return ImmutableList.of(
new Object[]{ByteOrder.LITTLE_ENDIAN, true},
new Object[]{ByteOrder.LITTLE_ENDIAN, false},
new Object[]{ByteOrder.BIG_ENDIAN, true},
new Object[]{ByteOrder.BIG_ENDIAN, false}
);
}
private final ByteOrder order;
private final boolean useIncrementalBuckets;
public FrontCodedIndexedTest(ByteOrder byteOrder)
public FrontCodedIndexedTest(ByteOrder byteOrder, boolean useIncrementalBuckets)
{
this.order = byteOrder;
this.useIncrementalBuckets = useIncrementalBuckets;
}
@Test
@ -61,7 +68,7 @@ public class FrontCodedIndexedTest extends InitializedNullHandlingTest
{
ByteBuffer buffer = ByteBuffer.allocate(1 << 12).order(order);
List<String> theList = ImmutableList.of("hello", "helloo", "hellooo", "hellooz", "helloozy");
fillBuffer(buffer, theList, 4);
fillBuffer(buffer, theList, 4, useIncrementalBuckets);
buffer.position(0);
FrontCodedIndexed codedUtf8Indexed = FrontCodedIndexed.read(
@ -92,13 +99,16 @@ public class FrontCodedIndexedTest extends InitializedNullHandlingTest
{
ByteBuffer buffer = ByteBuffer.allocate(1 << 12).order(order);
List<String> theList = ImmutableList.of("hello", "helloo", "hellooo", "hellooz", "helloozy");
fillBuffer(buffer, theList, 16);
fillBuffer(buffer, theList, 16, useIncrementalBuckets);
FrontCodedIndexed codedUtf8Indexed = FrontCodedIndexed.read(
buffer,
buffer.order()
).get();
Assert.assertEquals("hello", StringUtils.fromUtf8(codedUtf8Indexed.get(0)));
Assert.assertEquals("helloo", StringUtils.fromUtf8(codedUtf8Indexed.get(1)));
Assert.assertEquals("hellooo", StringUtils.fromUtf8(codedUtf8Indexed.get(2)));
Assert.assertEquals("hellooz", StringUtils.fromUtf8(codedUtf8Indexed.get(3)));
Assert.assertEquals("helloozy", StringUtils.fromUtf8(codedUtf8Indexed.get(4)));
Iterator<String> newListIterator = theList.iterator();
@ -127,7 +137,7 @@ public class FrontCodedIndexedTest extends InitializedNullHandlingTest
for (int i = 0; i < sizeBase + sizeAdjust; i++) {
values.add(IdUtils.getRandomId() + IdUtils.getRandomId() + IdUtils.getRandomId() + IdUtils.getRandomId());
}
fillBuffer(buffer, values, bucketSize);
fillBuffer(buffer, values, bucketSize, useIncrementalBuckets);
FrontCodedIndexed codedUtf8Indexed = FrontCodedIndexed.read(
buffer,
@ -163,7 +173,7 @@ public class FrontCodedIndexedTest extends InitializedNullHandlingTest
for (int i = 0; i < sizeBase + sizeAdjust; i++) {
values.add(IdUtils.getRandomId() + IdUtils.getRandomId() + IdUtils.getRandomId() + IdUtils.getRandomId());
}
fillBuffer(buffer, values, bucketSize);
fillBuffer(buffer, values, bucketSize, useIncrementalBuckets);
FrontCodedIndexed codedUtf8Indexed = FrontCodedIndexed.read(
buffer,
@ -197,7 +207,7 @@ public class FrontCodedIndexedTest extends InitializedNullHandlingTest
ByteBuffer buffer = ByteBuffer.allocate(1 << 12).order(order);
List<String> theList = ImmutableList.of("hello", "helloo", "hellooo", "hellooz", "helloozy");
fillBuffer(buffer, theList, 4);
fillBuffer(buffer, theList, 4, useIncrementalBuckets);
FrontCodedIndexed codedUtf8Indexed = FrontCodedIndexed.read(
buffer,
@ -221,7 +231,7 @@ public class FrontCodedIndexedTest extends InitializedNullHandlingTest
TreeSet<String> values = new TreeSet<>(GenericIndexed.STRING_STRATEGY);
values.add(null);
values.addAll(theList);
fillBuffer(buffer, values, 4);
fillBuffer(buffer, values, 4, useIncrementalBuckets);
FrontCodedIndexed codedUtf8Indexed = FrontCodedIndexed.read(
buffer,
@ -244,7 +254,7 @@ public class FrontCodedIndexedTest extends InitializedNullHandlingTest
// "\uD83D\uDCA9" and "(請參見已被刪除版本)" are a regression test for https://github.com/apache/druid/pull/13364
List<String> theList = ImmutableList.of("Győ-Moson-Sopron", "Győr", "\uD83D\uDCA9", "(請參見已被刪除版本)");
fillBuffer(buffer, theList, 4);
fillBuffer(buffer, theList, 4, useIncrementalBuckets);
buffer.position(0);
FrontCodedIndexed codedUtf8Indexed = FrontCodedIndexed.read(
@ -260,7 +270,7 @@ public class FrontCodedIndexedTest extends InitializedNullHandlingTest
final ByteBuffer nextUtf8 = utf8Iterator.next();
Assert.assertEquals(next, StringUtils.fromUtf8(nextUtf8));
nextUtf8.position(0);
Assert.assertEquals(next, StringUtils.fromUtf8(codedUtf8Indexed.get(ctr)));
Assert.assertEquals("mismatch row " + ctr, next, StringUtils.fromUtf8(codedUtf8Indexed.get(ctr)));
Assert.assertEquals(ctr, codedUtf8Indexed.indexOf(nextUtf8));
ctr++;
}
@ -272,7 +282,7 @@ public class FrontCodedIndexedTest extends InitializedNullHandlingTest
{
ByteBuffer buffer = ByteBuffer.allocate(1 << 12).order(order);
List<String> theList = Collections.singletonList(null);
fillBuffer(buffer, theList, 4);
fillBuffer(buffer, theList, 4, useIncrementalBuckets);
buffer.position(0);
FrontCodedIndexed codedUtf8Indexed = FrontCodedIndexed.read(
@ -315,7 +325,7 @@ public class FrontCodedIndexedTest extends InitializedNullHandlingTest
values.add(IdUtils.getRandomId() + IdUtils.getRandomId() + IdUtils.getRandomId() + IdUtils.getRandomId());
}
for (int bucketSize : bucketSizes) {
fillBuffer(buffer, values, bucketSize);
fillBuffer(buffer, values, bucketSize, useIncrementalBuckets);
FrontCodedIndexed codedUtf8Indexed = FrontCodedIndexed.read(
buffer,
buffer.order()
@ -352,7 +362,8 @@ public class FrontCodedIndexedTest extends InitializedNullHandlingTest
() -> new FrontCodedIndexedWriter(
medium,
ByteOrder.nativeOrder(),
0
0,
useIncrementalBuckets
)
);
@ -361,7 +372,8 @@ public class FrontCodedIndexedTest extends InitializedNullHandlingTest
() -> new FrontCodedIndexedWriter(
medium,
ByteOrder.nativeOrder(),
15
15,
useIncrementalBuckets
)
);
@ -370,20 +382,23 @@ public class FrontCodedIndexedTest extends InitializedNullHandlingTest
() -> new FrontCodedIndexedWriter(
medium,
ByteOrder.nativeOrder(),
256
256,
useIncrementalBuckets
)
);
}
private static long fillBuffer(ByteBuffer buffer, Iterable<String> sortedIterable, int bucketSize) throws IOException
private static long fillBuffer(ByteBuffer buffer, Iterable<String> sortedIterable, int bucketSize, boolean useIncrementalBuckets) throws IOException
{
Iterator<String> sortedStrings = sortedIterable.iterator();
buffer.position(0);
OnHeapMemorySegmentWriteOutMedium medium = new OnHeapMemorySegmentWriteOutMedium();
FrontCodedIndexedWriter writer = new FrontCodedIndexedWriter(
DictionaryWriter<byte[]> writer;
writer = new FrontCodedIndexedWriter(
medium,
buffer.order(),
bucketSize
bucketSize,
useIncrementalBuckets
);
writer.open();
int index = 0;