diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedBlockReader.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedBlockReader.java index 4ab7f1ddb45..951a3cca447 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/CompressedBlockReader.java +++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedBlockReader.java @@ -58,7 +58,8 @@ public final class CompressedBlockReader implements Closeable public static Supplier fromByteBuffer( ByteBuffer buffer, - ByteOrder byteOrder, + ByteOrder compressionOrder, + ByteOrder valueOrder, boolean copyValuesOnRead ) { @@ -75,26 +76,27 @@ public final class CompressedBlockReader implements Closeable final int numBlocks = buffer.getInt(); final int offsetsSize = numBlocks * Integer.BYTES; // buffer is at start of ending offsets - final ByteBuffer offsets = buffer.asReadOnlyBuffer().order(byteOrder); + final ByteBuffer offsets = buffer.asReadOnlyBuffer().order(compressionOrder); offsets.limit(offsets.position() + offsetsSize); - final IntBuffer offsetView = offsets.slice().order(byteOrder).asIntBuffer(); + final IntBuffer offsetView = offsets.slice().order(compressionOrder).asIntBuffer(); final int compressedSize = offsetView.get(numBlocks - 1); // move to start of compressed data buffer.position(buffer.position() + offsetsSize); - final ByteBuffer compressedData = buffer.asReadOnlyBuffer().order(byteOrder); + final ByteBuffer compressedData = buffer.asReadOnlyBuffer().order(compressionOrder); compressedData.limit(compressedData.position() + compressedSize); buffer.position(buffer.position() + compressedSize); - final ByteBuffer compressedDataView = compressedData.slice().order(byteOrder); + final ByteBuffer compressedDataView = compressedData.slice().order(compressionOrder); return () -> new CompressedBlockReader( compression, numBlocks, blockSize, copyValuesOnRead, offsetView.asReadOnlyBuffer(), - compressedDataView.asReadOnlyBuffer().order(byteOrder), - byteOrder + compressedDataView.asReadOnlyBuffer().order(compressionOrder), + compressionOrder, + valueOrder ); } throw new IAE("Unknown version[%s]", versionFromBuffer); @@ -123,7 +125,8 @@ public final class CompressedBlockReader implements Closeable boolean copyValuesOnRead, IntBuffer endOffsetsBuffer, ByteBuffer compressedDataBuffer, - ByteOrder byteOrder + ByteOrder compressionByteOrder, + ByteOrder valueByteOrder ) { this.decompressor = compressionStrategy.getDecompressor(); @@ -134,11 +137,11 @@ public final class CompressedBlockReader implements Closeable this.endOffsetsBuffer = endOffsetsBuffer; this.compressedDataBuffer = compressedDataBuffer; this.closer = Closer.create(); - this.decompressedDataBufferHolder = CompressedPools.getByteBuf(byteOrder); + this.decompressedDataBufferHolder = CompressedPools.getByteBuf(compressionByteOrder); closer.register(decompressedDataBufferHolder); this.decompressedDataBuffer = decompressedDataBufferHolder.get(); this.decompressedDataBuffer.clear(); - this.byteOrder = byteOrder; + this.byteOrder = valueByteOrder; } /** diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedLongsReader.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedLongsReader.java index 12d97dbfcc3..1b7094ab9d7 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/CompressedLongsReader.java +++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedLongsReader.java @@ -29,7 +29,12 @@ public final class CompressedLongsReader implements ColumnarLongs { public static Supplier fromByteBuffer(ByteBuffer buffer, ByteOrder order) { - final Supplier baseReader = CompressedBlockReader.fromByteBuffer(buffer, order, false); + final Supplier baseReader = CompressedBlockReader.fromByteBuffer( + buffer, + order, + order, // long serializer uses native order, same as compression + false + ); return () -> new CompressedLongsReader(baseReader.get()); } diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedVariableSizedBlobColumnSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedVariableSizedBlobColumnSupplier.java index bf88dbfc8c4..17ef19f7f41 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/CompressedVariableSizedBlobColumnSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedVariableSizedBlobColumnSupplier.java @@ -34,17 +34,19 @@ public class CompressedVariableSizedBlobColumnSupplier implements Supplier ), rawBuffer, metadata.getByteOrder(), + metadata.getByteOrder(), // byte order doesn't matter since serde is byte blobs mapper ); if (metadata.hasNulls()) { diff --git a/processing/src/main/java/org/apache/druid/segment/serde/CompressedComplexColumn.java b/processing/src/main/java/org/apache/druid/segment/serde/CompressedComplexColumn.java index 0811dd22ccf..23c6bbee7c9 100644 --- a/processing/src/main/java/org/apache/druid/segment/serde/CompressedComplexColumn.java +++ b/processing/src/main/java/org/apache/druid/segment/serde/CompressedComplexColumn.java @@ -28,18 +28,18 @@ import org.apache.druid.utils.CloseableUtils; import javax.annotation.Nullable; import java.nio.ByteBuffer; -public final class CompressedComplexColumn implements ComplexColumn +public final class CompressedComplexColumn implements ComplexColumn { private final String typeName; private final CompressedVariableSizedBlobColumn compressedColumn; private final ImmutableBitmap nullValues; - private final ObjectStrategy objectStrategy; + private final ObjectStrategy objectStrategy; public CompressedComplexColumn( String typeName, CompressedVariableSizedBlobColumn compressedColumn, ImmutableBitmap nullValues, - ObjectStrategy objectStrategy + ObjectStrategy objectStrategy ) { this.typeName = typeName; @@ -62,7 +62,7 @@ public final class CompressedComplexColumn implements ComplexColumn @Override @Nullable - public Object getRowValue(int rowNum) + public T getRowValue(int rowNum) { if (nullValues.get(rowNum)) { return null; diff --git a/processing/src/main/java/org/apache/druid/segment/serde/CompressedComplexColumnSupplier.java b/processing/src/main/java/org/apache/druid/segment/serde/CompressedComplexColumnSupplier.java index b49c655ff14..8687054156b 100644 --- a/processing/src/main/java/org/apache/druid/segment/serde/CompressedComplexColumnSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/serde/CompressedComplexColumnSupplier.java @@ -31,14 +31,15 @@ import org.apache.druid.segment.data.ObjectStrategy; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.ByteOrder; -public class CompressedComplexColumnSupplier implements Supplier +public class CompressedComplexColumnSupplier implements Supplier { - public static CompressedComplexColumnSupplier read( + public static CompressedComplexColumnSupplier read( ByteBuffer bb, ColumnBuilder columnBuilder, String typeName, - ObjectStrategy objectStrategy + ObjectStrategy objectStrategy ) { final byte version = bb.get(); @@ -67,6 +68,9 @@ public class CompressedComplexColumnSupplier implements Supplier ), fileBuffer, metadata.getByteOrder(), + // object strategies today assume that all buffers are big endian, so we hard-code the value buffer + // presented to the object strategy to always be big endian + ByteOrder.BIG_ENDIAN, objectStrategy.readRetainsBufferReference(), mapper ); @@ -83,7 +87,7 @@ public class CompressedComplexColumnSupplier implements Supplier nullValues = metadata.getBitmapSerdeFactory().getBitmapFactory().makeEmptyImmutableBitmap(); } - return new CompressedComplexColumnSupplier(typeName, objectStrategy, compressedColumnSupplier, nullValues); + return new CompressedComplexColumnSupplier<>(typeName, objectStrategy, compressedColumnSupplier, nullValues); } catch (IOException ex) { throw new RE(ex, "Failed to deserialize V%s column.", version); @@ -93,13 +97,13 @@ public class CompressedComplexColumnSupplier implements Supplier } private final String typeName; - private final ObjectStrategy objectStrategy; + private final ObjectStrategy objectStrategy; private final CompressedVariableSizedBlobColumnSupplier compressedColumnSupplier; private final ImmutableBitmap nullValues; private CompressedComplexColumnSupplier( String typeName, - ObjectStrategy objectStrategy, + ObjectStrategy objectStrategy, CompressedVariableSizedBlobColumnSupplier compressedColumnSupplier, ImmutableBitmap nullValues ) @@ -113,7 +117,7 @@ public class CompressedComplexColumnSupplier implements Supplier @Override public ComplexColumn get() { - return new CompressedComplexColumn(typeName, compressedColumnSupplier.get(), nullValues, objectStrategy); + return new CompressedComplexColumn<>(typeName, compressedColumnSupplier.get(), nullValues, objectStrategy); } public ImmutableBitmap getNullValues() diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedVariableSizeBlobColumnTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedVariableSizeBlobColumnTest.java index eaa447a2812..070440d203e 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedVariableSizeBlobColumnTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedVariableSizeBlobColumnTest.java @@ -88,6 +88,7 @@ public class CompressedVariableSizeBlobColumnTest fileNameBase, base, ByteOrder.nativeOrder(), + ByteOrder.nativeOrder(), fileMapper ).get(); for (int row = 0; row < numWritten; row++) { @@ -151,6 +152,7 @@ public class CompressedVariableSizeBlobColumnTest fileNameBase, base, ByteOrder.nativeOrder(), + ByteOrder.nativeOrder(), fileMapper ).get(); for (int row = 0; row < numWritten; row++) { @@ -170,6 +172,68 @@ public class CompressedVariableSizeBlobColumnTest fileMapper.close(); } + @Test + public void testSomeValuesByteBuffersBigEndian() throws IOException + { + final File tmpFile = tempFolder.newFolder(); + final FileSmoosher smoosher = new FileSmoosher(tmpFile); + + final File tmpFile2 = tempFolder.newFolder(); + final SegmentWriteOutMedium writeOutMedium = + TmpFileSegmentWriteOutMediumFactory.instance().makeSegmentWriteOutMedium(tmpFile2); + + final String fileNameBase = "test"; + + final CompressionStrategy compressionStrategy = CompressionStrategy.LZ4; + CompressedVariableSizedBlobColumnSerializer serializer = new CompressedVariableSizedBlobColumnSerializer( + fileNameBase, + writeOutMedium, + compressionStrategy + ); + serializer.open(); + + int numWritten = 0; + final Random r = ThreadLocalRandom.current(); + final List values = new ArrayList<>(); + final ByteBuffer longValueConverter = ByteBuffer.allocate(Long.BYTES).order(ByteOrder.BIG_ENDIAN); + for (int i = 0, offset = 0; offset < CompressedPools.BUFFER_SIZE * 4; i++, offset = 1 << i) { + final long l = r.nextLong(); + values.add(l); + longValueConverter.clear(); + longValueConverter.putLong(l); + longValueConverter.rewind(); + serializer.addValue(longValueConverter.array()); + numWritten++; + } + + SmooshedWriter writer = smoosher.addWithSmooshedWriter(fileNameBase, serializer.getSerializedSize()); + serializer.writeTo(writer, smoosher); + writer.close(); + smoosher.close(); + SmooshedFileMapper fileMapper = SmooshedFileMapper.load(tmpFile); + + ByteBuffer base = fileMapper.mapFile(fileNameBase); + + CompressedVariableSizedBlobColumn column = CompressedVariableSizedBlobColumnSupplier.fromByteBuffer( + fileNameBase, + base, + ByteOrder.nativeOrder(), + ByteOrder.BIG_ENDIAN, + fileMapper + ).get(); + for (int row = 0; row < numWritten; row++) { + ByteBuffer value = column.get(row); + Assert.assertEquals("Row " + row, values.get(row).longValue(), value.getLong()); + } + for (int rando = 0; rando < numWritten; rando++) { + int row = ThreadLocalRandom.current().nextInt(0, numWritten - 1); + ByteBuffer value = column.get(row); + Assert.assertEquals("Row " + row, values.get(row).longValue(), value.getLong()); + } + column.close(); + fileMapper.close(); + } + @Test public void testLongs() throws IOException {