refactor front-coded into static classes instead of using functional interfaces (#14572)

* refactor front-coded into static classes instead of using functional interfaces

* shared v0 static method instead of copy
This commit is contained in:
Clint Wylie 2023-08-04 10:52:36 -07:00 committed by GitHub
parent d6c73ca6e5
commit e5661a394c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 353 additions and 365 deletions

View File

@ -88,10 +88,10 @@ public class FrontCodedIndexedBenchmark
@Param({
"generic",
"front-coded-4",
"front-coded-16",
"front-coded-incremental-buckets-4",
"front-coded-incremental-buckets-16"
"front-coded-v0-4",
"front-coded-v0-16",
"front-coded-v1-4",
"front-coded-v1-16"
})
public String indexType;
@ -138,7 +138,7 @@ public class FrontCodedIndexedBenchmark
FrontCodedIndexedWriter frontCodedIndexedWriter = new FrontCodedIndexedWriter(
new OnHeapMemorySegmentWriteOutMedium(),
ByteOrder.nativeOrder(),
"front-coded-4".equals(indexType) ? 4 : 16,
"front-coded-v0-4".equals(indexType) ? 4 : 16,
FrontCodedIndexed.V0
);
frontCodedIndexedWriter.open();
@ -146,7 +146,7 @@ public class FrontCodedIndexedBenchmark
FrontCodedIndexedWriter frontCodedIndexedWriterIncrementalBuckets = new FrontCodedIndexedWriter(
new OnHeapMemorySegmentWriteOutMedium(),
ByteOrder.nativeOrder(),
"front-coded-incremental-buckets-4".equals(indexType) ? 4 : 16,
"front-coded-v1-4".equals(indexType) ? 4 : 16,
FrontCodedIndexed.V1
);
frontCodedIndexedWriterIncrementalBuckets.open();
@ -166,11 +166,11 @@ public class FrontCodedIndexedBenchmark
fileGeneric = File.createTempFile("genericIndexedBenchmark", "meta");
smooshDirFrontCodedIncrementalBuckets = FileUtils.createTempDir();
fileFrontCodedIncrementalBuckets = File.createTempFile("frontCodedIndexedBenchmarkIncrementalBuckets", "meta");
fileFrontCodedIncrementalBuckets = File.createTempFile("frontCodedIndexedBenchmarkv1Buckets", "meta");
EncodingSizeProfiler.encodedSize = (int) ("generic".equals(indexType)
? genericIndexedWriter.getSerializedSize()
: indexType.startsWith("front-coded-incremental-buckets")
: indexType.startsWith("front-coded-v1")
? frontCodedIndexedWriterIncrementalBuckets.getSerializedSize()
: frontCodedIndexedWriter.getSerializedSize());
try (
@ -286,7 +286,7 @@ public class FrontCodedIndexedBenchmark
}
if ("generic".equals(indexType)) {
indexed = genericIndexed.singleThreaded();
} else if (indexType.startsWith("front-coded-incremental-buckets")) {
} else if (indexType.startsWith("front-coded-v1")) {
indexed = frontCodedIndexedIncrementalBuckets;
} else {
indexed = frontCodedIndexed;

View File

@ -21,7 +21,6 @@ package org.apache.druid.segment.data;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import org.apache.druid.annotations.SuppressFBWarnings;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
@ -75,7 +74,7 @@ import java.util.NoSuchElementException;
* <p>
* This class is not thread-safe since during operation modifies positions of a shared buffer.
*/
public final class FrontCodedIndexed implements Indexed<ByteBuffer>
public abstract class FrontCodedIndexed implements Indexed<ByteBuffer>
{
public static final byte V0 = 0;
public static final byte V1 = 1;
@ -94,6 +93,7 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
}
return version;
}
public static Supplier<FrontCodedIndexed> read(ByteBuffer buffer, ByteOrder ordering)
{
final ByteBuffer orderedBuffer = buffer.asReadOnlyBuffer().order(ordering);
@ -108,44 +108,46 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
// move position to end of buffer
buffer.position(offsetsPosition + size);
return () -> new FrontCodedIndexed(
buffer,
ordering,
bucketSize,
numValues,
hasNull,
offsetsPosition,
version
);
if (version == V0) {
return () -> new FrontCodedV0(
buffer,
ordering,
bucketSize,
numValues,
hasNull,
offsetsPosition
);
} else {
return () -> new FrontCodedV1(
buffer,
ordering,
bucketSize,
numValues,
hasNull,
offsetsPosition
);
}
}
private final ByteBuffer buffer;
private final int adjustedNumValues;
private final int adjustIndex;
private final int bucketSize;
private final int[] unwindPrefixLength;
private final int[] unwindBufferPosition;
private final int numBuckets;
private final int div;
private final int rem;
private final int offsetsPosition;
private final int bucketsPosition;
private final boolean hasNull;
private final int lastBucketNumValues;
protected final ByteBuffer buffer;
protected final int adjustedNumValues;
protected final int adjustIndex;
protected final int bucketSize;
protected final int numBuckets;
protected final int div;
protected final int rem;
protected final int offsetsPosition;
protected final int bucketsPosition;
protected final boolean hasNull;
protected final int lastBucketNumValues;
private final GetBucketValue getBucketValueFn;
private final ReadBucket readBucketFn;
private final FindInBucket findInBucketFn;
@SuppressFBWarnings(value = "NP_STORE_INTO_NONNULL_FIELD", justification = "V0 does not use unwindPrefixLength or unwindBufferPosition")
private FrontCodedIndexed(
ByteBuffer buffer,
ByteOrder order,
int bucketSize,
int numValues,
boolean hasNull,
int offsetsPosition,
byte version
int offsetsPosition
)
{
if (Integer.bitCount(bucketSize) != 1) {
@ -163,25 +165,38 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
this.lastBucketNumValues = (numValues & rem) == 0 ? bucketSize : numValues & rem;
this.offsetsPosition = offsetsPosition;
this.bucketsPosition = offsetsPosition + ((numBuckets - 1) * Integer.BYTES);
if (version == 0) {
// version zero, all prefixes are computed against the first value in the bucket
this.getBucketValueFn = FrontCodedIndexed::getFromBucketV0;
this.readBucketFn = FrontCodedIndexed::readBucketV0;
this.findInBucketFn = this::findValueInBucketV0;
//noinspection DataFlowIssue
this.unwindPrefixLength = null;
//noinspection DataFlowIssue
this.unwindBufferPosition = null;
} else {
// version one uses 'incremental' buckets, where the prefix is computed against the previous value
this.unwindPrefixLength = new int[bucketSize];
this.unwindBufferPosition = new int[bucketSize];
this.getBucketValueFn = this::getFromBucketV1;
this.readBucketFn = this::readBucketV1;
this.findInBucketFn = this::findValueInBucketV1;
}
}
/**
* Get a value from a bucket at a relative position.
* <p>
* This method modifies the position of the buffer.
*/
abstract ByteBuffer getFromBucket(ByteBuffer buffer, int offset);
/**
* Read an entire bucket from a {@link ByteBuffer}, returning an array of reconstructed value bytes.
* <p>
* This method modifies the position of the buffer.
*/
abstract ByteBuffer[] readBucket(ByteBuffer buffer, int numValues);
/**
* Finds a value in a bucket among the fragments. The first value is assumed to have been already compared against
* and be smaller than the value we are looking for. This comparison is the source of the 'shared prefix', which is
* the length which the value has in common with the previous value of the bucket.
* <p>
* This method uses this shared prefix length to skip more expensive byte by byte full value comparisons when
* possible by comparing the shared prefix length with the prefix length of the fragment. Since the bucket is always
* sorted, prefix lengths shrink as you progress to higher indexes, and we can use this to reason that a fragment
* with a longer prefix length than the shared prefix will always sort before the value we are looking for, and values
* which have a shorter prefix will always be greater than the value we are looking for, so we only need to do a
* full comparison if the prefix length is the same
* <p>
* this method modifies the position of {@link #buffer}
*/
abstract int findInBucket(ByteBuffer value, int currBucketFirstValueIndex, int bucketSize, int sharedPrefixLength);
@Override
public int size()
{
@ -206,7 +221,7 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
final int bucketIndex = adjustedIndex & rem;
final int offset = getBucketOffset(bucket);
buffer.position(offset);
return getBucketValueFn.get(buffer, bucketIndex);
return getFromBucket(buffer, bucketIndex);
}
@Override
@ -268,7 +283,7 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
// find the value in the bucket (or where it would be if it were present)
buffer.position(firstOffset + firstLength);
return findInBucketFn.find(value, currBucketFirstValueIndex, bucketSize, sharedPrefix);
return findInBucket(value, currBucketFirstValueIndex, bucketSize, sharedPrefix);
} else if (comparison < 0) {
minBucketIndex = currentBucket + 1;
} else {
@ -308,7 +323,7 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
buffer.position(firstOffset + firstLength);
return findInBucketFn.find(value, bucketIndexBase, numValuesInBucket, sharedPrefix);
return findInBucket(value, bucketIndexBase, numValuesInBucket, sharedPrefix);
}
@Override
@ -329,7 +344,7 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
}
ByteBuffer copy = buffer.asReadOnlyBuffer().order(buffer.order());
copy.position(bucketsPosition);
final ByteBuffer[] firstBucket = readBucketFn.readBucket(copy, numBuckets > 1 ? bucketSize : lastBucketNumValues);
final ByteBuffer[] firstBucket = readBucket(copy, numBuckets > 1 ? bucketSize : lastBucketNumValues);
// iterator decodes and buffers a bucket at a time, paging through buckets as the iterator is consumed
return new Iterator<ByteBuffer>()
{
@ -360,7 +375,7 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
if (bucketNum != currentBucketIndex) {
final int offset = copy.getInt(offsetsPosition + ((bucketNum - 1) * Integer.BYTES));
copy.position(bucketsPosition + offset);
currentBucket = readBucketFn.readBucket(
currentBucket = readBucket(
copy,
bucketNum < (numBuckets - 1) ? bucketSize : lastBucketNumValues
);
@ -394,7 +409,6 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
return bucketsPosition + (bucket > 0 ? buffer.getInt(offsetsPosition + ((bucket - 1) * Integer.BYTES)) : 0);
}
/**
* Performs byte-by-byte comparison of the first value in a bucket with the specified value. Note that this method
* MUST be prepared before calling, as it expects the length of the first value to have already been read externally,
@ -423,319 +437,293 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
return comparison;
}
/**
* Finds a value in a bucket among the fragments. The first value is assumed to have been already compared against
* and be smaller than the value we are looking for. This comparison is the source of the 'shared prefix', which is
* the length which the value has in common with the first value of the bucket.
* <p>
* This method uses this shared prefix length to skip more expensive byte by byte full value comparisons when
* possible by comparing the shared prefix length with the prefix length of the fragment. Since the bucket is always
* sorted, prefix lengths shrink as you progress to higher indexes, and we can use this to reason that a fragment
* with a longer prefix length than the shared prefix will always sort before the value we are looking for, and values
* which have a shorter prefix will always be greater than the value we are looking for, so we only need to do a
* full comparison if the prefix length is the same
* <p>
* this method modifies the position of {@link #buffer}
*/
private int findValueInBucketV0(
ByteBuffer value,
int currBucketFirstValueIndex,
int bucketSize,
int sharedPrefix
)
public static final class FrontCodedV0 extends FrontCodedIndexed
{
int relativePosition = 0;
int prefixLength;
// scan through bucket values until we find match or compare numValues
int insertionPoint = 1;
while (++relativePosition < bucketSize) {
prefixLength = VByte.readInt(buffer);
if (prefixLength > sharedPrefix) {
// this value shares more in common with the first value, so the value we are looking for comes after
final int skip = VByte.readInt(buffer);
buffer.position(buffer.position() + skip);
insertionPoint++;
} else if (prefixLength < sharedPrefix) {
// prefix is smaller, that means this value sorts ahead of it
break;
} else {
final int fragmentLength = VByte.readInt(buffer);
final int common = Math.min(fragmentLength, value.remaining() - prefixLength);
int fragmentComparison = 0;
for (int i = 0; i < common; i++) {
fragmentComparison = StringUtils.compareUtf8UsingJavaStringOrdering(
buffer.get(buffer.position() + i),
value.get(prefixLength + i)
);
if (fragmentComparison != 0) {
break;
}
}
if (fragmentComparison == 0) {
fragmentComparison = Integer.compare(prefixLength + fragmentLength, value.remaining());
}
private FrontCodedV0(
ByteBuffer buffer,
ByteOrder order,
int bucketSize,
int numValues,
boolean hasNull,
int offsetsPosition
)
{
super(buffer, order, bucketSize, numValues, hasNull, offsetsPosition);
}
if (fragmentComparison == 0) {
return (currBucketFirstValueIndex + adjustIndex) + relativePosition;
} else if (fragmentComparison < 0) {
buffer.position(buffer.position() + fragmentLength);
insertionPoint++;
@Override
ByteBuffer getFromBucket(ByteBuffer buffer, int offset)
{
return getValueFromBucket(buffer, offset);
}
public static ByteBuffer getValueFromBucket(ByteBuffer buffer, int offset)
{
int prefixPosition;
if (offset == 0) {
final int length = VByte.readInt(buffer);
final ByteBuffer firstValue = buffer.asReadOnlyBuffer();
firstValue.limit(firstValue.position() + length);
return firstValue;
} else {
final int firstLength = VByte.readInt(buffer);
prefixPosition = buffer.position();
buffer.position(buffer.position() + firstLength);
}
int pos = 0;
int prefixLength;
int fragmentLength;
int fragmentPosition;
// scan through bucket values until we reach offset
do {
prefixLength = VByte.readInt(buffer);
if (++pos < offset) {
// not there yet, no need to read anything other than the length to skip ahead
final int skipLength = VByte.readInt(buffer);
buffer.position(buffer.position() + skipLength);
} else {
// we've reached our destination
fragmentLength = VByte.readInt(buffer);
fragmentPosition = buffer.position();
break;
}
} while (true);
final int valueLength = prefixLength + fragmentLength;
ByteBuffer value = ByteBuffer.allocate(valueLength);
for (int i = 0; i < valueLength; i++) {
if (i < prefixLength) {
value.put(buffer.get(prefixPosition + i));
} else {
value.put(buffer.get(fragmentPosition + i - prefixLength));
}
}
}
// (-(insertion point) - 1)
return -(currBucketFirstValueIndex + adjustIndex) + (~insertionPoint);
}
/**
* Get a value from a bucket at a relative position.
* <p>
* This method modifies the position of the buffer.
*/
static ByteBuffer getFromBucketV0(ByteBuffer buffer, int offset)
{
int prefixPosition;
if (offset == 0) {
final int length = VByte.readInt(buffer);
final ByteBuffer firstValue = buffer.asReadOnlyBuffer();
firstValue.limit(firstValue.position() + length);
return firstValue;
} else {
final int firstLength = VByte.readInt(buffer);
prefixPosition = buffer.position();
buffer.position(buffer.position() + firstLength);
}
int pos = 0;
int prefixLength;
int fragmentLength;
int fragmentPosition;
// scan through bucket values until we reach offset
do {
prefixLength = VByte.readInt(buffer);
if (++pos < offset) {
// not there yet, no need to read anything other than the length to skip ahead
final int skipLength = VByte.readInt(buffer);
buffer.position(buffer.position() + skipLength);
} else {
// we've reached our destination
fragmentLength = VByte.readInt(buffer);
fragmentPosition = buffer.position();
break;
}
} while (true);
final int valueLength = prefixLength + fragmentLength;
ByteBuffer value = ByteBuffer.allocate(valueLength);
for (int i = 0; i < valueLength; i++) {
if (i < prefixLength) {
value.put(buffer.get(prefixPosition + i));
} else {
value.put(buffer.get(fragmentPosition + i - prefixLength));
}
}
value.flip();
return value;
}
/**
* Read an entire bucket from a {@link ByteBuffer}, returning an array of reconstructed value bytes.
* <p>
* This method modifies the position of the buffer.
*/
private static ByteBuffer[] readBucketV0(ByteBuffer bucket, int numValues)
{
final int length = VByte.readInt(bucket);
final byte[] prefixBytes = new byte[length];
bucket.get(prefixBytes, 0, length);
final ByteBuffer[] bucketBuffers = new ByteBuffer[numValues];
bucketBuffers[0] = ByteBuffer.wrap(prefixBytes);
int pos = 1;
while (pos < numValues) {
final int prefixLength = VByte.readInt(bucket);
final int fragmentLength = VByte.readInt(bucket);
final byte[] fragment = new byte[fragmentLength];
bucket.get(fragment, 0, fragmentLength);
final ByteBuffer value = ByteBuffer.allocate(prefixLength + fragmentLength);
value.put(prefixBytes, 0, prefixLength);
value.put(fragment);
value.flip();
bucketBuffers[pos++] = value;
}
return bucketBuffers;
}
/**
* Finds a value in a bucket among the fragments. The first value is assumed to have been already compared against
* and be smaller than the value we are looking for. This comparison is the source of the 'shared prefix', which is
* the length which the value has in common with the previous value of the bucket.
* <p>
* This method uses this shared prefix length to skip more expensive byte by byte full value comparisons when
* possible by comparing the shared prefix length with the prefix length of the fragment. Since the bucket is always
* sorted, prefix lengths shrink as you progress to higher indexes, and we can use this to reason that a fragment
* with a longer prefix length than the shared prefix will always sort before the value we are looking for, and values
* which have a shorter prefix will always be greater than the value we are looking for, so we only need to do a
* full comparison if the prefix length is the same
* <p>
* this method modifies the position of {@link #buffer}
*/
private int findValueInBucketV1(
ByteBuffer value,
int currBucketFirstValueIndex,
int bucketSize,
int sharedPrefixLength
)
{
int relativePosition = 0;
int prefixLength;
// scan through bucket values until we find match or compare numValues
int insertionPoint = 1;
while (++relativePosition < bucketSize) {
prefixLength = VByte.readInt(buffer);
if (prefixLength > sharedPrefixLength) {
// bucket value shares more in common with the preceding value, so the value we are looking for comes after
final int skip = VByte.readInt(buffer);
buffer.position(buffer.position() + skip);
insertionPoint++;
} else if (prefixLength < sharedPrefixLength) {
// bucket value prefix is smaller, that means the value we are looking for sorts ahead of it
break;
} else {
// value has the same shared prefix, so compare additional values to find
final int fragmentLength = VByte.readInt(buffer);
final int common = Math.min(fragmentLength, value.remaining() - prefixLength);
int fragmentComparison = 0;
boolean shortCircuit = false;
for (int i = 0; i < common; i++) {
fragmentComparison = StringUtils.compareUtf8UsingJavaStringOrdering(
buffer.get(buffer.position() + i),
value.get(prefixLength + i)
);
if (fragmentComparison != 0) {
sharedPrefixLength = prefixLength + i;
shortCircuit = true;
break;
}
}
if (fragmentComparison == 0) {
fragmentComparison = Integer.compare(prefixLength + fragmentLength, value.remaining());
}
if (fragmentComparison == 0) {
return (currBucketFirstValueIndex + adjustIndex) + relativePosition;
} else if (fragmentComparison < 0) {
// value we are looking for is longer than the current bucket value, continue on
if (!shortCircuit) {
sharedPrefixLength = prefixLength + common;
}
buffer.position(buffer.position() + fragmentLength);
insertionPoint++;
} else {
break;
}
}
}
// (-(insertion point) - 1)
return -(currBucketFirstValueIndex + adjustIndex) + (~insertionPoint);
}
private ByteBuffer getFromBucketV1(ByteBuffer buffer, int offset)
{
// first value is written whole
final int length = VByte.readInt(buffer);
if (offset == 0) {
// return first value directly from underlying buffer since it is stored whole
final ByteBuffer value = buffer.asReadOnlyBuffer();
value.limit(value.position() + length);
return value;
}
int pos = 0;
int prefixLength;
int fragmentLength;
unwindPrefixLength[pos] = 0;
unwindBufferPosition[pos] = buffer.position();
buffer.position(buffer.position() + length);
do {
prefixLength = VByte.readInt(buffer);
if (++pos < offset) {
// not there yet, no need to read anything other than the length to skip ahead
final int skipLength = VByte.readInt(buffer);
unwindPrefixLength[pos] = prefixLength;
unwindBufferPosition[pos] = buffer.position();
buffer.position(buffer.position() + skipLength);
} else {
// we've reached our destination
fragmentLength = VByte.readInt(buffer);
if (prefixLength == 0) {
// no prefix, return it directly from the underlying buffer
final ByteBuffer value = buffer.asReadOnlyBuffer();
value.limit(value.position() + fragmentLength);
return value;
@Override
ByteBuffer[] readBucket(ByteBuffer buffer, int numValues)
{
final int length = VByte.readInt(buffer);
final byte[] prefixBytes = new byte[length];
buffer.get(prefixBytes, 0, length);
final ByteBuffer[] bucketBuffers = new ByteBuffer[numValues];
bucketBuffers[0] = ByteBuffer.wrap(prefixBytes);
int pos = 1;
while (pos < numValues) {
final int prefixLength = VByte.readInt(buffer);
final int fragmentLength = VByte.readInt(buffer);
final byte[] fragment = new byte[fragmentLength];
buffer.get(fragment, 0, fragmentLength);
final ByteBuffer value = ByteBuffer.allocate(prefixLength + fragmentLength);
value.put(prefixBytes, 0, prefixLength);
value.put(fragment);
value.flip();
bucketBuffers[pos++] = value;
}
return bucketBuffers;
}
@Override
int findInBucket(ByteBuffer value, int currBucketFirstValueIndex, int bucketSize, int sharedPrefixLength)
{
int relativePosition = 0;
int prefixLength;
// scan through bucket values until we find match or compare numValues
int insertionPoint = 1;
while (++relativePosition < bucketSize) {
prefixLength = VByte.readInt(buffer);
if (prefixLength > sharedPrefixLength) {
// this value shares more in common with the first value, so the value we are looking for comes after
final int skip = VByte.readInt(buffer);
buffer.position(buffer.position() + skip);
insertionPoint++;
} else if (prefixLength < sharedPrefixLength) {
// prefix is smaller, that means this value sorts ahead of it
break;
} else {
final int fragmentLength = VByte.readInt(buffer);
final int common = Math.min(fragmentLength, value.remaining() - prefixLength);
int fragmentComparison = 0;
for (int i = 0; i < common; i++) {
fragmentComparison = StringUtils.compareUtf8UsingJavaStringOrdering(
buffer.get(buffer.position() + i),
value.get(prefixLength + i)
);
if (fragmentComparison != 0) {
break;
}
}
if (fragmentComparison == 0) {
fragmentComparison = Integer.compare(prefixLength + fragmentLength, value.remaining());
}
if (fragmentComparison == 0) {
return (currBucketFirstValueIndex + adjustIndex) + relativePosition;
} else if (fragmentComparison < 0) {
buffer.position(buffer.position() + fragmentLength);
insertionPoint++;
} else {
break;
}
}
break;
}
} while (true);
final int valueLength = prefixLength + fragmentLength;
final byte[] valueBytes = new byte[valueLength];
buffer.get(valueBytes, prefixLength, fragmentLength);
for (int i = prefixLength; i > 0;) {
// previous value had a larger prefix than or the same as the value we are looking for
// skip it since the fragment doesn't have anything we need
if (unwindPrefixLength[--pos] >= i) {
continue;
// (-(insertion point) - 1)
return -(currBucketFirstValueIndex + adjustIndex) + (~insertionPoint);
}
}
public static final class FrontCodedV1 extends FrontCodedIndexed
{
private final int[] unwindPrefixLength;
private final int[] unwindBufferPosition;
private FrontCodedV1(
ByteBuffer buffer,
ByteOrder order,
int bucketSize,
int numValues,
boolean hasNull,
int offsetsPosition
)
{
super(buffer, order, bucketSize, numValues, hasNull, offsetsPosition);
this.unwindPrefixLength = new int[bucketSize];
this.unwindBufferPosition = new int[bucketSize];
}
@Override
ByteBuffer getFromBucket(ByteBuffer buffer, int offset)
{
// first value is written whole
final int length = VByte.readInt(buffer);
if (offset == 0) {
// return first value directly from underlying buffer since it is stored whole
final ByteBuffer value = buffer.asReadOnlyBuffer();
value.limit(value.position() + length);
return value;
}
buffer.position(unwindBufferPosition[pos]);
buffer.get(valueBytes, unwindPrefixLength[pos], i - unwindPrefixLength[pos]);
i = unwindPrefixLength[pos];
int pos = 0;
int prefixLength;
int fragmentLength;
unwindPrefixLength[pos] = 0;
unwindBufferPosition[pos] = buffer.position();
buffer.position(buffer.position() + length);
do {
prefixLength = VByte.readInt(buffer);
if (++pos < offset) {
// not there yet, no need to read anything other than the length to skip ahead
final int skipLength = VByte.readInt(buffer);
unwindPrefixLength[pos] = prefixLength;
unwindBufferPosition[pos] = buffer.position();
buffer.position(buffer.position() + skipLength);
} else {
// we've reached our destination
fragmentLength = VByte.readInt(buffer);
if (prefixLength == 0) {
// no prefix, return it directly from the underlying buffer
final ByteBuffer value = buffer.asReadOnlyBuffer();
value.limit(value.position() + fragmentLength);
return value;
}
break;
}
} while (true);
final int valueLength = prefixLength + fragmentLength;
final byte[] valueBytes = new byte[valueLength];
buffer.get(valueBytes, prefixLength, fragmentLength);
for (int i = prefixLength; i > 0;) {
// previous value had a larger prefix than or the same as the value we are looking for
// skip it since the fragment doesn't have anything we need
if (unwindPrefixLength[--pos] >= i) {
continue;
}
buffer.position(unwindBufferPosition[pos]);
buffer.get(valueBytes, unwindPrefixLength[pos], i - unwindPrefixLength[pos]);
i = unwindPrefixLength[pos];
}
return ByteBuffer.wrap(valueBytes);
}
return ByteBuffer.wrap(valueBytes);
}
/**
* Read an entire bucket from a {@link ByteBuffer}, returning an array of reconstructed value bytes.
* <p>
* This method modifies the position of the buffer.
*/
private ByteBuffer[] readBucketV1(ByteBuffer bucket, int numValues)
{
final ByteBuffer[] bucketBuffers = new ByteBuffer[numValues];
@Override
ByteBuffer[] readBucket(ByteBuffer buffer, int numValues)
{
final ByteBuffer[] bucketBuffers = new ByteBuffer[numValues];
// first value is written whole
final int length = VByte.readInt(bucket);
byte[] prefixBytes = new byte[length];
bucket.get(prefixBytes, 0, length);
bucketBuffers[0] = ByteBuffer.wrap(prefixBytes);
int pos = 1;
while (pos < numValues) {
final int prefixLength = VByte.readInt(bucket);
final int fragmentLength = VByte.readInt(bucket);
byte[] nextValueBytes = new byte[prefixLength + fragmentLength];
System.arraycopy(prefixBytes, 0, nextValueBytes, 0, prefixLength);
bucket.get(nextValueBytes, prefixLength, fragmentLength);
final ByteBuffer value = ByteBuffer.wrap(nextValueBytes);
prefixBytes = nextValueBytes;
bucketBuffers[pos++] = value;
// first value is written whole
final int length = VByte.readInt(buffer);
byte[] prefixBytes = new byte[length];
buffer.get(prefixBytes, 0, length);
bucketBuffers[0] = ByteBuffer.wrap(prefixBytes);
int pos = 1;
while (pos < numValues) {
final int prefixLength = VByte.readInt(buffer);
final int fragmentLength = VByte.readInt(buffer);
byte[] nextValueBytes = new byte[prefixLength + fragmentLength];
System.arraycopy(prefixBytes, 0, nextValueBytes, 0, prefixLength);
buffer.get(nextValueBytes, prefixLength, fragmentLength);
final ByteBuffer value = ByteBuffer.wrap(nextValueBytes);
prefixBytes = nextValueBytes;
bucketBuffers[pos++] = value;
}
return bucketBuffers;
}
return bucketBuffers;
}
@FunctionalInterface
interface GetBucketValue
{
ByteBuffer get(ByteBuffer buffer, int offset);
}
@Override
int findInBucket(ByteBuffer value, int currBucketFirstValueIndex, int bucketSize, int sharedPrefixLength)
{
int relativePosition = 0;
int prefixLength;
// scan through bucket values until we find match or compare numValues
int insertionPoint = 1;
while (++relativePosition < bucketSize) {
prefixLength = VByte.readInt(buffer);
if (prefixLength > sharedPrefixLength) {
// bucket value shares more in common with the preceding value, so the value we are looking for comes after
final int skip = VByte.readInt(buffer);
buffer.position(buffer.position() + skip);
insertionPoint++;
} else if (prefixLength < sharedPrefixLength) {
// bucket value prefix is smaller, that means the value we are looking for sorts ahead of it
break;
} else {
// value has the same shared prefix, so compare additional values to find
final int fragmentLength = VByte.readInt(buffer);
final int common = Math.min(fragmentLength, value.remaining() - prefixLength);
int fragmentComparison = 0;
boolean shortCircuit = false;
for (int i = 0; i < common; i++) {
fragmentComparison = StringUtils.compareUtf8UsingJavaStringOrdering(
buffer.get(buffer.position() + i),
value.get(prefixLength + i)
);
if (fragmentComparison != 0) {
sharedPrefixLength = prefixLength + i;
shortCircuit = true;
break;
}
}
if (fragmentComparison == 0) {
fragmentComparison = Integer.compare(prefixLength + fragmentLength, value.remaining());
}
@FunctionalInterface
interface ReadBucket
{
ByteBuffer[] readBucket(ByteBuffer buffer, int bucketSize);
}
@FunctionalInterface
interface FindInBucket
{
int find(ByteBuffer value, int currBucketFirstValueIndex, int bucketSize, int sharedPrefixLength);
if (fragmentComparison == 0) {
return (currBucketFirstValueIndex + adjustIndex) + relativePosition;
} else if (fragmentComparison < 0) {
// value we are looking for is longer than the current bucket value, continue on
if (!shortCircuit) {
sharedPrefixLength = prefixLength + common;
}
buffer.position(buffer.position() + fragmentLength);
insertionPoint++;
} else {
break;
}
}
}
// (-(insertion point) - 1)
return -(currBucketFirstValueIndex + adjustIndex) + (~insertionPoint);
}
}
}

View File

@ -217,7 +217,7 @@ public class FrontCodedIndexedWriter implements DictionaryWriter<byte[]>
bucketBuffer.clear();
final ByteBuffer valueBuffer = version == FrontCodedIndexed.V1
? getFromBucketV1(bucketBuffer, relativeIndex, bucketSize)
: FrontCodedIndexed.getFromBucketV0(bucketBuffer, relativeIndex);
: FrontCodedIndexed.FrontCodedV0.getValueFromBucket(bucketBuffer, relativeIndex);
final byte[] valueBytes = new byte[valueBuffer.limit() - valueBuffer.position()];
valueBuffer.get(valueBytes);
return valueBytes;
@ -413,15 +413,15 @@ public class FrontCodedIndexedWriter implements DictionaryWriter<byte[]>
return StringUtils.compareUtf8UsingJavaStringOrdering(b1, b2);
}
/**
* same as {@link FrontCodedIndexed#getFromBucketV1(ByteBuffer, int)} but without re-using prefixLength and buffer position
* arrays so has more overhead/garbage creation than the instance method.
* same as {@link FrontCodedIndexed.FrontCodedV1#getFromBucket(ByteBuffer, int)} but
* without re-using prefixLength and buffer position arrays so has more overhead/garbage creation than the instance
* method.
*
* Note: adding the unwindPrefixLength and unwindBufferPosition arrays as arguments and having
* {@link FrontCodedIndexed#getFromBucketV1(ByteBuffer, int)} call this static method added 5-10ns of overhead
* compared to having its own copy of the code, presumably due to the overhead of an additional method call and extra
* arguments.
* {@link FrontCodedIndexed.FrontCodedV1#getFromBucket(ByteBuffer, int)} call this static method added 5-10ns of
* overhead compared to having its own copy of the code, presumably due to the overhead of an additional method call
* and extra arguments.
*
* As such, since the writer is the only user of this method, it has been copied here...
*/