Minor refactors to processing module (#17136)

Refactors a few things.

- Adds SemanticUtils maps to columns.
- Add some addAll functions to reduce duplication, and for future reuse.
- Refactor VariantColumnAndIndexSupplier to only take a SmooshedFileMapper instead.
- Refactor LongColumnSerializerV2 to have separate functions for serializing a value and null.
This commit is contained in:
Adarsh Sanjeev 2024-10-07 13:18:35 +05:30 committed by GitHub
parent 7e35e50052
commit c9201ad658
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 219 additions and 59 deletions

View File

@ -31,6 +31,7 @@ import org.apache.druid.java.util.common.MappedByteBufferHandler;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.serde.Serializer;
import java.io.BufferedWriter; import java.io.BufferedWriter;
import java.io.Closeable; import java.io.Closeable;
@ -161,6 +162,13 @@ public class FileSmoosher implements Closeable
} }
} }
public void serializeAs(String name, Serializer serializer) throws IOException
{
try (SmooshedWriter smooshChannel = addWithSmooshedWriter(name, serializer.getSerializedSize())) {
serializer.writeTo(smooshChannel, this);
}
}
public SmooshedWriter addWithSmooshedWriter(final String name, final long size) throws IOException public SmooshedWriter addWithSmooshedWriter(final String name, final long size) throws IOException
{ {

View File

@ -121,12 +121,29 @@ public class LongColumnSerializerV2 implements GenericColumnSerializer<Object>
public void serialize(ColumnValueSelector<?> selector) throws IOException public void serialize(ColumnValueSelector<?> selector) throws IOException
{ {
if (selector.isNull()) { if (selector.isNull()) {
nullRowsBitmap.add(rowCount); serializeNull();
writer.add(0L);
} else { } else {
writer.add(selector.getLong()); serializeValue(selector.getLong());
} }
rowCount++; }
/**
* Serializes a null value at the rowCount position, and increments the current rowCount.
*/
public void serializeNull() throws IOException
{
nullRowsBitmap.add(rowCount);
writer.add(0L);
++rowCount;
}
/**
* Serializes a value of val at the rowCount position, and increments the current rowCount.
*/
public void serializeValue(long val) throws IOException
{
writer.add(val);
++rowCount;
} }
@Override @Override

View File

@ -20,6 +20,7 @@
package org.apache.druid.segment.column; package org.apache.druid.segment.column;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.apache.druid.common.semantic.SemanticUtils;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.extraction.ExtractionFn; import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.query.filter.DruidObjectPredicate; import org.apache.druid.query.filter.DruidObjectPredicate;
@ -52,6 +53,8 @@ import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.BitSet; import java.util.BitSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.function.Function;
/** /**
* {@link DictionaryEncodedColumn<String>} for a column which has a {@link ByteBuffer} based UTF-8 dictionary. * {@link DictionaryEncodedColumn<String>} for a column which has a {@link ByteBuffer} based UTF-8 dictionary.
@ -62,6 +65,9 @@ import java.util.List;
*/ */
public class StringUtf8DictionaryEncodedColumn implements DictionaryEncodedColumn<String>, NestedCommonFormatColumn public class StringUtf8DictionaryEncodedColumn implements DictionaryEncodedColumn<String>, NestedCommonFormatColumn
{ {
private static final Map<Class<?>, Function<StringUtf8DictionaryEncodedColumn, ?>> AS_MAP =
SemanticUtils.makeAsMap(StringUtf8DictionaryEncodedColumn.class);
@Nullable @Nullable
private final ColumnarInts column; private final ColumnarInts column;
@Nullable @Nullable
@ -498,6 +504,15 @@ public class StringUtf8DictionaryEncodedColumn implements DictionaryEncodedColum
} }
} }
@SuppressWarnings("unchecked")
@Nullable
@Override
public <T> T as(Class<? extends T> clazz)
{
//noinspection ReturnOfNull
return (T) AS_MAP.getOrDefault(clazz, arg -> null).apply(this);
}
@Override @Override
public void close() throws IOException public void close() throws IOException
{ {

View File

@ -112,7 +112,7 @@ public class BlockLayoutColumnarDoublesSupplier implements Supplier<ColumnarDoub
} }
@Override @Override
public void get(final double[] out, final int start, final int length) public void get(final double[] out, int offset, final int start, final int length)
{ {
// division + remainder is optimized by the compiler so keep those together // division + remainder is optimized by the compiler so keep those together
int bufferNum = start / sizePer; int bufferNum = start / sizePer;
@ -129,7 +129,7 @@ public class BlockLayoutColumnarDoublesSupplier implements Supplier<ColumnarDoub
final int oldPosition = doubleBuffer.position(); final int oldPosition = doubleBuffer.position();
try { try {
doubleBuffer.position(bufferIndex); doubleBuffer.position(bufferIndex);
doubleBuffer.get(out, p, limit); doubleBuffer.get(out, offset + p, limit);
} }
finally { finally {
doubleBuffer.position(oldPosition); doubleBuffer.position(oldPosition);

View File

@ -21,14 +21,20 @@ package org.apache.druid.segment.data;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import org.apache.druid.collections.ResourceHolder; import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.common.semantic.SemanticUtils;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.ByteOrder; import java.nio.ByteOrder;
import java.nio.LongBuffer; import java.nio.LongBuffer;
import java.util.Map;
import java.util.function.Function;
public class BlockLayoutColumnarLongsSupplier implements Supplier<ColumnarLongs> public class BlockLayoutColumnarLongsSupplier implements Supplier<ColumnarLongs>
{ {
private static final Map<Class<?>, Function<BlockLayoutColumnarLongs, ?>> AS_MAP =
SemanticUtils.makeAsMap(BlockLayoutColumnarLongs.class);
private final GenericIndexed<ResourceHolder<ByteBuffer>> baseLongBuffers; private final GenericIndexed<ResourceHolder<ByteBuffer>> baseLongBuffers;
// The number of rows in this column. // The number of rows in this column.
@ -222,6 +228,15 @@ public class BlockLayoutColumnarLongsSupplier implements Supplier<ColumnarLongs>
} }
} }
@SuppressWarnings("unchecked")
@Nullable
@Override
public <T> T as(Class<? extends T> clazz)
{
//noinspection ReturnOfNull
return (T) AS_MAP.getOrDefault(clazz, arg -> null).apply(this);
}
@Override @Override
public String toString() public String toString()
{ {

View File

@ -46,9 +46,14 @@ public interface ColumnarDoubles extends Closeable
double get(int index); double get(int index);
default void get(double[] out, int start, int length) default void get(double[] out, int start, int length)
{
get(out, 0, start, length);
}
default void get(double[] out, int offset, int start, int length)
{ {
for (int i = 0; i < length; i++) { for (int i = 0; i < length; i++) {
out[i] = get(i + start); out[offset + i] = get(i + start);
} }
} }

View File

@ -29,6 +29,15 @@ import java.io.IOException;
public interface ColumnarDoublesSerializer extends Serializer public interface ColumnarDoublesSerializer extends Serializer
{ {
void open() throws IOException; void open() throws IOException;
int size(); int size();
void add(double value) throws IOException; void add(double value) throws IOException;
default void addAll(double[] values, int start, int end) throws IOException
{
for (int i = start; i < end; ++i) {
add(values[i]);
}
}
} }

View File

@ -354,6 +354,12 @@ public class CompressedVSizeColumnarIntsSupplier implements WritableSupplier<Col
@Override @Override
public void get(int[] out, int start, int length) public void get(int[] out, int start, int length)
{
get(out, 0, start, length);
}
@Override
public void get(int[] out, int offset, int start, int length)
{ {
int p = 0; int p = 0;
@ -374,7 +380,7 @@ public class CompressedVSizeColumnarIntsSupplier implements WritableSupplier<Col
break; break;
} }
out[i] = _get(buffer, bigEndian, index - currBufferStart); out[offset + i] = _get(buffer, bigEndian, index - currBufferStart);
} }
assert i > p; assert i > p;

View File

@ -74,6 +74,19 @@ public class LongsLongEncodingWriter implements CompressionFactory.LongEncodingW
} }
} }
@Override
public void write(long[] values, int offset, int length) throws IOException
{
if (outBuffer != null) {
outBuffer.asLongBuffer().put(values, offset, length);
outBuffer.position(outBuffer.position() + (length * Long.BYTES));
} else {
for (int i = offset; i < length; ++i) {
write(values[i]);
}
}
}
@Override @Override
public void flush() public void flush()
{ {

View File

@ -27,4 +27,11 @@ import java.io.IOException;
public abstract class SingleValueColumnarIntsSerializer implements ColumnarIntsSerializer public abstract class SingleValueColumnarIntsSerializer implements ColumnarIntsSerializer
{ {
public abstract void addValue(int val) throws IOException; public abstract void addValue(int val) throws IOException;
public void addValues(int[] vals, int start, int stop) throws IOException
{
for (int i = start; i < stop; ++i) {
addValue(vals[i]);
}
}
} }

View File

@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.primitives.Doubles; import com.google.common.primitives.Doubles;
import org.apache.druid.collections.bitmap.ImmutableBitmap; import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.common.semantic.SemanticUtils;
import org.apache.druid.error.DruidException; import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
@ -84,10 +85,12 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.SortedMap; import java.util.SortedMap;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
/** /**
* Implementation of {@link NestedDataComplexColumn} which uses a {@link CompressedVariableSizedBlobColumn} for the * Implementation of {@link NestedDataComplexColumn} which uses a {@link CompressedVariableSizedBlobColumn} for the
@ -104,6 +107,9 @@ import java.util.concurrent.ConcurrentHashMap;
public abstract class CompressedNestedDataComplexColumn<TStringDictionary extends Indexed<ByteBuffer>> public abstract class CompressedNestedDataComplexColumn<TStringDictionary extends Indexed<ByteBuffer>>
extends NestedDataComplexColumn implements NestedCommonFormatColumn extends NestedDataComplexColumn implements NestedCommonFormatColumn
{ {
private static final Map<Class<?>, Function<CompressedNestedDataComplexColumn, ?>> AS_MAP =
SemanticUtils.makeAsMap(CompressedNestedDataComplexColumn.class);
private static final ObjectStrategy<Object> STRATEGY = NestedDataComplexTypeSerde.INSTANCE.getObjectStrategy(); private static final ObjectStrategy<Object> STRATEGY = NestedDataComplexTypeSerde.INSTANCE.getObjectStrategy();
public static final IntTypeStrategy INT_TYPE_STRATEGY = new IntTypeStrategy(); public static final IntTypeStrategy INT_TYPE_STRATEGY = new IntTypeStrategy();
private final ColumnConfig columnConfig; private final ColumnConfig columnConfig;
@ -903,6 +909,15 @@ public abstract class CompressedNestedDataComplexColumn<TStringDictionary extend
return getColumnHolder(field, fieldIndex).getCapabilities().isNumeric(); return getColumnHolder(field, fieldIndex).getCapabilities().isNumeric();
} }
@SuppressWarnings("unchecked")
@Nullable
@Override
public <T> T as(Class<? extends T> clazz)
{
//noinspection ReturnOfNull
return (T) AS_MAP.getOrDefault(clazz, arg -> null).apply(this);
}
private ColumnHolder getColumnHolder(String field, int fieldIndex) private ColumnHolder getColumnHolder(String field, int fieldIndex)
{ {
return columns.computeIfAbsent(fieldIndex, (f) -> readNestedFieldColumn(field, fieldIndex)); return columns.computeIfAbsent(fieldIndex, (f) -> readNestedFieldColumn(field, fieldIndex));

View File

@ -19,16 +19,13 @@
package org.apache.druid.segment.nested; package org.apache.druid.segment.nested;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.segment.GenericColumnSerializer; import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.data.VByte;
import org.apache.druid.segment.serde.ColumnSerializerUtils; import org.apache.druid.segment.serde.ColumnSerializerUtils;
import org.apache.druid.segment.serde.Serializer; import org.apache.druid.segment.serde.Serializer;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.WritableByteChannel; import java.nio.channels.WritableByteChannel;
import java.util.SortedMap; import java.util.SortedMap;
@ -85,7 +82,7 @@ public abstract class NestedCommonFormatColumnSerializer implements GenericColum
ColumnSerializerUtils.writeInternal(smoosher, buffer, getColumnName(), fileName); ColumnSerializerUtils.writeInternal(smoosher, buffer, getColumnName(), fileName);
} }
protected void writeV0Header(WritableByteChannel channel, ByteBuffer columnNameBuffer) throws IOException public static void writeV0Header(WritableByteChannel channel, ByteBuffer columnNameBuffer) throws IOException
{ {
channel.write(ByteBuffer.wrap(new byte[]{NestedCommonFormatColumnSerializer.V0})); channel.write(ByteBuffer.wrap(new byte[]{NestedCommonFormatColumnSerializer.V0}));
channel.write(columnNameBuffer); channel.write(columnNameBuffer);
@ -93,12 +90,7 @@ public abstract class NestedCommonFormatColumnSerializer implements GenericColum
protected ByteBuffer computeFilenameBytes() protected ByteBuffer computeFilenameBytes()
{ {
final byte[] bytes = StringUtils.toUtf8(getColumnName()); final String columnName = getColumnName();
final int length = VByte.computeIntSize(bytes.length); return ColumnSerializerUtils.stringToUtf8InVSizeByteBuffer(columnName);
final ByteBuffer buffer = ByteBuffer.allocate(length + bytes.length).order(ByteOrder.nativeOrder());
VByte.writeInt(buffer, bytes.length);
buffer.put(bytes);
buffer.flip();
return buffer;
} }
} }

View File

@ -24,9 +24,8 @@ import com.google.common.collect.Maps;
import org.apache.druid.collections.bitmap.ImmutableBitmap; import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.collections.bitmap.MutableBitmap; import org.apache.druid.collections.bitmap.MutableBitmap;
import org.apache.druid.common.config.NullHandling; import org.apache.druid.common.config.NullHandling;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
@ -109,7 +108,7 @@ public class NestedDataColumnSerializer extends NestedCommonFormatColumnSerializ
return ProcessedValue.NULL_LITERAL; return ProcessedValue.NULL_LITERAL;
} }
catch (IOException e) { catch (IOException e) {
throw new RE(e, "Failed to write field [%s], unhandled value", fieldPath); throw DruidException.defensive(e, "Failed to write field [%s], unhandled value", fieldPath);
} }
} }
return ProcessedValue.NULL_LITERAL; return ProcessedValue.NULL_LITERAL;
@ -134,7 +133,7 @@ public class NestedDataColumnSerializer extends NestedCommonFormatColumnSerializ
return ProcessedValue.NULL_LITERAL; return ProcessedValue.NULL_LITERAL;
} }
catch (IOException e) { catch (IOException e) {
throw new RE(e, "Failed to write field [%s] value [%s]", fieldPath, array); throw DruidException.defensive(e, "Failed to write field [%s] value [%s]", fieldPath, array);
} }
} }
} }
@ -318,7 +317,7 @@ public class NestedDataColumnSerializer extends NestedCommonFormatColumnSerializ
globalDictionaryIdLookup globalDictionaryIdLookup
); );
} else { } else {
throw new ISE("Invalid field type [%s], how did this happen?", type); throw DruidException.defensive("Invalid field type [%s], how did this happen?", type);
} }
} else { } else {
writer = new VariantFieldColumnWriter( writer = new VariantFieldColumnWriter(
@ -343,7 +342,9 @@ public class NestedDataColumnSerializer extends NestedCommonFormatColumnSerializ
) throws IOException ) throws IOException
{ {
if (dictionarySerialized) { if (dictionarySerialized) {
throw new ISE("String dictionary already serialized for column [%s], cannot serialize again", name); throw DruidException.defensive(
"String dictionary already serialized for column [%s], cannot serialize again", name
);
} }
// null is always 0 // null is always 0
@ -383,11 +384,17 @@ public class NestedDataColumnSerializer extends NestedCommonFormatColumnSerializ
@Override @Override
public void serialize(ColumnValueSelector<? extends StructuredData> selector) throws IOException public void serialize(ColumnValueSelector<? extends StructuredData> selector) throws IOException
{
serialize(StructuredData.wrap(selector.getObject()));
}
public void serialize(StructuredData data) throws IOException
{ {
if (!dictionarySerialized) { if (!dictionarySerialized) {
throw new ISE("Must serialize value dictionaries before serializing values for column [%s]", name); throw DruidException.defensive(
"Must serialize value dictionaries before serializing values for column [%s]", name
);
} }
StructuredData data = StructuredData.wrap(selector.getObject());
if (data == null) { if (data == null) {
nullRowsBitmap.add(rowCount); nullRowsBitmap.add(rowCount);
} }

View File

@ -21,6 +21,7 @@ package org.apache.druid.segment.nested;
import org.apache.druid.collections.bitmap.ImmutableBitmap; import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.common.config.NullHandling; import org.apache.druid.common.config.NullHandling;
import org.apache.druid.common.semantic.SemanticUtils;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DoubleColumnSelector; import org.apache.druid.segment.DoubleColumnSelector;
@ -37,12 +38,17 @@ import org.apache.druid.segment.vector.VectorValueSelector;
import org.roaringbitmap.PeekableIntIterator; import org.roaringbitmap.PeekableIntIterator;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.Map;
import java.util.function.Function;
/** /**
* {@link NestedCommonFormatColumn} for {@link ColumnType#DOUBLE} * {@link NestedCommonFormatColumn} for {@link ColumnType#DOUBLE}
*/ */
public class ScalarDoubleColumn implements NestedCommonFormatColumn public class ScalarDoubleColumn implements NestedCommonFormatColumn
{ {
private static final Map<Class<?>, Function<ScalarDoubleColumn, ?>> AS_MAP =
SemanticUtils.makeAsMap(ScalarDoubleColumn.class);
private final FixedIndexed<Double> doubleDictionary; private final FixedIndexed<Double> doubleDictionary;
private final ColumnarDoubles valueColumn; private final ColumnarDoubles valueColumn;
private final ImmutableBitmap nullValueIndex; private final ImmutableBitmap nullValueIndex;
@ -181,4 +187,13 @@ public class ScalarDoubleColumn implements NestedCommonFormatColumn
{ {
valueColumn.close(); valueColumn.close();
} }
@SuppressWarnings("unchecked")
@Nullable
@Override
public <T> T as(Class<? extends T> clazz)
{
//noinspection ReturnOfNull
return (T) AS_MAP.getOrDefault(clazz, arg -> null).apply(this);
}
} }

View File

@ -21,6 +21,7 @@ package org.apache.druid.segment.nested;
import org.apache.druid.collections.bitmap.ImmutableBitmap; import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.common.config.NullHandling; import org.apache.druid.common.config.NullHandling;
import org.apache.druid.common.semantic.SemanticUtils;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.LongColumnSelector; import org.apache.druid.segment.LongColumnSelector;
@ -37,12 +38,17 @@ import org.apache.druid.segment.vector.VectorValueSelector;
import org.roaringbitmap.PeekableIntIterator; import org.roaringbitmap.PeekableIntIterator;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.Map;
import java.util.function.Function;
/** /**
* {@link NestedCommonFormatColumn} for {@link ColumnType#LONG} * {@link NestedCommonFormatColumn} for {@link ColumnType#LONG}
*/ */
public class ScalarLongColumn implements NestedCommonFormatColumn public class ScalarLongColumn implements NestedCommonFormatColumn
{ {
private static final Map<Class<?>, Function<ScalarLongColumn, ?>> AS_MAP =
SemanticUtils.makeAsMap(ScalarLongColumn.class);
private final FixedIndexed<Long> longDictionary; private final FixedIndexed<Long> longDictionary;
private final ColumnarLongs valueColumn; private final ColumnarLongs valueColumn;
private final ImmutableBitmap nullValueIndex; private final ImmutableBitmap nullValueIndex;
@ -182,4 +188,13 @@ public class ScalarLongColumn implements NestedCommonFormatColumn
{ {
valueColumn.close(); valueColumn.close();
} }
@SuppressWarnings("unchecked")
@Nullable
@Override
public <T> T as(Class<? extends T> clazz)
{
//noinspection ReturnOfNull
return (T) AS_MAP.getOrDefault(clazz, arg -> null).apply(this);
}
} }

View File

@ -25,6 +25,7 @@ import it.unimi.dsi.fastutil.ints.IntArraySet;
import it.unimi.dsi.fastutil.ints.IntSet; import it.unimi.dsi.fastutil.ints.IntSet;
import org.apache.druid.collections.bitmap.ImmutableBitmap; import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.common.guava.GuavaUtils; import org.apache.druid.common.guava.GuavaUtils;
import org.apache.druid.common.semantic.SemanticUtils;
import org.apache.druid.error.DruidException; import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
@ -69,8 +70,10 @@ import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.BitSet; import java.util.BitSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map;
import java.util.SortedMap; import java.util.SortedMap;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.function.Function;
/** /**
* {@link NestedCommonFormatColumn} for single type array columns, and mixed type columns. If {@link #variantTypes} * {@link NestedCommonFormatColumn} for single type array columns, and mixed type columns. If {@link #variantTypes}
@ -80,6 +83,9 @@ import java.util.TreeMap;
public class VariantColumn<TStringDictionary extends Indexed<ByteBuffer>> public class VariantColumn<TStringDictionary extends Indexed<ByteBuffer>>
implements DictionaryEncodedColumn<String>, NestedCommonFormatColumn implements DictionaryEncodedColumn<String>, NestedCommonFormatColumn
{ {
private static final Map<Class<?>, Function<VariantColumn, ?>> AS_MAP =
SemanticUtils.makeAsMap(VariantColumn.class);
private final TStringDictionary stringDictionary; private final TStringDictionary stringDictionary;
private final FixedIndexed<Long> longDictionary; private final FixedIndexed<Long> longDictionary;
private final FixedIndexed<Double> doubleDictionary; private final FixedIndexed<Double> doubleDictionary;
@ -1008,4 +1014,13 @@ public class VariantColumn<TStringDictionary extends Indexed<ByteBuffer>>
return offset.getCurrentVectorSize(); return offset.getCurrentVectorSize();
} }
} }
@SuppressWarnings("unchecked")
@Nullable
@Override
public <T> T as(Class<? extends T> clazz)
{
//noinspection ReturnOfNull
return (T) AS_MAP.getOrDefault(clazz, arg -> null).apply(this);
}
} }

View File

@ -31,7 +31,6 @@ import org.apache.druid.math.expr.ExprEval;
import org.apache.druid.math.expr.ExprType; import org.apache.druid.math.expr.ExprType;
import org.apache.druid.math.expr.ExpressionType; import org.apache.druid.math.expr.ExpressionType;
import org.apache.druid.query.BitmapResultFactory; import org.apache.druid.query.BitmapResultFactory;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.column.ColumnIndexSupplier; import org.apache.druid.segment.column.ColumnIndexSupplier;
import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.StringEncodingStrategies; import org.apache.druid.segment.column.StringEncodingStrategies;
@ -68,7 +67,7 @@ public class VariantColumnAndIndexSupplier implements Supplier<NestedCommonForma
ByteOrder byteOrder, ByteOrder byteOrder,
BitmapSerdeFactory bitmapSerdeFactory, BitmapSerdeFactory bitmapSerdeFactory,
ByteBuffer bb, ByteBuffer bb,
ColumnBuilder columnBuilder, SmooshedFileMapper fileMapper,
@Nullable VariantColumnAndIndexSupplier parent @Nullable VariantColumnAndIndexSupplier parent
) )
{ {
@ -89,7 +88,6 @@ public class VariantColumnAndIndexSupplier implements Supplier<NestedCommonForma
if (version == NestedCommonFormatColumnSerializer.V0) { if (version == NestedCommonFormatColumnSerializer.V0) {
try { try {
final SmooshedFileMapper mapper = columnBuilder.getFileMapper();
final Supplier<? extends Indexed<ByteBuffer>> stringDictionarySupplier; final Supplier<? extends Indexed<ByteBuffer>> stringDictionarySupplier;
final Supplier<FixedIndexed<Long>> longDictionarySupplier; final Supplier<FixedIndexed<Long>> longDictionarySupplier;
final Supplier<FixedIndexed<Double>> doubleDictionarySupplier; final Supplier<FixedIndexed<Double>> doubleDictionarySupplier;
@ -104,33 +102,33 @@ public class VariantColumnAndIndexSupplier implements Supplier<NestedCommonForma
arrayElementDictionarySupplier = parent.arrayElementDictionarySupplier; arrayElementDictionarySupplier = parent.arrayElementDictionarySupplier;
} else { } else {
final ByteBuffer stringDictionaryBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile( final ByteBuffer stringDictionaryBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile(
mapper, fileMapper,
columnName, columnName,
ColumnSerializerUtils.STRING_DICTIONARY_FILE_NAME ColumnSerializerUtils.STRING_DICTIONARY_FILE_NAME
); );
final ByteBuffer longDictionaryBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile( final ByteBuffer longDictionaryBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile(
mapper, fileMapper,
columnName, columnName,
ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME
); );
final ByteBuffer doubleDictionaryBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile( final ByteBuffer doubleDictionaryBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile(
mapper, fileMapper,
columnName, columnName,
ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME
); );
final ByteBuffer arrayDictionarybuffer = NestedCommonFormatColumnPartSerde.loadInternalFile( final ByteBuffer arrayDictionarybuffer = NestedCommonFormatColumnPartSerde.loadInternalFile(
mapper, fileMapper,
columnName, columnName,
ColumnSerializerUtils.ARRAY_DICTIONARY_FILE_NAME ColumnSerializerUtils.ARRAY_DICTIONARY_FILE_NAME
); );
final ByteBuffer arrayElementDictionaryBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile( final ByteBuffer arrayElementDictionaryBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile(
mapper, fileMapper,
columnName, columnName,
ColumnSerializerUtils.ARRAY_ELEMENT_DICTIONARY_FILE_NAME ColumnSerializerUtils.ARRAY_ELEMENT_DICTIONARY_FILE_NAME
); );
stringDictionarySupplier = StringEncodingStrategies.getStringDictionarySupplier( stringDictionarySupplier = StringEncodingStrategies.getStringDictionarySupplier(
mapper, fileMapper,
stringDictionaryBuffer, stringDictionaryBuffer,
byteOrder byteOrder
); );
@ -159,7 +157,7 @@ public class VariantColumnAndIndexSupplier implements Supplier<NestedCommonForma
} }
final ByteBuffer encodedValueColumn = NestedCommonFormatColumnPartSerde.loadInternalFile( final ByteBuffer encodedValueColumn = NestedCommonFormatColumnPartSerde.loadInternalFile(
mapper, fileMapper,
columnName, columnName,
ColumnSerializerUtils.ENCODED_VALUE_COLUMN_FILE_NAME ColumnSerializerUtils.ENCODED_VALUE_COLUMN_FILE_NAME
); );
@ -168,24 +166,24 @@ public class VariantColumnAndIndexSupplier implements Supplier<NestedCommonForma
byteOrder byteOrder
); );
final ByteBuffer valueIndexBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile( final ByteBuffer valueIndexBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile(
mapper, fileMapper,
columnName, columnName,
ColumnSerializerUtils.BITMAP_INDEX_FILE_NAME ColumnSerializerUtils.BITMAP_INDEX_FILE_NAME
); );
final GenericIndexed<ImmutableBitmap> valueIndexes = GenericIndexed.read( final GenericIndexed<ImmutableBitmap> valueIndexes = GenericIndexed.read(
valueIndexBuffer, valueIndexBuffer,
bitmapSerdeFactory.getObjectStrategy(), bitmapSerdeFactory.getObjectStrategy(),
columnBuilder.getFileMapper() fileMapper
); );
final ByteBuffer elementIndexBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile( final ByteBuffer elementIndexBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile(
mapper, fileMapper,
columnName, columnName,
ColumnSerializerUtils.ARRAY_ELEMENT_BITMAP_INDEX_FILE_NAME ColumnSerializerUtils.ARRAY_ELEMENT_BITMAP_INDEX_FILE_NAME
); );
final GenericIndexed<ImmutableBitmap> arrayElementIndexes = GenericIndexed.read( final GenericIndexed<ImmutableBitmap> arrayElementIndexes = GenericIndexed.read(
elementIndexBuffer, elementIndexBuffer,
bitmapSerdeFactory.getObjectStrategy(), bitmapSerdeFactory.getObjectStrategy(),
columnBuilder.getFileMapper() fileMapper
); );
return new VariantColumnAndIndexSupplier( return new VariantColumnAndIndexSupplier(

View File

@ -24,11 +24,13 @@ import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.fasterxml.jackson.dataformat.smile.SmileGenerator; import com.fasterxml.jackson.dataformat.smile.SmileGenerator;
import org.apache.druid.guice.BuiltInTypesModule; import org.apache.druid.guice.BuiltInTypesModule;
import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter; import org.apache.druid.segment.data.VByte;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.ByteOrder;
public class ColumnSerializerUtils public class ColumnSerializerUtils
{ {
@ -59,23 +61,33 @@ public class ColumnSerializerUtils
public static void writeInternal(FileSmoosher smoosher, Serializer serializer, String columnName, String fileName) public static void writeInternal(FileSmoosher smoosher, Serializer serializer, String columnName, String fileName)
throws IOException throws IOException
{ {
final String internalName = getInternalFileName(columnName, fileName); smoosher.serializeAs(getInternalFileName(columnName, fileName), serializer);
try (SmooshedWriter smooshChannel = smoosher.addWithSmooshedWriter(internalName, serializer.getSerializedSize())) {
serializer.writeTo(smooshChannel, smoosher);
}
} }
public static void writeInternal(FileSmoosher smoosher, ByteBuffer buffer, String columnName, String fileName) public static void writeInternal(FileSmoosher smoosher, ByteBuffer buffer, String columnName, String fileName)
throws IOException throws IOException
{ {
final String internalName = getInternalFileName(columnName, fileName); smoosher.add(getInternalFileName(columnName, fileName), buffer);
try (SmooshedWriter smooshChannel = smoosher.addWithSmooshedWriter(internalName, buffer.capacity())) {
smooshChannel.write(buffer);
}
} }
public static String getInternalFileName(String fileNameBase, String field) public static String getInternalFileName(String fileNameBase, String field)
{ {
return fileNameBase + "." + field; return fileNameBase + "." + field;
} }
/**
* Convert a String to a ByteBuffer with a variable size length prepended to it.
* @param stringVal the value to store in the ByteBuffer
* @return ByteBuffer with the string converted to utf8 bytes and stored with a variable size length int prepended
*/
public static ByteBuffer stringToUtf8InVSizeByteBuffer(String stringVal)
{
final byte[] bytes = StringUtils.toUtf8(stringVal);
final int length = VByte.computeIntSize(bytes.length);
final ByteBuffer buffer = ByteBuffer.allocate(length + bytes.length).order(ByteOrder.nativeOrder());
VByte.writeInt(buffer, bytes.length);
buffer.put(bytes);
buffer.flip();
return buffer;
}
} }

