diff --git a/processing/src/main/java/org/apache/druid/java/util/common/io/smoosh/FileSmoosher.java b/processing/src/main/java/org/apache/druid/java/util/common/io/smoosh/FileSmoosher.java
index f4103ec1847..edb7589fb82 100644
--- a/processing/src/main/java/org/apache/druid/java/util/common/io/smoosh/FileSmoosher.java
+++ b/processing/src/main/java/org/apache/druid/java/util/common/io/smoosh/FileSmoosher.java
@@ -31,6 +31,7 @@ import org.apache.druid.java.util.common.MappedByteBufferHandler;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.serde.Serializer;
 
 import java.io.BufferedWriter;
 import java.io.Closeable;
@@ -161,6 +162,13 @@ public class FileSmoosher implements Closeable
     }
   }
 
+  public void serializeAs(String name, Serializer serializer) throws IOException
+  {
+    try (SmooshedWriter smooshChannel = addWithSmooshedWriter(name, serializer.getSerializedSize())) {
+      serializer.writeTo(smooshChannel, this);
+    }
+  }
+
   public SmooshedWriter addWithSmooshedWriter(final String name, final long size) throws IOException
   {
diff --git a/processing/src/main/java/org/apache/druid/segment/LongColumnSerializerV2.java b/processing/src/main/java/org/apache/druid/segment/LongColumnSerializerV2.java
index 226cd8bafb0..0611d96e9c0 100644
--- a/processing/src/main/java/org/apache/druid/segment/LongColumnSerializerV2.java
+++ b/processing/src/main/java/org/apache/druid/segment/LongColumnSerializerV2.java
@@ -121,12 +121,29 @@ public class LongColumnSerializerV2 implements GenericColumnSerializer
   public void serialize(ColumnValueSelector selector) throws IOException
   {
     if (selector.isNull()) {
-      nullRowsBitmap.add(rowCount);
-      writer.add(0L);
+      serializeNull();
     } else {
-      writer.add(selector.getLong());
+      serializeValue(selector.getLong());
     }
-    rowCount++;
+  }
+
+  /**
+   * Serializes a null value at the rowCount position, and increments the current rowCount.
+   */
+  public void serializeNull() throws IOException
+  {
+    nullRowsBitmap.add(rowCount);
+    writer.add(0L);
+    ++rowCount;
+  }
+
+  /**
+   * Serializes a value of val at the rowCount position, and increments the current rowCount.
+   */
+  public void serializeValue(long val) throws IOException
+  {
+    writer.add(val);
+    ++rowCount;
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/druid/segment/column/StringUtf8DictionaryEncodedColumn.java b/processing/src/main/java/org/apache/druid/segment/column/StringUtf8DictionaryEncodedColumn.java
index a5966096a6e..bed58a43675 100644
--- a/processing/src/main/java/org/apache/druid/segment/column/StringUtf8DictionaryEncodedColumn.java
+++ b/processing/src/main/java/org/apache/druid/segment/column/StringUtf8DictionaryEncodedColumn.java
@@ -20,6 +20,7 @@
 package org.apache.druid.segment.column;
 
 import com.google.common.collect.Lists;
+import org.apache.druid.common.semantic.SemanticUtils;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.query.extraction.ExtractionFn;
 import org.apache.druid.query.filter.DruidObjectPredicate;
@@ -52,6 +53,8 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.BitSet;
 import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
 
 /**
  * {@link DictionaryEncodedColumn} for a column which has a {@link ByteBuffer} based UTF-8 dictionary.
@@ -62,6 +65,9 @@ import java.util.List;
  */
 public class StringUtf8DictionaryEncodedColumn implements DictionaryEncodedColumn<ByteBuffer>, NestedCommonFormatColumn
 {
+  private static final Map<Class<?>, Function<StringUtf8DictionaryEncodedColumn, ?>> AS_MAP =
+      SemanticUtils.makeAsMap(StringUtf8DictionaryEncodedColumn.class);
+
   @Nullable
   private final ColumnarInts column;
   @Nullable
@@ -498,6 +504,15 @@ public class StringUtf8DictionaryEncodedColumn
     }
   }
 
+  @SuppressWarnings("unchecked")
+  @Nullable
+  @Override
+  public <T> T as(Class<T> clazz)
+  {
+    //noinspection ReturnOfNull
+    return (T) AS_MAP.getOrDefault(clazz, arg -> null).apply(this);
+  }
+
   @Override
   public void close() throws IOException
   {
diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java
index e3699ea3db8..4473fa77d46 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java
@@ -112,7 +112,7 @@ public class BlockLayoutColumnarDoublesSupplier implements Supplier
 {
+  private static final Map<Class<?>, Function<BlockLayoutColumnarLongs, ?>> AS_MAP =
+      SemanticUtils.makeAsMap(BlockLayoutColumnarLongs.class);
+
   private final GenericIndexed<ResourceHolder<ByteBuffer>> baseLongBuffers;
 
   // The number of rows in this column.
@@ -222,6 +228,15 @@ public class BlockLayoutColumnarLongsSupplier implements Supplier
     }
   }
 
+  @SuppressWarnings("unchecked")
+  @Nullable
+  @Override
+  public <T> T as(Class<T> clazz)
+  {
+    //noinspection ReturnOfNull
+    return (T) AS_MAP.getOrDefault(clazz, arg -> null).apply(this);
+  }
+
   @Override
   public String toString()
   {
diff --git a/processing/src/main/java/org/apache/druid/segment/data/ColumnarDoubles.java b/processing/src/main/java/org/apache/druid/segment/data/ColumnarDoubles.java
index 9e098673da3..4c8ee6f0651 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/ColumnarDoubles.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/ColumnarDoubles.java
@@ -46,9 +46,14 @@ public interface ColumnarDoubles extends Closeable
   double get(int index);
 
   default void get(double[] out, int start, int length)
+  {
+    get(out, 0, start, length);
+  }
+
+  default void get(double[] out, int offset, int start, int length)
   {
     for (int i = 0; i < length; i++) {
-      out[i] = get(i + start);
+      out[offset + i] = get(i + start);
     }
   }
 
diff --git a/processing/src/main/java/org/apache/druid/segment/data/ColumnarDoublesSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/ColumnarDoublesSerializer.java
index f33cdee4c4e..3d3fd4287d1 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/ColumnarDoublesSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/ColumnarDoublesSerializer.java
@@ -29,6 +29,15 @@ import java.io.IOException;
 public interface ColumnarDoublesSerializer extends Serializer
 {
   void open() throws IOException;
+
   int size();
+
   void add(double value) throws IOException;
+
+  default void addAll(double[] values, int start, int end) throws IOException
+  {
+    for (int i = start; i < end; ++i) {
+      add(values[i]);
+    }
+  }
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java
index 503d4f65fe9..eaf0b2a47a9 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java
@@ -354,6 +354,12 @@ public class CompressedVSizeColumnarIntsSupplier implements WritableSupplier
 p;
diff --git a/processing/src/main/java/org/apache/druid/segment/data/LongsLongEncodingWriter.java b/processing/src/main/java/org/apache/druid/segment/data/LongsLongEncodingWriter.java
index 728a50aa2fc..f2b198b7e7a 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/LongsLongEncodingWriter.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/LongsLongEncodingWriter.java
@@ -74,6 +74,19 @@ public class LongsLongEncodingWriter implements CompressionFactory.LongEncodingWriter
     }
   }
 
+  @Override
+  public void write(long[] values, int offset, int length) throws IOException
+  {
+    if (outBuffer != null) {
+      outBuffer.asLongBuffer().put(values, offset, length);
+      outBuffer.position(outBuffer.position() + (length * Long.BYTES));
+    } else {
+      for (int i = offset; i < length; ++i) {
+        write(values[i]);
+      }
+    }
+  }
+
   @Override
   public void flush()
   {
diff --git a/processing/src/main/java/org/apache/druid/segment/data/SingleValueColumnarIntsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/SingleValueColumnarIntsSerializer.java
index 5f4fad50814..892549638f8 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/SingleValueColumnarIntsSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/SingleValueColumnarIntsSerializer.java
@@ -27,4 +27,11 @@ import java.io.IOException;
 public abstract class SingleValueColumnarIntsSerializer implements ColumnarIntsSerializer
 {
   public abstract void addValue(int val) throws IOException;
+
+  public void addValues(int[] vals, int start, int stop) throws IOException
+  {
+    for (int i = start; i < stop; ++i) {
+      addValue(vals[i]);
+    }
+  }
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java b/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java
index 9821155e178..c2e46cec107 100644
--- a/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java
+++ b/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
 import com.google.common.primitives.Doubles;
 import org.apache.druid.collections.bitmap.ImmutableBitmap;
+import org.apache.druid.common.semantic.SemanticUtils;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
@@ -84,10 +85,12 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
 
 /**
  * Implementation of {@link NestedDataComplexColumn} which uses a {@link CompressedVariableSizedBlobColumn} for the
@@ -104,6 +107,9 @@ import java.util.concurrent.ConcurrentHashMap;
 public abstract class CompressedNestedDataComplexColumn<TStringDictionary extends Indexed<ByteBuffer>>
     extends NestedDataComplexColumn implements NestedCommonFormatColumn
 {
+  private static final Map<Class<?>, Function<CompressedNestedDataComplexColumn, ?>> AS_MAP =
+      SemanticUtils.makeAsMap(CompressedNestedDataComplexColumn.class);
+
   private static final ObjectStrategy STRATEGY = NestedDataComplexTypeSerde.INSTANCE.getObjectStrategy();
   public static final IntTypeStrategy INT_TYPE_STRATEGY = new IntTypeStrategy();
   private final ColumnConfig columnConfig;
@@ -903,6 +909,15 @@ public abstract class CompressedNestedDataComplexColumn
+  @SuppressWarnings("unchecked")
+  @Nullable
+  @Override
+  public <T> T as(Class<T> clazz)
+  {
+    //noinspection ReturnOfNull
+    return (T) AS_MAP.getOrDefault(clazz, arg -> null).apply(this);
+  }
+
   private ColumnHolder getColumnHolder(String field, int fieldIndex)
   {
     return columns.computeIfAbsent(fieldIndex, (f) -> readNestedFieldColumn(field, fieldIndex));
diff --git a/processing/src/main/java/org/apache/druid/segment/nested/NestedCommonFormatColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/nested/NestedCommonFormatColumnSerializer.java
index 50bca997735..ab5b09417c5 100644
--- a/processing/src/main/java/org/apache/druid/segment/nested/NestedCommonFormatColumnSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/nested/NestedCommonFormatColumnSerializer.java
@@ -19,16 +19,13 @@
 
 package org.apache.druid.segment.nested;
 
-import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
 import org.apache.druid.segment.GenericColumnSerializer;
-import org.apache.druid.segment.data.VByte;
 import org.apache.druid.segment.serde.ColumnSerializerUtils;
 import org.apache.druid.segment.serde.Serializer;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
 import java.nio.channels.WritableByteChannel;
 import java.util.SortedMap;
 
@@ -85,7 +82,7 @@ public abstract class NestedCommonFormatColumnSerializer implements GenericColumnSerializer
     ColumnSerializerUtils.writeInternal(smoosher, buffer, getColumnName(), fileName);
   }
 
-  protected void writeV0Header(WritableByteChannel channel, ByteBuffer columnNameBuffer) throws IOException
+  public static void writeV0Header(WritableByteChannel channel, ByteBuffer columnNameBuffer) throws IOException
   {
     channel.write(ByteBuffer.wrap(new byte[]{NestedCommonFormatColumnSerializer.V0}));
     channel.write(columnNameBuffer);
@@ -93,12 +90,7 @@ public abstract class NestedCommonFormatColumnSerializer implements GenericColumnSerializer
 
   protected ByteBuffer computeFilenameBytes()
   {
-    final byte[] bytes = StringUtils.toUtf8(getColumnName());
-    final int length = VByte.computeIntSize(bytes.length);
-    final ByteBuffer buffer = ByteBuffer.allocate(length + bytes.length).order(ByteOrder.nativeOrder());
-    VByte.writeInt(buffer, bytes.length);
-    buffer.put(bytes);
-    buffer.flip();
-    return buffer;
+    final String columnName = getColumnName();
+    return ColumnSerializerUtils.stringToUtf8InVSizeByteBuffer(columnName);
   }
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
index 6a3405e58fc..cad28eada06 100644
--- a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
@@ -24,9 +24,8 @@ import com.google.common.collect.Maps;
 import org.apache.druid.collections.bitmap.ImmutableBitmap;
 import org.apache.druid.collections.bitmap.MutableBitmap;
 import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.java.util.common.FileUtils;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
 import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
@@ -109,7 +108,7 @@ public class NestedDataColumnSerializer extends NestedCommonFormatColumnSerializer
             return ProcessedValue.NULL_LITERAL;
           }
           catch (IOException e) {
-            throw new RE(e, "Failed to write field [%s], unhandled value", fieldPath);
+            throw DruidException.defensive(e, "Failed to write field [%s], unhandled value", fieldPath);
           }
         }
         return ProcessedValue.NULL_LITERAL;
@@ -134,7 +133,7 @@ public class NestedDataColumnSerializer extends NestedCommonFormatColumnSerializer
             return ProcessedValue.NULL_LITERAL;
           }
           catch (IOException e) {
-            throw new RE(e, "Failed to write field [%s] value [%s]", fieldPath, array);
+            throw DruidException.defensive(e, "Failed to write field [%s] value [%s]", fieldPath, array);
           }
         }
       }
@@ -318,7 +317,7 @@ public class NestedDataColumnSerializer extends NestedCommonFormatColumnSerializer
             globalDictionaryIdLookup
         );
       } else {
-        throw new ISE("Invalid field type [%s], how did this happen?", type);
+        throw DruidException.defensive("Invalid field type [%s], how did this happen?", type);
       }
     } else {
       writer = new VariantFieldColumnWriter(
@@ -343,7 +342,9 @@ public class NestedDataColumnSerializer extends NestedCommonFormatColumnSerializer
   ) throws IOException
   {
     if (dictionarySerialized) {
-      throw new ISE("String dictionary already serialized for column [%s], cannot serialize again", name);
+      throw DruidException.defensive(
+          "String dictionary already serialized for column [%s], cannot serialize again", name
+      );
     }
 
     // null is always 0
@@ -383,11 +384,17 @@ public class NestedDataColumnSerializer extends NestedCommonFormatColumnSerializer
 
   @Override
   public void serialize(ColumnValueSelector selector) throws IOException
+  {
+    serialize(StructuredData.wrap(selector.getObject()));
+  }
+
+  public void serialize(StructuredData data) throws IOException
   {
     if (!dictionarySerialized) {
-      throw new ISE("Must serialize value dictionaries before serializing values for column [%s]", name);
+      throw DruidException.defensive(
+          "Must serialize value dictionaries before serializing values for column [%s]", name
+      );
     }
-    StructuredData data = StructuredData.wrap(selector.getObject());
     if (data == null) {
       nullRowsBitmap.add(rowCount);
     }
diff --git a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumn.java b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumn.java
index 689ad57cf7a..6ade80b6e83 100644
--- a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumn.java
+++ b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumn.java
@@ -21,6 +21,7 @@ package org.apache.druid.segment.nested;
 
 import org.apache.druid.collections.bitmap.ImmutableBitmap;
 import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.common.semantic.SemanticUtils;
 import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
 import org.apache.druid.segment.ColumnValueSelector;
 import org.apache.druid.segment.DoubleColumnSelector;
@@ -37,12 +38,17 @@ import org.apache.druid.segment.vector.VectorValueSelector;
 import org.roaringbitmap.PeekableIntIterator;
 
 import javax.annotation.Nullable;
+import java.util.Map;
+import java.util.function.Function;
 
 /**
  * {@link NestedCommonFormatColumn} for {@link ColumnType#DOUBLE}
  */
 public class ScalarDoubleColumn implements NestedCommonFormatColumn
 {
+  private static final Map<Class<?>, Function<ScalarDoubleColumn, ?>> AS_MAP =
+      SemanticUtils.makeAsMap(ScalarDoubleColumn.class);
+
   private final FixedIndexed<Double> doubleDictionary;
   private final ColumnarDoubles valueColumn;
   private final ImmutableBitmap nullValueIndex;
@@ -181,4 +187,13 @@ public class ScalarDoubleColumn implements NestedCommonFormatColumn
   {
     valueColumn.close();
   }
+
+  @SuppressWarnings("unchecked")
+  @Nullable
+  @Override
+  public <T> T as(Class<T> clazz)
+  {
+    //noinspection ReturnOfNull
+    return (T) AS_MAP.getOrDefault(clazz, arg -> null).apply(this);
+  }
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumn.java b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumn.java
index 6002b87c126..8a54ff31278 100644
--- a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumn.java
+++ b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumn.java
@@ -21,6 +21,7 @@ package org.apache.druid.segment.nested;
 
 import org.apache.druid.collections.bitmap.ImmutableBitmap;
 import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.common.semantic.SemanticUtils;
 import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
 import org.apache.druid.segment.ColumnValueSelector;
 import org.apache.druid.segment.LongColumnSelector;
@@ -37,12 +38,17 @@ import org.apache.druid.segment.vector.VectorValueSelector;
 import org.roaringbitmap.PeekableIntIterator;
 
 import javax.annotation.Nullable;
+import java.util.Map;
+import java.util.function.Function;
 
 /**
  * {@link NestedCommonFormatColumn} for {@link ColumnType#LONG}
  */
 public class ScalarLongColumn implements NestedCommonFormatColumn
 {
+  private static final Map<Class<?>, Function<ScalarLongColumn, ?>> AS_MAP =
+      SemanticUtils.makeAsMap(ScalarLongColumn.class);
+
   private final FixedIndexed<Long> longDictionary;
   private final ColumnarLongs valueColumn;
   private final ImmutableBitmap nullValueIndex;
@@ -182,4 +188,13 @@ public class ScalarLongColumn implements NestedCommonFormatColumn
   {
     valueColumn.close();
   }
+
+  @SuppressWarnings("unchecked")
+  @Nullable
+  @Override
+  public <T> T as(Class<T> clazz)
+  {
+    //noinspection ReturnOfNull
+    return (T) AS_MAP.getOrDefault(clazz, arg -> null).apply(this);
+  }
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumn.java b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumn.java
index 3eef2ac36e7..dfd6307148d 100644
--- a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumn.java
+++ b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumn.java
@@ -25,6 +25,7 @@ import it.unimi.dsi.fastutil.ints.IntArraySet;
 import it.unimi.dsi.fastutil.ints.IntSet;
 import org.apache.druid.collections.bitmap.ImmutableBitmap;
 import org.apache.druid.common.guava.GuavaUtils;
+import org.apache.druid.common.semantic.SemanticUtils;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.StringUtils;
@@ -69,8 +70,10 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.BitSet;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.function.Function;
 
 /**
  * {@link NestedCommonFormatColumn} for single type array columns, and mixed type columns. If {@link #variantTypes}
@@ -80,6 +83,9 @@ import java.util.TreeMap;
 public class VariantColumn<TStringDictionary extends Indexed<ByteBuffer>>
     implements DictionaryEncodedColumn, NestedCommonFormatColumn
 {
+  private static final Map<Class<?>, Function<VariantColumn, ?>> AS_MAP =
+      SemanticUtils.makeAsMap(VariantColumn.class);
+
   private final TStringDictionary stringDictionary;
   private final FixedIndexed<Long> longDictionary;
   private final FixedIndexed<Double> doubleDictionary;
@@ -1008,4 +1014,13 @@ public class VariantColumn
       return offset.getCurrentVectorSize();
     }
   }
+
+  @SuppressWarnings("unchecked")
+  @Nullable
+  @Override
+  public <T> T as(Class<T> clazz)
+  {
+    //noinspection ReturnOfNull
+    return (T) AS_MAP.getOrDefault(clazz, arg -> null).apply(this);
+  }
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnAndIndexSupplier.java b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnAndIndexSupplier.java
index 23555b2ea2d..b93235b5bab 100644
--- a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnAndIndexSupplier.java
+++ b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnAndIndexSupplier.java
@@ -31,7 +31,6 @@ import org.apache.druid.math.expr.ExprEval;
 import org.apache.druid.math.expr.ExprType;
 import org.apache.druid.math.expr.ExpressionType;
 import org.apache.druid.query.BitmapResultFactory;
-import org.apache.druid.segment.column.ColumnBuilder;
 import org.apache.druid.segment.column.ColumnIndexSupplier;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.StringEncodingStrategies;
@@ -68,7 +67,7 @@ public class VariantColumnAndIndexSupplier implements Supplier
 > stringDictionarySupplier;
   final Supplier> longDictionarySupplier;
   final Supplier> doubleDictionarySupplier;
@@ -104,33 +102,33 @@ public class VariantColumnAndIndexSupplier implements Supplier
 valueIndexes = GenericIndexed.read(
             valueIndexBuffer,
             bitmapSerdeFactory.getObjectStrategy(),
-            columnBuilder.getFileMapper()
+            fileMapper
         );
         final ByteBuffer elementIndexBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile(
-            mapper,
+            fileMapper,
             columnName,
             ColumnSerializerUtils.ARRAY_ELEMENT_BITMAP_INDEX_FILE_NAME
         );
         final GenericIndexed arrayElementIndexes = GenericIndexed.read(
             elementIndexBuffer,
             bitmapSerdeFactory.getObjectStrategy(),
-            columnBuilder.getFileMapper()
+            fileMapper
        );
 
        return new VariantColumnAndIndexSupplier(
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/ColumnSerializerUtils.java b/processing/src/main/java/org/apache/druid/segment/serde/ColumnSerializerUtils.java
index 8396f07fd35..c8974f5e6f4 100644
--- a/processing/src/main/java/org/apache/druid/segment/serde/ColumnSerializerUtils.java
+++ b/processing/src/main/java/org/apache/druid/segment/serde/ColumnSerializerUtils.java
@@ -24,11 +24,13 @@ import com.fasterxml.jackson.dataformat.smile.SmileFactory;
 import com.fasterxml.jackson.dataformat.smile.SmileGenerator;
 import org.apache.druid.guice.BuiltInTypesModule;
 import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
-import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter;
+import org.apache.druid.segment.data.VByte;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 
 public class ColumnSerializerUtils
 {
@@ -59,23 +61,33 @@ public class ColumnSerializerUtils
   public static void writeInternal(FileSmoosher smoosher, Serializer serializer, String columnName, String fileName)
       throws IOException
   {
-    final String internalName = getInternalFileName(columnName, fileName);
-    try (SmooshedWriter smooshChannel = smoosher.addWithSmooshedWriter(internalName, serializer.getSerializedSize())) {
-      serializer.writeTo(smooshChannel, smoosher);
-    }
+    smoosher.serializeAs(getInternalFileName(columnName, fileName), serializer);
   }
 
   public static void writeInternal(FileSmoosher smoosher, ByteBuffer buffer, String columnName, String fileName)
       throws IOException
   {
-    final String internalName = getInternalFileName(columnName, fileName);
-    try (SmooshedWriter smooshChannel = smoosher.addWithSmooshedWriter(internalName, buffer.capacity())) {
-      smooshChannel.write(buffer);
-    }
+    smoosher.add(getInternalFileName(columnName, fileName), buffer);
   }
 
   public static String getInternalFileName(String fileNameBase, String field)
   {
     return fileNameBase + "." + field;
   }
+
+  /**
+   * Convert a String to a ByteBuffer with a variable size length prepended to it.
+   * @param stringVal the value to store in the ByteBuffer
+   * @return ByteBuffer with the string converted to utf8 bytes and stored with a variable size length int prepended
+   */
+  public static ByteBuffer stringToUtf8InVSizeByteBuffer(String stringVal)
+  {
+    final byte[] bytes = StringUtils.toUtf8(stringVal);
+    final int length = VByte.computeIntSize(bytes.length);
+    final ByteBuffer buffer = ByteBuffer.allocate(length + bytes.length).order(ByteOrder.nativeOrder());
+    VByte.writeInt(buffer, bytes.length);
+    buffer.put(bytes);
+    buffer.flip();
+    return buffer;
+  }
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/NestedCommonFormatColumnPartSerde.java b/processing/src/main/java/org/apache/druid/segment/serde/NestedCommonFormatColumnPartSerde.java
index d923de51d09..6e1f0967c83 100644
--- a/processing/src/main/java/org/apache/druid/segment/serde/NestedCommonFormatColumnPartSerde.java
+++ b/processing/src/main/java/org/apache/druid/segment/serde/NestedCommonFormatColumnPartSerde.java
@@ -281,7 +281,7 @@ public class NestedCommonFormatColumnPartSerde implements ColumnPartSerde
           byteOrder,
           bitmapSerdeFactory,
           buffer,
-          builder,
+          builder.getFileMapper(),
           parent == null ? null : (VariantColumnAndIndexSupplier) parent.getColumnSupplier()
       );
       ColumnCapabilitiesImpl capabilitiesBuilder = builder.getCapabilitiesBuilder();
diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java
index 6e1c4229084..2d213eddb5f 100644
--- a/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java
@@ -253,9 +253,7 @@ public class CompressedColumnarIntsSerializerTest
     );
     writer.open();
 
-    for (int val : vals) {
-      writer.addValue(val);
-    }
+    writer.addValues(vals, 0, vals.length);
     final SmooshedWriter channel = smoosher.addWithSmooshedWriter("test", writer.getSerializedSize());
     writer.writeTo(channel, smoosher);
     channel.close();
diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java
index be472a71ed9..613809b836f 100644
--- a/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java
@@ -172,9 +172,7 @@ public class CompressedDoublesSerdeTest
     );
     serializer.open();
 
-    for (double value : values) {
-      serializer.add(value);
-    }
+    serializer.addAll(values, 0, values.length);
     Assert.assertEquals(values.length, serializer.size());
 
     final ByteArrayOutputStream baos = new ByteArrayOutputStream();
diff --git a/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java b/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java
index 6aea2ace234..0598552d519 100644
--- a/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java
@@ -317,7 +317,7 @@ public class VariantColumnSupplierTest extends InitializedNullHandlingTest
         ByteOrder.nativeOrder(),
         bitmapSerdeFactory,
         baseBuffer,
-        bob,
+        bob.getFileMapper(),
         null
     );
     try (VariantColumn column = (VariantColumn) supplier.get()) {
@@ -336,7 +336,7 @@ public class VariantColumnSupplierTest extends InitializedNullHandlingTest
        ByteOrder.nativeOrder(),
        bitmapSerdeFactory,
        baseBuffer,
-       bob,
+       bob.getFileMapper(),
        null
    );
    final String expectedReason = "none";
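
Illustration, not part of the patch: several hunks above stamp the same as(Class) override onto a column, backed by a static AS_MAP built with SemanticUtils.makeAsMap. The sketch below shows only the lookup pattern itself; HypotheticalColumn and RowCountFacade are invented names, and the map is populated by hand here rather than by SemanticUtils, so treat it as a rough sketch of the idea, not Druid code.

import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical facade a caller might ask a column for via as().
interface RowCountFacade
{
  int rowCount();
}

// Hypothetical column type; stands in for the real columns touched above.
class HypotheticalColumn
{
  // Hand-built stand-in for SemanticUtils.makeAsMap(HypotheticalColumn.class): maps a requested
  // class to a function that adapts this column to it.
  private static final Map<Class<?>, Function<HypotheticalColumn, ?>> AS_MAP = new HashMap<>();

  static {
    AS_MAP.put(RowCountFacade.class, column -> (RowCountFacade) () -> column.numRows);
  }

  private final int numRows;

  HypotheticalColumn(int numRows)
  {
    this.numRows = numRows;
  }

  // Same shape as the overrides added in the diff: unknown classes fall through to null.
  @SuppressWarnings("unchecked")
  public <T> T as(Class<T> clazz)
  {
    return (T) AS_MAP.getOrDefault(clazz, arg -> null).apply(this);
  }
}

class AsLookupExample
{
  public static void main(String[] args)
  {
    HypotheticalColumn column = new HypotheticalColumn(42);

    RowCountFacade facade = column.as(RowCountFacade.class);
    System.out.println(facade == null ? "unsupported" : "rows=" + facade.rowCount()); // rows=42

    // No adapter registered for String, so this prints null.
    System.out.println(column.as(String.class));
  }
}

Because unsupported classes fall through to the arg -> null default, callers must treat a null return as "facade not available" rather than as an error, which is why the real overrides carry the noinspection ReturnOfNull hint.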
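
Illustration, not part of the patch: the serializer changes above (addAll on ColumnarDoublesSerializer, addValues on SingleValueColumnarIntsSerializer, write(long[], int, int) on LongsLongEncodingWriter) share one idiom: declare a bulk method whose default implementation loops over the existing single-value call, then let implementations override it with a genuinely bulk path, as LongsLongEncodingWriter does via outBuffer.asLongBuffer().put. A minimal stand-alone sketch of that idiom with invented DoubleSink and BufferingDoubleSink types (not Druid classes):

import java.util.ArrayList;
import java.util.List;

// Invented interface; mirrors the shape of the bulk defaults added in the diff.
interface DoubleSink
{
  void add(double value);

  // Default bulk path: always correct, one value at a time.
  default void addAll(double[] values, int start, int end)
  {
    for (int i = start; i < end; ++i) {
      add(values[i]);
    }
  }
}

// An implementation that overrides the bulk path instead of paying one virtual call per value.
class BufferingDoubleSink implements DoubleSink
{
  final List<Double> buffer = new ArrayList<>();

  @Override
  public void add(double value)
  {
    buffer.add(value);
  }

  @Override
  public void addAll(double[] values, int start, int end)
  {
    // A real writer would hand the whole range to a ByteBuffer or compressor here.
    for (int i = start; i < end; ++i) {
      buffer.add(values[i]);
    }
  }
}

class BatchAddExample
{
  public static void main(String[] args)
  {
    double[] values = {1.0, 2.0, 3.0, 4.0};
    BufferingDoubleSink sink = new BufferingDoubleSink();
    // Same call shape as serializer.addAll(values, 0, values.length) in the updated tests.
    sink.addAll(values, 0, values.length);
    System.out.println(sink.buffer); // [1.0, 2.0, 3.0, 4.0]
  }
}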