mirror of https://github.com/apache/druid.git

use big endian for compressed complex column values to fit object strategy expectations (#17422)

commit 10208baab2
parent 1be2b852e9
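Context for the change: object strategies decode complex values assuming big-endian buffers, while compressed complex columns previously handed them value buffers in the segment's byte order (typically little-endian native order). A minimal java.nio sketch of the resulting misread, independent of any Druid code:

    // the same eight bytes decode to different longs depending on the buffer's order
    ByteBuffer buf = ByteBuffer.allocate(Long.BYTES).order(ByteOrder.LITTLE_ENDIAN);
    buf.putLong(1L);
    buf.flip();
    long misread = buf.order(ByteOrder.BIG_ENDIAN).getLong(); // 72057594037927936, not 1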
CompressedBlockReader.java

@@ -58,7 +58,8 @@ public final class CompressedBlockReader implements Closeable
   public static Supplier<CompressedBlockReader> fromByteBuffer(
       ByteBuffer buffer,
-      ByteOrder byteOrder,
+      ByteOrder compressionOrder,
+      ByteOrder valueOrder,
       boolean copyValuesOnRead
   )
   {
@@ -75,26 +76,27 @@ public final class CompressedBlockReader implements Closeable
     final int numBlocks = buffer.getInt();
     final int offsetsSize = numBlocks * Integer.BYTES;
     // buffer is at start of ending offsets
-    final ByteBuffer offsets = buffer.asReadOnlyBuffer().order(byteOrder);
+    final ByteBuffer offsets = buffer.asReadOnlyBuffer().order(compressionOrder);
     offsets.limit(offsets.position() + offsetsSize);
-    final IntBuffer offsetView = offsets.slice().order(byteOrder).asIntBuffer();
+    final IntBuffer offsetView = offsets.slice().order(compressionOrder).asIntBuffer();
     final int compressedSize = offsetView.get(numBlocks - 1);

     // move to start of compressed data
     buffer.position(buffer.position() + offsetsSize);
-    final ByteBuffer compressedData = buffer.asReadOnlyBuffer().order(byteOrder);
+    final ByteBuffer compressedData = buffer.asReadOnlyBuffer().order(compressionOrder);
     compressedData.limit(compressedData.position() + compressedSize);
     buffer.position(buffer.position() + compressedSize);

-    final ByteBuffer compressedDataView = compressedData.slice().order(byteOrder);
+    final ByteBuffer compressedDataView = compressedData.slice().order(compressionOrder);
     return () -> new CompressedBlockReader(
         compression,
         numBlocks,
         blockSize,
         copyValuesOnRead,
         offsetView.asReadOnlyBuffer(),
-        compressedDataView.asReadOnlyBuffer().order(byteOrder),
-        byteOrder
+        compressedDataView.asReadOnlyBuffer().order(compressionOrder),
+        compressionOrder,
+        valueOrder
     );
   }
   throw new IAE("Unknown version[%s]", versionFromBuffer);
@@ -123,7 +125,8 @@ public final class CompressedBlockReader implements Closeable
       boolean copyValuesOnRead,
       IntBuffer endOffsetsBuffer,
       ByteBuffer compressedDataBuffer,
-      ByteOrder byteOrder
+      ByteOrder compressionByteOrder,
+      ByteOrder valueByteOrder
   )
   {
     this.decompressor = compressionStrategy.getDecompressor();
@@ -134,11 +137,11 @@ public final class CompressedBlockReader implements Closeable
     this.endOffsetsBuffer = endOffsetsBuffer;
     this.compressedDataBuffer = compressedDataBuffer;
     this.closer = Closer.create();
-    this.decompressedDataBufferHolder = CompressedPools.getByteBuf(byteOrder);
+    this.decompressedDataBufferHolder = CompressedPools.getByteBuf(compressionByteOrder);
     closer.register(decompressedDataBufferHolder);
     this.decompressedDataBuffer = decompressedDataBufferHolder.get();
     this.decompressedDataBuffer.clear();
-    this.byteOrder = byteOrder;
+    this.byteOrder = valueByteOrder;
   }

   /**
CompressedLongsReader.java

@@ -29,7 +29,12 @@ public final class CompressedLongsReader implements ColumnarLongs
 {
   public static Supplier<CompressedLongsReader> fromByteBuffer(ByteBuffer buffer, ByteOrder order)
   {
-    final Supplier<CompressedBlockReader> baseReader = CompressedBlockReader.fromByteBuffer(buffer, order, false);
+    final Supplier<CompressedBlockReader> baseReader = CompressedBlockReader.fromByteBuffer(
+        buffer,
+        order,
+        order, // long serializer uses native order, same as compression
+        false
+    );
     return () -> new CompressedLongsReader(baseReader.get());
   }
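The overload split above separates compressionOrder (the order of the block-offset table and compressed bytes) from valueOrder (the order of the decompressed values handed onward); for longs the two coincide, as the inline comment notes. A hedged sketch of a call to the widened method, where mappedBuffer is an assumed caller-supplied segment buffer:

    // hypothetical caller of the new four-argument overload
    Supplier<CompressedBlockReader> reader = CompressedBlockReader.fromByteBuffer(
        mappedBuffer,
        ByteOrder.nativeOrder(), // compressionOrder: offsets and compressed block bytes
        ByteOrder.BIG_ENDIAN,    // valueOrder: what downstream value consumers expect
        false                    // copyValuesOnRead
    );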
CompressedVariableSizedBlobColumnSupplier.java

@@ -34,17 +34,19 @@ public class CompressedVariableSizedBlobColumnSupplier implements Supplier<CompressedVariableSizedBlobColumn>
   public static CompressedVariableSizedBlobColumnSupplier fromByteBuffer(
       String filenameBase,
       ByteBuffer buffer,
-      ByteOrder order,
+      ByteOrder compressionOrder,
+      ByteOrder valueOrder,
       SmooshedFileMapper mapper
   ) throws IOException
   {
-    return fromByteBuffer(filenameBase, buffer, order, false, mapper);
+    return fromByteBuffer(filenameBase, buffer, compressionOrder, valueOrder, false, mapper);
   }

   public static CompressedVariableSizedBlobColumnSupplier fromByteBuffer(
       String filenameBase,
       ByteBuffer buffer,
-      ByteOrder order,
+      ByteOrder compressionOrder,
+      ByteOrder valueOrder,
       boolean copyValuesOnRead,
       SmooshedFileMapper mapper
   ) throws IOException
@@ -59,7 +61,14 @@ public class CompressedVariableSizedBlobColumnSupplier implements Supplier<CompressedVariableSizedBlobColumn>
     final ByteBuffer dataBuffer = mapper.mapFile(
         CompressedVariableSizedBlobColumnSerializer.getCompressedBlobsFileName(filenameBase)
     );
-    return new CompressedVariableSizedBlobColumnSupplier(offsetsBuffer, dataBuffer, order, numElements, copyValuesOnRead);
+    return new CompressedVariableSizedBlobColumnSupplier(
+        offsetsBuffer,
+        dataBuffer,
+        compressionOrder,
+        valueOrder,
+        numElements,
+        copyValuesOnRead
+    );
   }
   throw new IAE("Unknown version[%s]", versionFromBuffer);
 }
@@ -72,14 +81,15 @@ public class CompressedVariableSizedBlobColumnSupplier implements Supplier<CompressedVariableSizedBlobColumn>
   private CompressedVariableSizedBlobColumnSupplier(
       ByteBuffer offsetsBuffer,
       ByteBuffer dataBuffer,
-      ByteOrder order,
+      ByteOrder compressionOrder,
+      ByteOrder valueOrder,
       int numElements,
       boolean copyValuesOnRead
   )
   {
     this.numElements = numElements;
-    this.offsetReaderSupplier = CompressedLongsReader.fromByteBuffer(offsetsBuffer, order);
-    this.blockDataReaderSupplier = CompressedBlockReader.fromByteBuffer(dataBuffer, order, copyValuesOnRead);
+    this.offsetReaderSupplier = CompressedLongsReader.fromByteBuffer(offsetsBuffer, compressionOrder);
+    this.blockDataReaderSupplier = CompressedBlockReader.fromByteBuffer(dataBuffer, compressionOrder, valueOrder, copyValuesOnRead);
   }

   @Override
NestedDataColumnSupplier.java

@@ -141,6 +141,7 @@ public class NestedDataColumnSupplier implements Supplier<NestedCommonFormatColumn>
         ),
         rawBuffer,
         byteOrder,
+        byteOrder, // byte order doesn't matter since serde is byte blobs
         mapper
     );
     if (hasNulls) {
NestedDataColumnSupplierV4.java

@@ -164,6 +164,7 @@ public class NestedDataColumnSupplierV4 implements Supplier<ComplexColumn>
         ),
         rawBuffer,
         metadata.getByteOrder(),
+        metadata.getByteOrder(), // byte order doesn't matter since serde is byte blobs
         mapper
     );
     if (metadata.hasNulls()) {
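Both nested-column suppliers pass the same order twice because, per the inline comments, their raw values are opaque byte blobs: byte order affects multi-byte primitive reads, not positional byte copies. A minimal illustration of why either order works there:

    byte[] blob = new byte[]{1, 2, 3, 4};
    byte[] a = new byte[4];
    byte[] b = new byte[4];
    ByteBuffer.wrap(blob).order(ByteOrder.LITTLE_ENDIAN).get(a);
    ByteBuffer.wrap(blob).order(ByteOrder.BIG_ENDIAN).get(b);
    // a and b hold identical bytes; order would only matter for getLong()/getInt()-style reads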
CompressedComplexColumn.java

@@ -28,18 +28,18 @@ import org.apache.druid.utils.CloseableUtils;
 import javax.annotation.Nullable;
 import java.nio.ByteBuffer;

-public final class CompressedComplexColumn implements ComplexColumn
+public final class CompressedComplexColumn<T> implements ComplexColumn
 {
   private final String typeName;
   private final CompressedVariableSizedBlobColumn compressedColumn;
   private final ImmutableBitmap nullValues;
-  private final ObjectStrategy<?> objectStrategy;
+  private final ObjectStrategy<T> objectStrategy;

   public CompressedComplexColumn(
       String typeName,
       CompressedVariableSizedBlobColumn compressedColumn,
       ImmutableBitmap nullValues,
-      ObjectStrategy<?> objectStrategy
+      ObjectStrategy<T> objectStrategy
   )
   {
     this.typeName = typeName;
@@ -62,7 +62,7 @@ public final class CompressedComplexColumn implements ComplexColumn

   @Override
   @Nullable
-  public Object getRowValue(int rowNum)
+  public T getRowValue(int rowNum)
   {
     if (nullValues.get(rowNum)) {
       return null;
CompressedComplexColumnSupplier.java

@@ -31,14 +31,15 @@ import org.apache.druid.segment.data.ObjectStrategy;

 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;

-public class CompressedComplexColumnSupplier implements Supplier<ComplexColumn>
+public class CompressedComplexColumnSupplier<T> implements Supplier<ComplexColumn>
 {
-  public static CompressedComplexColumnSupplier read(
+  public static <T> CompressedComplexColumnSupplier<T> read(
       ByteBuffer bb,
       ColumnBuilder columnBuilder,
       String typeName,
-      ObjectStrategy objectStrategy
+      ObjectStrategy<T> objectStrategy
   )
   {
     final byte version = bb.get();
@@ -67,6 +68,9 @@ public class CompressedComplexColumnSupplier implements Supplier<ComplexColumn>
         ),
         fileBuffer,
         metadata.getByteOrder(),
+        // object strategies today assume that all buffers are big endian, so we hard-code the value buffer
+        // presented to the object strategy to always be big endian
+        ByteOrder.BIG_ENDIAN,
         objectStrategy.readRetainsBufferReference(),
         mapper
     );
@@ -83,7 +87,7 @@ public class CompressedComplexColumnSupplier implements Supplier<ComplexColumn>
       nullValues = metadata.getBitmapSerdeFactory().getBitmapFactory().makeEmptyImmutableBitmap();
     }

-    return new CompressedComplexColumnSupplier(typeName, objectStrategy, compressedColumnSupplier, nullValues);
+    return new CompressedComplexColumnSupplier<>(typeName, objectStrategy, compressedColumnSupplier, nullValues);
   }
   catch (IOException ex) {
     throw new RE(ex, "Failed to deserialize V%s column.", version);
@@ -93,13 +97,13 @@ public class CompressedComplexColumnSupplier implements Supplier<ComplexColumn>
   }

   private final String typeName;
-  private final ObjectStrategy objectStrategy;
+  private final ObjectStrategy<T> objectStrategy;
   private final CompressedVariableSizedBlobColumnSupplier compressedColumnSupplier;
   private final ImmutableBitmap nullValues;

   private CompressedComplexColumnSupplier(
       String typeName,
-      ObjectStrategy objectStrategy,
+      ObjectStrategy<T> objectStrategy,
       CompressedVariableSizedBlobColumnSupplier compressedColumnSupplier,
       ImmutableBitmap nullValues
   )
@@ -113,7 +117,7 @@ public class CompressedComplexColumnSupplier implements Supplier<ComplexColumn>
   @Override
   public ComplexColumn get()
   {
-    return new CompressedComplexColumn(typeName, compressedColumnSupplier.get(), nullValues, objectStrategy);
+    return new CompressedComplexColumn<>(typeName, compressedColumnSupplier.get(), nullValues, objectStrategy);
   }

   public ImmutableBitmap getNullValues()
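The hard-coded ByteOrder.BIG_ENDIAN above reflects that object strategies decode from whatever buffer they receive without knowing how it was written. A simplified sketch of the mismatch being prevented (plain ByteBuffer calls stand in for a strategy's toBytes/fromByteBuffer; this is not Druid's actual ObjectStrategy interface):

    // serializers typically write through ByteBuffer.allocate(), which defaults to big endian
    byte[] stored = ByteBuffer.allocate(Long.BYTES).putLong(42L).array();

    // decoding is only correct when the view handed to the strategy is big endian,
    // which is what the supplier now guarantees
    long ok = ByteBuffer.wrap(stored).order(ByteOrder.BIG_ENDIAN).getLong();     // 42
    long bad = ByteBuffer.wrap(stored).order(ByteOrder.LITTLE_ENDIAN).getLong(); // 3026418949592973312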
CompressedVariableSizeBlobColumnTest.java

@@ -88,6 +88,7 @@ public class CompressedVariableSizeBlobColumnTest
         fileNameBase,
         base,
         ByteOrder.nativeOrder(),
+        ByteOrder.nativeOrder(),
         fileMapper
     ).get();
     for (int row = 0; row < numWritten; row++) {
@@ -151,6 +152,7 @@ public class CompressedVariableSizeBlobColumnTest
         fileNameBase,
         base,
         ByteOrder.nativeOrder(),
+        ByteOrder.nativeOrder(),
         fileMapper
     ).get();
     for (int row = 0; row < numWritten; row++) {
@@ -170,6 +172,68 @@ public class CompressedVariableSizeBlobColumnTest
     fileMapper.close();
   }

+  @Test
+  public void testSomeValuesByteBuffersBigEndian() throws IOException
+  {
+    final File tmpFile = tempFolder.newFolder();
+    final FileSmoosher smoosher = new FileSmoosher(tmpFile);
+
+    final File tmpFile2 = tempFolder.newFolder();
+    final SegmentWriteOutMedium writeOutMedium =
+        TmpFileSegmentWriteOutMediumFactory.instance().makeSegmentWriteOutMedium(tmpFile2);
+
+    final String fileNameBase = "test";
+
+    final CompressionStrategy compressionStrategy = CompressionStrategy.LZ4;
+    CompressedVariableSizedBlobColumnSerializer serializer = new CompressedVariableSizedBlobColumnSerializer(
+        fileNameBase,
+        writeOutMedium,
+        compressionStrategy
+    );
+    serializer.open();
+
+    int numWritten = 0;
+    final Random r = ThreadLocalRandom.current();
+    final List<Long> values = new ArrayList<>();
+    final ByteBuffer longValueConverter = ByteBuffer.allocate(Long.BYTES).order(ByteOrder.BIG_ENDIAN);
+    for (int i = 0, offset = 0; offset < CompressedPools.BUFFER_SIZE * 4; i++, offset = 1 << i) {
+      final long l = r.nextLong();
+      values.add(l);
+      longValueConverter.clear();
+      longValueConverter.putLong(l);
+      longValueConverter.rewind();
+      serializer.addValue(longValueConverter.array());
+      numWritten++;
+    }
+
+    SmooshedWriter writer = smoosher.addWithSmooshedWriter(fileNameBase, serializer.getSerializedSize());
+    serializer.writeTo(writer, smoosher);
+    writer.close();
+    smoosher.close();
+    SmooshedFileMapper fileMapper = SmooshedFileMapper.load(tmpFile);
+
+    ByteBuffer base = fileMapper.mapFile(fileNameBase);
+
+    CompressedVariableSizedBlobColumn column = CompressedVariableSizedBlobColumnSupplier.fromByteBuffer(
+        fileNameBase,
+        base,
+        ByteOrder.nativeOrder(),
+        ByteOrder.BIG_ENDIAN,
+        fileMapper
+    ).get();
+    for (int row = 0; row < numWritten; row++) {
+      ByteBuffer value = column.get(row);
+      Assert.assertEquals("Row " + row, values.get(row).longValue(), value.getLong());
+    }
+    for (int rando = 0; rando < numWritten; rando++) {
+      int row = ThreadLocalRandom.current().nextInt(0, numWritten - 1);
+      ByteBuffer value = column.get(row);
+      Assert.assertEquals("Row " + row, values.get(row).longValue(), value.getLong());
+    }
+    column.close();
+    fileMapper.close();
+  }
+
   @Test
   public void testLongs() throws IOException
   {