View File

@ -281,7 +281,7 @@ public class NestedCommonFormatColumnPartSerde implements ColumnPartSerde
byteOrder, byteOrder,
bitmapSerdeFactory, bitmapSerdeFactory,
buffer, buffer,
builder, builder.getFileMapper(),
parent == null ? null : (VariantColumnAndIndexSupplier) parent.getColumnSupplier() parent == null ? null : (VariantColumnAndIndexSupplier) parent.getColumnSupplier()
); );
ColumnCapabilitiesImpl capabilitiesBuilder = builder.getCapabilitiesBuilder(); ColumnCapabilitiesImpl capabilitiesBuilder = builder.getCapabilitiesBuilder();

View File

@ -253,9 +253,7 @@ public class CompressedColumnarIntsSerializerTest
); );
writer.open(); writer.open();
for (int val : vals) { writer.addValues(vals, 0, vals.length);
writer.addValue(val);
}
final SmooshedWriter channel = smoosher.addWithSmooshedWriter("test", writer.getSerializedSize()); final SmooshedWriter channel = smoosher.addWithSmooshedWriter("test", writer.getSerializedSize());
writer.writeTo(channel, smoosher); writer.writeTo(channel, smoosher);
channel.close(); channel.close();

View File

@ -172,9 +172,7 @@ public class CompressedDoublesSerdeTest
); );
serializer.open(); serializer.open();
for (double value : values) { serializer.addAll(values, 0, values.length);
serializer.add(value);
}
Assert.assertEquals(values.length, serializer.size()); Assert.assertEquals(values.length, serializer.size());
final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final ByteArrayOutputStream baos = new ByteArrayOutputStream();

View File

@ -317,7 +317,7 @@ public class VariantColumnSupplierTest extends InitializedNullHandlingTest
ByteOrder.nativeOrder(), ByteOrder.nativeOrder(),
bitmapSerdeFactory, bitmapSerdeFactory,
baseBuffer, baseBuffer,
bob, bob.getFileMapper(),
null null
); );
try (VariantColumn<?> column = (VariantColumn<?>) supplier.get()) { try (VariantColumn<?> column = (VariantColumn<?>) supplier.get()) {
@ -336,7 +336,7 @@ public class VariantColumnSupplierTest extends InitializedNullHandlingTest
ByteOrder.nativeOrder(), ByteOrder.nativeOrder(),
bitmapSerdeFactory, bitmapSerdeFactory,
baseBuffer, baseBuffer,
bob, bob.getFileMapper(),
null null
); );
final String expectedReason = "none"; final String expectedReason = "none";