reduce heap footprint of ingesting auto typed columns by pushing compression and index generation into writeTo (#14615)

This commit is contained in:
Clint Wylie 2023-07-20 00:54:58 -07:00 committed by GitHub
parent c2e6758580
commit 024ce40f1a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 623 additions and 571 deletions

View File

@ -99,11 +99,11 @@ public class EqualityFilter extends AbstractOptimizableDimFilter implements Filt
throw InvalidInput.exception("Invalid equality filter on column [%s], matchValueType cannot be null", column); throw InvalidInput.exception("Invalid equality filter on column [%s], matchValueType cannot be null", column);
} }
this.matchValueType = matchValueType; this.matchValueType = matchValueType;
if (matchValue == null) {
throw InvalidInput.exception("Invalid equality filter on column [%s], matchValue cannot be null", column);
}
this.matchValue = matchValue; this.matchValue = matchValue;
this.matchValueEval = ExprEval.ofType(ExpressionType.fromColumnTypeStrict(matchValueType), matchValue); this.matchValueEval = ExprEval.ofType(ExpressionType.fromColumnTypeStrict(matchValueType), matchValue);
if (matchValueEval.value() == null) {
throw InvalidInput.exception("Invalid equality filter on column [%s], matchValue cannot be null", column);
}
this.filterTuning = filterTuning; this.filterTuning = filterTuning;
this.predicateFactory = new EqualityPredicateFactory(matchValueEval); this.predicateFactory = new EqualityPredicateFactory(matchValueEval);
} }
@ -239,6 +239,8 @@ public class EqualityFilter extends AbstractOptimizableDimFilter implements Filt
final ValueIndexes valueIndexes = indexSupplier.as(ValueIndexes.class); final ValueIndexes valueIndexes = indexSupplier.as(ValueIndexes.class);
if (valueIndexes != null) { if (valueIndexes != null) {
// matchValueEval.value() cannot be null here due to check in the constructor
//noinspection DataFlowIssue
return valueIndexes.forValue(matchValueEval.value(), matchValueType); return valueIndexes.forValue(matchValueEval.value(), matchValueType);
} }

View File

@ -24,7 +24,6 @@ import org.apache.druid.io.Channels;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.segment.column.TypeStrategy; import org.apache.druid.segment.column.TypeStrategy;
import org.apache.druid.segment.serde.Serializer;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import org.apache.druid.segment.writeout.WriteOutBytes; import org.apache.druid.segment.writeout.WriteOutBytes;
@ -39,7 +38,7 @@ import java.util.Iterator;
/** /**
* Writer for a {@link FixedIndexed} * Writer for a {@link FixedIndexed}
*/ */
public class FixedIndexedWriter<T> implements Serializer public class FixedIndexedWriter<T> implements DictionaryWriter<T>
{ {
private static final int PAGE_SIZE = 4096; private static final int PAGE_SIZE = 4096;
private final SegmentWriteOutMedium segmentWriteOutMedium; private final SegmentWriteOutMedium segmentWriteOutMedium;
@ -73,11 +72,19 @@ public class FixedIndexedWriter<T> implements Serializer
this.isSorted = isSorted; this.isSorted = isSorted;
} }
@Override
public boolean isSorted()
{
return isSorted;
}
@Override
public void open() throws IOException public void open() throws IOException
{ {
this.valuesOut = segmentWriteOutMedium.makeWriteOutBytes(); this.valuesOut = segmentWriteOutMedium.makeWriteOutBytes();
} }
@Override
public int getCardinality() public int getCardinality()
{ {
return hasNulls ? numWritten + 1 : numWritten; return hasNulls ? numWritten + 1 : numWritten;
@ -89,6 +96,7 @@ public class FixedIndexedWriter<T> implements Serializer
return Byte.BYTES + Byte.BYTES + Integer.BYTES + valuesOut.size(); return Byte.BYTES + Byte.BYTES + Integer.BYTES + valuesOut.size();
} }
@Override
public void write(@Nullable T objectToWrite) throws IOException public void write(@Nullable T objectToWrite) throws IOException
{ {
if (prevObject != null && isSorted && comparator.compare(prevObject, objectToWrite) >= 0) { if (prevObject != null && isSorted && comparator.compare(prevObject, objectToWrite) >= 0) {
@ -140,6 +148,7 @@ public class FixedIndexedWriter<T> implements Serializer
} }
@SuppressWarnings("unused") @SuppressWarnings("unused")
@Override
@Nullable @Nullable
public T get(int index) throws IOException public T get(int index) throws IOException
{ {

View File

@ -19,6 +19,7 @@
package org.apache.druid.segment.index; package org.apache.druid.segment.index;
import org.apache.druid.collections.bitmap.BitmapFactory;
import org.apache.druid.query.BitmapResultFactory; import org.apache.druid.query.BitmapResultFactory;
import org.apache.druid.query.filter.ColumnIndexSelector; import org.apache.druid.query.filter.ColumnIndexSelector;
import org.apache.druid.segment.column.ColumnIndexCapabilities; import org.apache.druid.segment.column.ColumnIndexCapabilities;
@ -26,11 +27,16 @@ import org.apache.druid.segment.column.SimpleColumnIndexCapabilities;
public class AllFalseBitmapColumnIndex implements BitmapColumnIndex public class AllFalseBitmapColumnIndex implements BitmapColumnIndex
{ {
private final ColumnIndexSelector selector; private final BitmapFactory bitmapFactory;
public AllFalseBitmapColumnIndex(ColumnIndexSelector indexSelector) public AllFalseBitmapColumnIndex(ColumnIndexSelector indexSelector)
{ {
this.selector = indexSelector; this(indexSelector.getBitmapFactory());
}
public AllFalseBitmapColumnIndex(BitmapFactory bitmapFactory)
{
this.bitmapFactory = bitmapFactory;
} }
@Override @Override
@ -48,6 +54,6 @@ public class AllFalseBitmapColumnIndex implements BitmapColumnIndex
@Override @Override
public <T> T computeBitmapResult(BitmapResultFactory<T> bitmapResultFactory) public <T> T computeBitmapResult(BitmapResultFactory<T> bitmapResultFactory)
{ {
return bitmapResultFactory.wrapAllFalse(selector.getBitmapFactory().makeEmptyImmutableBitmap()); return bitmapResultFactory.wrapAllFalse(bitmapFactory.makeEmptyImmutableBitmap());
} }
} }

View File

@ -37,6 +37,7 @@ import org.apache.druid.segment.index.semantic.StringValueSetIndexes;
import org.apache.druid.segment.index.semantic.Utf8ValueSetIndexes; import org.apache.druid.segment.index.semantic.Utf8ValueSetIndexes;
import org.apache.druid.segment.index.semantic.ValueIndexes; import org.apache.druid.segment.index.semantic.ValueIndexes;
import javax.annotation.Nonnull;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Comparator; import java.util.Comparator;
@ -101,7 +102,7 @@ public final class IndexedUtf8ValueIndexes<TDictionary extends Indexed<ByteBuffe
@Nullable @Nullable
@Override @Override
public BitmapColumnIndex forValue(Object value, TypeSignature<ValueType> valueType) public BitmapColumnIndex forValue(@Nonnull Object value, TypeSignature<ValueType> valueType)
{ {
if (valueType.isPrimitive()) { if (valueType.isPrimitive()) {
return forValue( return forValue(

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.index.semantic;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.segment.column.TypeSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.index.BitmapColumnIndex;
import javax.annotation.Nullable;
public interface ArrayElementIndexes
{
/**
* Get the {@link ImmutableBitmap} corresponding to rows with array elements matching the supplied value. Generates
* an empty bitmap when passed a value that doesn't exist in any array. May return null if a value index cannot be
* computed for the supplied value type.
*
* @param value value to match against any array element in a row
* @param valueType type of the value to match, used to assist conversion from the match value type to the column
* value type
* @return {@link ImmutableBitmap} corresponding to the rows with array elements which match the value, or
* null if an index connot be computed for the supplied value type
*/
@Nullable
BitmapColumnIndex containsValue(@Nullable Object value, TypeSignature<ValueType> valueType);
}

View File

@ -24,14 +24,17 @@ import org.apache.druid.segment.column.TypeSignature;
import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.index.BitmapColumnIndex; import org.apache.druid.segment.index.BitmapColumnIndex;
import javax.annotation.Nonnull;
import javax.annotation.Nullable; import javax.annotation.Nullable;
public interface ValueIndexes public interface ValueIndexes
{ {
/** /**
* Get the {@link ImmutableBitmap} corresponding to the supplied value. Generates an empty bitmap when passed a * Get the {@link ImmutableBitmap} corresponding to rows matching the supplied value. Generates an empty bitmap when
* value that doesn't exist. May return null if a value index cannot be computed for the supplied value type. * passed a value that doesn't exist. May return null if a value index cannot be computed for the supplied value type.
*
* Does not match null, use {@link NullValueIndex} for matching nulls.
* *
* @param value value to match * @param value value to match
* @param valueType type of the value to match, used to assist conversion from the match value type to the column * @param valueType type of the value to match, used to assist conversion from the match value type to the column
@ -40,5 +43,5 @@ public interface ValueIndexes
* connot be computed for the supplied value type * connot be computed for the supplied value type
*/ */
@Nullable @Nullable
BitmapColumnIndex forValue(Object value, TypeSignature<ValueType> valueType); BitmapColumnIndex forValue(@Nonnull Object value, TypeSignature<ValueType> valueType);
} }

View File

@ -65,6 +65,7 @@ import org.apache.druid.segment.index.semantic.StringValueSetIndexes;
import org.apache.druid.segment.index.semantic.ValueIndexes; import org.apache.druid.segment.index.semantic.ValueIndexes;
import org.apache.druid.segment.serde.NestedCommonFormatColumnPartSerde; import org.apache.druid.segment.serde.NestedCommonFormatColumnPartSerde;
import javax.annotation.Nonnull;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -228,11 +229,12 @@ public class ScalarDoubleColumnAndIndexSupplier implements Supplier<NestedCommon
{ {
@Nullable @Nullable
@Override @Override
public BitmapColumnIndex forValue(Object value, TypeSignature<ValueType> valueType) public BitmapColumnIndex forValue(@Nonnull Object value, TypeSignature<ValueType> valueType)
{ {
final ExprEval<?> eval = ExprEval.ofType(ExpressionType.fromColumnTypeStrict(valueType), value) final ExprEval<?> eval = ExprEval.ofType(ExpressionType.fromColumnTypeStrict(valueType), value)
.castTo(ExpressionType.DOUBLE); .castTo(ExpressionType.DOUBLE);
if (eval.isNumericNull()) { if (eval.isNumericNull()) {
// value wasn't null, but not a number?
return null; return null;
} }
final double doubleValue = eval.asDouble(); final double doubleValue = eval.asDouble();

View File

@ -19,55 +19,28 @@
package org.apache.druid.segment.nested; package org.apache.druid.segment.nested;
import com.google.common.base.Preconditions;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.collections.bitmap.MutableBitmap;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.math.expr.ExprEval; import org.apache.druid.math.expr.ExprEval;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.data.ColumnarDoublesSerializer; import org.apache.druid.segment.data.ColumnarDoublesSerializer;
import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSerializer;
import org.apache.druid.segment.data.CompressionFactory; import org.apache.druid.segment.data.CompressionFactory;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.data.FixedIndexedWriter; import org.apache.druid.segment.data.FixedIndexedWriter;
import org.apache.druid.segment.data.GenericIndexedWriter;
import org.apache.druid.segment.data.SingleValueColumnarIntsSerializer;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder; import java.nio.ByteOrder;
import java.nio.channels.WritableByteChannel;
/** /**
* Serializer for a {@link ScalarDoubleColumn} * Serializer for a {@link ScalarDoubleColumn}
*/ */
public class ScalarDoubleColumnSerializer extends NestedCommonFormatColumnSerializer public class ScalarDoubleColumnSerializer extends ScalarNestedCommonFormatColumnSerializer<Double>
{ {
private static final Logger log = new Logger(ScalarDoubleColumnSerializer.class);
private final String name;
private final SegmentWriteOutMedium segmentWriteOutMedium;
private final IndexSpec indexSpec;
@SuppressWarnings("unused")
private final Closer closer;
private DictionaryIdLookup dictionaryIdLookup;
private FixedIndexedWriter<Double> doubleDictionaryWriter;
private int rowCount = 0;
private boolean closedForWrite = false;
private boolean dictionarySerialized = false;
private SingleValueColumnarIntsSerializer encodedValueSerializer;
private ColumnarDoublesSerializer doublesSerializer; private ColumnarDoublesSerializer doublesSerializer;
private GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter;
private MutableBitmap[] bitmaps;
private ByteBuffer columnNameBytes = null;
public ScalarDoubleColumnSerializer( public ScalarDoubleColumnSerializer(
String name, String name,
@ -76,54 +49,35 @@ public class ScalarDoubleColumnSerializer extends NestedCommonFormatColumnSerial
Closer closer Closer closer
) )
{ {
this.name = name; super(name, DOUBLE_DICTIONARY_FILE_NAME, indexSpec, segmentWriteOutMedium, closer);
this.segmentWriteOutMedium = segmentWriteOutMedium;
this.indexSpec = indexSpec;
this.closer = closer;
this.dictionaryIdLookup = new DictionaryIdLookup();
} }
@Override @Override
public String getColumnName() protected int processValue(@Nullable Object rawValue) throws IOException
{ {
return name; final ExprEval<?> eval = ExprEval.bestEffortOf(rawValue);
final double val = eval.asDouble();
final int dictId = eval.isNumericNull() ? 0 : dictionaryIdLookup.lookupDouble(val);
doublesSerializer.add(dictId == 0 ? 0.0 : val);
return dictId;
} }
@Override @Override
public DictionaryIdLookup getGlobalLookup() public void openDictionaryWriter() throws IOException
{ {
return dictionaryIdLookup; dictionaryWriter = new FixedIndexedWriter<>(
}
@Override
public boolean hasNulls()
{
return !bitmaps[0].isEmpty();
}
@Override
public void open() throws IOException
{
if (!dictionarySerialized) {
throw new IllegalStateException("Dictionary not serialized, cannot open value serializer");
}
String filenameBase = StringUtils.format("%s.forward_dim", name);
final CompressionStrategy compression = indexSpec.getDimensionCompression();
final CompressionStrategy compressionToUse;
if (compression != CompressionStrategy.UNCOMPRESSED && compression != CompressionStrategy.NONE) {
compressionToUse = compression;
} else {
compressionToUse = CompressionStrategy.LZ4;
}
encodedValueSerializer = CompressedVSizeColumnarIntsSerializer.create(
name,
segmentWriteOutMedium, segmentWriteOutMedium,
filenameBase, ColumnType.DOUBLE.getStrategy(),
doubleDictionaryWriter.getCardinality(), ByteOrder.nativeOrder(),
compressionToUse Long.BYTES,
true
); );
encodedValueSerializer.open(); dictionaryWriter.open();
}
@Override
protected void openValueColumnSerializer() throws IOException
{
doublesSerializer = CompressionFactory.getDoubleSerializer( doublesSerializer = CompressionFactory.getDoubleSerializer(
name, name,
segmentWriteOutMedium, segmentWriteOutMedium,
@ -132,31 +86,6 @@ public class ScalarDoubleColumnSerializer extends NestedCommonFormatColumnSerial
indexSpec.getDimensionCompression() indexSpec.getDimensionCompression()
); );
doublesSerializer.open(); doublesSerializer.open();
bitmapIndexWriter = new GenericIndexedWriter<>(
segmentWriteOutMedium,
name,
indexSpec.getBitmapSerdeFactory().getObjectStrategy()
);
bitmapIndexWriter.open();
bitmapIndexWriter.setObjectsNotSorted();
bitmaps = new MutableBitmap[doubleDictionaryWriter.getCardinality()];
for (int i = 0; i < bitmaps.length; i++) {
bitmaps[i] = indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
}
}
@Override
public void openDictionaryWriter() throws IOException
{
doubleDictionaryWriter = new FixedIndexedWriter<>(
segmentWriteOutMedium,
ColumnType.DOUBLE.getStrategy(),
ByteOrder.nativeOrder(),
Long.BYTES,
true
);
doubleDictionaryWriter.open();
} }
@Override @Override
@ -168,81 +97,26 @@ public class ScalarDoubleColumnSerializer extends NestedCommonFormatColumnSerial
) throws IOException ) throws IOException
{ {
if (dictionarySerialized) { if (dictionarySerialized) {
throw new ISE("String dictionary already serialized for column [%s], cannot serialize again", name); throw new ISE("Double dictionary already serialized for column [%s], cannot serialize again", name);
} }
// null is always 0 // null is always 0
doubleDictionaryWriter.write(null); dictionaryWriter.write(null);
dictionaryIdLookup.addNumericNull(); dictionaryIdLookup.addNumericNull();
for (Double value : doubles) { for (Double value : doubles) {
if (value == null) { if (value == null) {
continue; continue;
} }
doubleDictionaryWriter.write(value); dictionaryWriter.write(value);
dictionaryIdLookup.addDouble(value); dictionaryIdLookup.addDouble(value);
} }
dictionarySerialized = true; dictionarySerialized = true;
} }
@Override @Override
public void serialize(ColumnValueSelector<? extends StructuredData> selector) throws IOException protected void writeValueColumn(FileSmoosher smoosher) throws IOException
{ {
if (!dictionarySerialized) {
throw new ISE("Must serialize value dictionaries before serializing values for column [%s]", name);
}
final Object value = StructuredData.unwrap(selector.getObject());
final ExprEval<?> eval = ExprEval.bestEffortOf(value);
final double val = eval.asDouble();
final int dictId = eval.isNumericNull() ? 0 : dictionaryIdLookup.lookupDouble(val);
encodedValueSerializer.addValue(dictId);
doublesSerializer.add(dictId == 0 ? 0.0 : val);
bitmaps[dictId].add(rowCount);
rowCount++;
}
private void closeForWrite() throws IOException
{
if (!closedForWrite) {
for (int i = 0; i < bitmaps.length; i++) {
final MutableBitmap bitmap = bitmaps[i];
bitmapIndexWriter.write(
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(bitmap)
);
bitmaps[i] = null; // Reclaim memory
}
columnNameBytes = computeFilenameBytes();
closedForWrite = true;
}
}
@Override
public long getSerializedSize() throws IOException
{
closeForWrite();
long size = 1 + columnNameBytes.capacity();
// the value dictionaries, raw column, and null index are all stored in separate files
return size;
}
@Override
public void writeTo(
WritableByteChannel channel,
FileSmoosher smoosher
) throws IOException
{
Preconditions.checkState(closedForWrite, "Not closed yet!");
writeV0Header(channel, columnNameBytes);
writeInternal(smoosher, doubleDictionaryWriter, DOUBLE_DICTIONARY_FILE_NAME);
writeInternal(smoosher, encodedValueSerializer, ENCODED_VALUE_COLUMN_FILE_NAME);
writeInternal(smoosher, doublesSerializer, DOUBLE_VALUE_COLUMN_FILE_NAME); writeInternal(smoosher, doublesSerializer, DOUBLE_VALUE_COLUMN_FILE_NAME);
writeInternal(smoosher, bitmapIndexWriter, BITMAP_INDEX_FILE_NAME);
log.info("Column [%s] serialized successfully.", name);
} }
} }

View File

@ -64,6 +64,7 @@ import org.apache.druid.segment.index.semantic.StringValueSetIndexes;
import org.apache.druid.segment.index.semantic.ValueIndexes; import org.apache.druid.segment.index.semantic.ValueIndexes;
import org.apache.druid.segment.serde.NestedCommonFormatColumnPartSerde; import org.apache.druid.segment.serde.NestedCommonFormatColumnPartSerde;
import javax.annotation.Nonnull;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -228,11 +229,13 @@ public class ScalarLongColumnAndIndexSupplier implements Supplier<NestedCommonFo
{ {
@Nullable @Nullable
@Override @Override
public BitmapColumnIndex forValue(Object value, TypeSignature<ValueType> valueType) public BitmapColumnIndex forValue(@Nonnull Object value, TypeSignature<ValueType> valueType)
{ {
final ExprEval<?> eval = ExprEval.ofType(ExpressionType.fromColumnTypeStrict(valueType), value) final ExprEval<?> eval = ExprEval.ofType(ExpressionType.fromColumnTypeStrict(valueType), value)
.castTo(ExpressionType.LONG); .castTo(ExpressionType.LONG);
if (eval.isNumericNull()) { if (eval.isNumericNull()) {
// value wasn't null, but not a number
return null; return null;
} }
final long longValue = eval.asLong(); final long longValue = eval.asLong();

View File

@ -19,55 +19,28 @@
package org.apache.druid.segment.nested; package org.apache.druid.segment.nested;
import com.google.common.base.Preconditions;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.collections.bitmap.MutableBitmap;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.math.expr.ExprEval; import org.apache.druid.math.expr.ExprEval;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.data.ColumnarLongsSerializer; import org.apache.druid.segment.data.ColumnarLongsSerializer;
import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSerializer;
import org.apache.druid.segment.data.CompressionFactory; import org.apache.druid.segment.data.CompressionFactory;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.data.FixedIndexedWriter; import org.apache.druid.segment.data.FixedIndexedWriter;
import org.apache.druid.segment.data.GenericIndexedWriter;
import org.apache.druid.segment.data.SingleValueColumnarIntsSerializer;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder; import java.nio.ByteOrder;
import java.nio.channels.WritableByteChannel;
/** /**
* Serializer for a {@link ScalarLongColumn} * Serializer for a {@link ScalarLongColumn}
*/ */
public class ScalarLongColumnSerializer extends NestedCommonFormatColumnSerializer public class ScalarLongColumnSerializer extends ScalarNestedCommonFormatColumnSerializer<Long>
{ {
private static final Logger log = new Logger(ScalarLongColumnSerializer.class);
private final String name;
private final SegmentWriteOutMedium segmentWriteOutMedium;
private final IndexSpec indexSpec;
@SuppressWarnings("unused")
private final Closer closer;
private DictionaryIdLookup dictionaryIdLookup;
private FixedIndexedWriter<Long> longDictionaryWriter;
private int rowCount = 0;
private boolean closedForWrite = false;
private boolean dictionarySerialized = false;
private SingleValueColumnarIntsSerializer encodedValueSerializer;
private ColumnarLongsSerializer longsSerializer; private ColumnarLongsSerializer longsSerializer;
private GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter;
private MutableBitmap[] bitmaps;
private ByteBuffer columnNameBytes = null;
public ScalarLongColumnSerializer( public ScalarLongColumnSerializer(
String name, String name,
@ -76,54 +49,36 @@ public class ScalarLongColumnSerializer extends NestedCommonFormatColumnSerializ
Closer closer Closer closer
) )
{ {
this.name = name; super(name, LONG_DICTIONARY_FILE_NAME, indexSpec, segmentWriteOutMedium, closer);
this.segmentWriteOutMedium = segmentWriteOutMedium;
this.indexSpec = indexSpec;
this.closer = closer;
this.dictionaryIdLookup = new DictionaryIdLookup();
} }
@Override @Override
public String getColumnName() protected int processValue(@Nullable Object rawValue) throws IOException
{ {
return name; final ExprEval<?> eval = ExprEval.bestEffortOf(rawValue);
final long val = eval.asLong();
final int dictId = eval.isNumericNull() ? 0 : dictionaryIdLookup.lookupLong(val);
longsSerializer.add(dictId == 0 ? 0L : val);
return dictId;
} }
@Override @Override
public DictionaryIdLookup getGlobalLookup() public void openDictionaryWriter() throws IOException
{ {
return dictionaryIdLookup; dictionaryWriter = new FixedIndexedWriter<>(
}
@Override
public boolean hasNulls()
{
return !bitmaps[0].isEmpty();
}
@Override
public void open() throws IOException
{
if (!dictionarySerialized) {
throw new IllegalStateException("Dictionary not serialized, cannot open value serializer");
}
String filenameBase = StringUtils.format("%s.forward_dim", name);
final CompressionStrategy compression = indexSpec.getDimensionCompression();
final CompressionStrategy compressionToUse;
if (compression != CompressionStrategy.UNCOMPRESSED && compression != CompressionStrategy.NONE) {
compressionToUse = compression;
} else {
compressionToUse = CompressionStrategy.LZ4;
}
encodedValueSerializer = CompressedVSizeColumnarIntsSerializer.create(
name,
segmentWriteOutMedium, segmentWriteOutMedium,
filenameBase, ColumnType.LONG.getStrategy(),
longDictionaryWriter.getCardinality(), ByteOrder.nativeOrder(),
compressionToUse Long.BYTES,
true
); );
encodedValueSerializer.open(); dictionaryWriter.open();
}
@Override
protected void openValueColumnSerializer() throws IOException
{
longsSerializer = CompressionFactory.getLongSerializer( longsSerializer = CompressionFactory.getLongSerializer(
name, name,
segmentWriteOutMedium, segmentWriteOutMedium,
@ -133,33 +88,7 @@ public class ScalarLongColumnSerializer extends NestedCommonFormatColumnSerializ
indexSpec.getDimensionCompression() indexSpec.getDimensionCompression()
); );
longsSerializer.open(); longsSerializer.open();
bitmapIndexWriter = new GenericIndexedWriter<>(
segmentWriteOutMedium,
name,
indexSpec.getBitmapSerdeFactory().getObjectStrategy()
);
bitmapIndexWriter.open();
bitmapIndexWriter.setObjectsNotSorted();
bitmaps = new MutableBitmap[longDictionaryWriter.getCardinality()];
for (int i = 0; i < bitmaps.length; i++) {
bitmaps[i] = indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
} }
}
@Override
public void openDictionaryWriter() throws IOException
{
longDictionaryWriter = new FixedIndexedWriter<>(
segmentWriteOutMedium,
ColumnType.LONG.getStrategy(),
ByteOrder.nativeOrder(),
Long.BYTES,
true
);
longDictionaryWriter.open();
}
@Override @Override
public void serializeDictionaries( public void serializeDictionaries(
@ -170,81 +99,26 @@ public class ScalarLongColumnSerializer extends NestedCommonFormatColumnSerializ
) throws IOException ) throws IOException
{ {
if (dictionarySerialized) { if (dictionarySerialized) {
throw new ISE("String dictionary already serialized for column [%s], cannot serialize again", name); throw new ISE("Long dictionary already serialized for column [%s], cannot serialize again", name);
} }
// null is always 0 // null is always 0
longDictionaryWriter.write(null); dictionaryWriter.write(null);
dictionaryIdLookup.addNumericNull(); dictionaryIdLookup.addNumericNull();
for (Long value : longs) { for (Long value : longs) {
if (value == null) { if (value == null) {
continue; continue;
} }
longDictionaryWriter.write(value); dictionaryWriter.write(value);
dictionaryIdLookup.addLong(value); dictionaryIdLookup.addLong(value);
} }
dictionarySerialized = true; dictionarySerialized = true;
} }
@Override @Override
public void serialize(ColumnValueSelector<? extends StructuredData> selector) throws IOException protected void writeValueColumn(FileSmoosher smoosher) throws IOException
{ {
if (!dictionarySerialized) {
throw new ISE("Must serialize value dictionaries before serializing values for column [%s]", name);
}
final Object value = StructuredData.unwrap(selector.getObject());
final ExprEval<?> eval = ExprEval.bestEffortOf(value);
final long val = eval.asLong();
final int dictId = eval.isNumericNull() ? 0 : dictionaryIdLookup.lookupLong(val);
encodedValueSerializer.addValue(dictId);
longsSerializer.add(dictId == 0 ? 0L : val);
bitmaps[dictId].add(rowCount);
rowCount++;
}
private void closeForWrite() throws IOException
{
if (!closedForWrite) {
for (int i = 0; i < bitmaps.length; i++) {
final MutableBitmap bitmap = bitmaps[i];
bitmapIndexWriter.write(
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(bitmap)
);
bitmaps[i] = null; // Reclaim memory
}
columnNameBytes = computeFilenameBytes();
closedForWrite = true;
}
}
@Override
public long getSerializedSize() throws IOException
{
closeForWrite();
long size = 1 + columnNameBytes.capacity();
// the value dictionaries, raw column, and null index are all stored in separate files
return size;
}
@Override
public void writeTo(
WritableByteChannel channel,
FileSmoosher smoosher
) throws IOException
{
Preconditions.checkState(closedForWrite, "Not closed yet!");
writeV0Header(channel, columnNameBytes);
writeInternal(smoosher, longDictionaryWriter, LONG_DICTIONARY_FILE_NAME);
writeInternal(smoosher, encodedValueSerializer, ENCODED_VALUE_COLUMN_FILE_NAME);
writeInternal(smoosher, longsSerializer, LONG_VALUE_COLUMN_FILE_NAME); writeInternal(smoosher, longsSerializer, LONG_VALUE_COLUMN_FILE_NAME);
writeInternal(smoosher, bitmapIndexWriter, BITMAP_INDEX_FILE_NAME);
log.info("Column [%s] serialized successfully.", name);
} }
} }

View File

@ -0,0 +1,230 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.nested;
import com.google.common.base.Preconditions;
import it.unimi.dsi.fastutil.ints.IntIterator;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.collections.bitmap.MutableBitmap;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSerializer;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.data.DictionaryWriter;
import org.apache.druid.segment.data.FixedIndexedIntWriter;
import org.apache.druid.segment.data.GenericIndexedWriter;
import org.apache.druid.segment.data.SingleValueColumnarIntsSerializer;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
public abstract class ScalarNestedCommonFormatColumnSerializer<T> extends NestedCommonFormatColumnSerializer
{
protected static final Logger log = new Logger(ScalarNestedCommonFormatColumnSerializer.class);
protected final String name;
protected final SegmentWriteOutMedium segmentWriteOutMedium;
protected final IndexSpec indexSpec;
@SuppressWarnings("unused")
protected final Closer closer;
protected final String dictionaryFileName;
protected DictionaryIdLookup dictionaryIdLookup;
protected DictionaryWriter<T> dictionaryWriter;
protected boolean closedForWrite = false;
protected boolean dictionarySerialized = false;
protected FixedIndexedIntWriter intermediateValueWriter;
protected ByteBuffer columnNameBytes = null;
protected boolean hasNulls;
public ScalarNestedCommonFormatColumnSerializer(
String name,
String dictionaryFileName,
IndexSpec indexSpec,
SegmentWriteOutMedium segmentWriteOutMedium,
Closer closer
)
{
this.name = name;
this.dictionaryFileName = dictionaryFileName;
this.segmentWriteOutMedium = segmentWriteOutMedium;
this.indexSpec = indexSpec;
this.closer = closer;
this.dictionaryIdLookup = new DictionaryIdLookup();
}
/**
* Called during {@link #serialize(ColumnValueSelector)} to convert value to dictionary id.
* <p>
* Implementations may optionally also serialize the value to a type specific value column if they opened one with
* {@link #openValueColumnSerializer()}, or do whatever else is useful to do while handling a single row value.
*/
protected abstract int processValue(@Nullable Object rawValue) throws IOException;
/**
* Called during {@link #open()} to allow opening any separate type specific value column serializers
*/
protected abstract void openValueColumnSerializer() throws IOException;
/**
* Called during {@link #writeTo(WritableByteChannel, FileSmoosher)} to allow any type specific value column
* serializers to use the {@link FileSmoosher} to write stuff to places.
*/
protected abstract void writeValueColumn(FileSmoosher smoosher) throws IOException;
@Override
public String getColumnName()
{
return name;
}
@Override
public DictionaryIdLookup getGlobalLookup()
{
return dictionaryIdLookup;
}
@Override
public boolean hasNulls()
{
return hasNulls;
}
@Override
public void open() throws IOException
{
if (!dictionarySerialized) {
throw new IllegalStateException("Dictionary not serialized, cannot open value serializer");
}
intermediateValueWriter = new FixedIndexedIntWriter(segmentWriteOutMedium, false);
intermediateValueWriter.open();
openValueColumnSerializer();
}
@Override
public void serialize(ColumnValueSelector<? extends StructuredData> selector) throws IOException
{
if (!dictionarySerialized) {
throw new ISE("Must serialize value dictionaries before serializing values for column [%s]", name);
}
final Object value = StructuredData.unwrap(selector.getObject());
final int dictId = processValue(value);
intermediateValueWriter.write(dictId);
hasNulls = hasNulls || dictId == 0;
}
private void closeForWrite()
{
if (!closedForWrite) {
columnNameBytes = computeFilenameBytes();
closedForWrite = true;
}
}
@Override
public long getSerializedSize()
{
closeForWrite();
// standard string version
long size = 1 + columnNameBytes.capacity();
// the value dictionaries, raw column, and null index are all stored in separate files
return size;
}
@Override
public void writeTo(
WritableByteChannel channel,
FileSmoosher smoosher
) throws IOException
{
Preconditions.checkState(closedForWrite, "Not closed yet!");
Preconditions.checkArgument(dictionaryWriter.isSorted(), "Dictionary not sorted?!?");
// write out compressed dictionaryId int column and bitmap indexes by iterating intermediate value column
// the intermediate value column should be replaced someday by a cooler compressed int column writer that allows
// easy iteration of the values it writes out, so that we could just build the bitmap indexes here instead of
// doing both things
String filenameBase = StringUtils.format("%s.forward_dim", name);
final CompressionStrategy compression = indexSpec.getDimensionCompression();
final CompressionStrategy compressionToUse;
if (compression != CompressionStrategy.UNCOMPRESSED && compression != CompressionStrategy.NONE) {
compressionToUse = compression;
} else {
compressionToUse = CompressionStrategy.LZ4;
}
final SingleValueColumnarIntsSerializer encodedValueSerializer = CompressedVSizeColumnarIntsSerializer.create(
name,
segmentWriteOutMedium,
filenameBase,
dictionaryWriter.getCardinality(),
compressionToUse
);
encodedValueSerializer.open();
final GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter = new GenericIndexedWriter<>(
segmentWriteOutMedium,
name,
indexSpec.getBitmapSerdeFactory().getObjectStrategy()
);
bitmapIndexWriter.open();
bitmapIndexWriter.setObjectsNotSorted();
final MutableBitmap[] bitmaps;
bitmaps = new MutableBitmap[dictionaryWriter.getCardinality()];
for (int i = 0; i < bitmaps.length; i++) {
bitmaps[i] = indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
}
final IntIterator rows = intermediateValueWriter.getIterator();
int rowCount = 0;
while (rows.hasNext()) {
final int dictId = rows.nextInt();
encodedValueSerializer.addValue(dictId);
bitmaps[dictId].add(rowCount++);
}
for (int i = 0; i < bitmaps.length; i++) {
final MutableBitmap bitmap = bitmaps[i];
bitmapIndexWriter.write(
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(bitmap)
);
bitmaps[i] = null; // Reclaim memory
}
writeV0Header(channel, columnNameBytes);
writeInternal(smoosher, dictionaryWriter, dictionaryFileName);
writeInternal(smoosher, encodedValueSerializer, ENCODED_VALUE_COLUMN_FILE_NAME);
writeValueColumn(smoosher);
writeInternal(smoosher, bitmapIndexWriter, BITMAP_INDEX_FILE_NAME);
log.info("Column [%s] serialized successfully.", name);
}
}

View File

@ -19,56 +19,26 @@
package org.apache.druid.segment.nested; package org.apache.druid.segment.nested;
import com.google.common.base.Preconditions;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.collections.bitmap.MutableBitmap;
import org.apache.druid.common.config.NullHandling; import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.math.expr.ExprEval; import org.apache.druid.math.expr.ExprEval;
import org.apache.druid.math.expr.ExpressionType; import org.apache.druid.math.expr.ExpressionType;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.column.StringEncodingStrategies; import org.apache.druid.segment.column.StringEncodingStrategies;
import org.apache.druid.segment.column.StringUtf8DictionaryEncodedColumn; import org.apache.druid.segment.column.StringUtf8DictionaryEncodedColumn;
import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSerializer;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.data.DictionaryWriter;
import org.apache.druid.segment.data.GenericIndexedWriter;
import org.apache.druid.segment.data.SingleValueColumnarIntsSerializer;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
/** /**
* Serializer for a string {@link NestedCommonFormatColumn} that can be read with * Serializer for a string {@link NestedCommonFormatColumn} that can be read with
* {@link StringUtf8DictionaryEncodedColumn}. * {@link StringUtf8DictionaryEncodedColumn}.
*/ */
public class ScalarStringColumnSerializer extends NestedCommonFormatColumnSerializer public class ScalarStringColumnSerializer extends ScalarNestedCommonFormatColumnSerializer<String>
{ {
private static final Logger log = new Logger(ScalarStringColumnSerializer.class);
private final String name;
private final SegmentWriteOutMedium segmentWriteOutMedium;
private final IndexSpec indexSpec;
@SuppressWarnings("unused")
private final Closer closer;
private DictionaryIdLookup dictionaryIdLookup;
private DictionaryWriter<String> dictionaryWriter;
private int rowCount = 0;
private boolean closedForWrite = false;
private boolean dictionarySerialized = false;
private SingleValueColumnarIntsSerializer encodedValueSerializer;
private GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter;
private MutableBitmap[] bitmaps;
private ByteBuffer columnNameBytes = null;
public ScalarStringColumnSerializer( public ScalarStringColumnSerializer(
String name, String name,
IndexSpec indexSpec, IndexSpec indexSpec,
@ -76,29 +46,16 @@ public class ScalarStringColumnSerializer extends NestedCommonFormatColumnSerial
Closer closer Closer closer
) )
{ {
this.name = name; super(name, STRING_DICTIONARY_FILE_NAME, indexSpec, segmentWriteOutMedium, closer);
this.segmentWriteOutMedium = segmentWriteOutMedium;
this.indexSpec = indexSpec;
this.closer = closer;
this.dictionaryIdLookup = new DictionaryIdLookup();
} }
@Override @Override
public String getColumnName() protected int processValue(@Nullable Object rawValue)
{ {
return name; final ExprEval<?> eval = ExprEval.bestEffortOf(rawValue);
} final String s = eval.castTo(ExpressionType.STRING).asString();
final int dictId = dictionaryIdLookup.lookupString(s);
@Override return dictId;
public DictionaryIdLookup getGlobalLookup()
{
return dictionaryIdLookup;
}
@Override
public boolean hasNulls()
{
return !bitmaps[0].isEmpty();
} }
@Override @Override
@ -113,40 +70,9 @@ public class ScalarStringColumnSerializer extends NestedCommonFormatColumnSerial
} }
@Override @Override
public void open() throws IOException protected void openValueColumnSerializer()
{ {
if (!dictionarySerialized) { // no extra value column for strings
throw new IllegalStateException("Dictionary not serialized, cannot open value serializer");
}
String filenameBase = StringUtils.format("%s.forward_dim", name);
final CompressionStrategy compression = indexSpec.getDimensionCompression();
final CompressionStrategy compressionToUse;
if (compression != CompressionStrategy.UNCOMPRESSED && compression != CompressionStrategy.NONE) {
compressionToUse = compression;
} else {
// always compress
compressionToUse = CompressionStrategy.LZ4;
}
encodedValueSerializer = CompressedVSizeColumnarIntsSerializer.create(
name,
segmentWriteOutMedium,
filenameBase,
dictionaryWriter.getCardinality(),
compressionToUse
);
encodedValueSerializer.open();
bitmapIndexWriter = new GenericIndexedWriter<>(
segmentWriteOutMedium,
name,
indexSpec.getBitmapSerdeFactory().getObjectStrategy()
);
bitmapIndexWriter.open();
bitmapIndexWriter.setObjectsNotSorted();
bitmaps = new MutableBitmap[dictionaryWriter.getCardinality()];
for (int i = 0; i < bitmaps.length; i++) {
bitmaps[i] = indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
}
} }
@Override @Override
@ -177,61 +103,8 @@ public class ScalarStringColumnSerializer extends NestedCommonFormatColumnSerial
} }
@Override @Override
public void serialize(ColumnValueSelector<? extends StructuredData> selector) throws IOException protected void writeValueColumn(FileSmoosher smoosher)
{ {
if (!dictionarySerialized) { // no extra value column for strings
throw new ISE("Must serialize value dictionaries before serializing values for column [%s]", name);
}
final Object value = StructuredData.unwrap(selector.getObject());
final ExprEval<?> eval = ExprEval.bestEffortOf(value);
final String s = eval.castTo(ExpressionType.STRING).asString();
final int dictId = dictionaryIdLookup.lookupString(s);
encodedValueSerializer.addValue(dictId);
bitmaps[dictId].add(rowCount);
rowCount++;
}
private void closeForWrite() throws IOException
{
if (!closedForWrite) {
for (int i = 0; i < bitmaps.length; i++) {
final MutableBitmap bitmap = bitmaps[i];
bitmapIndexWriter.write(
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(bitmap)
);
bitmaps[i] = null; // Reclaim memory
}
columnNameBytes = computeFilenameBytes();
closedForWrite = true;
}
}
@Override
public long getSerializedSize() throws IOException
{
closeForWrite();
// standard string version
long size = 1 + columnNameBytes.capacity();
// the value dictionaries, raw column, and null index are all stored in separate files
return size;
}
@Override
public void writeTo(
WritableByteChannel channel,
FileSmoosher smoosher
) throws IOException
{
Preconditions.checkState(closedForWrite, "Not closed yet!");
Preconditions.checkArgument(dictionaryWriter.isSorted(), "Dictionary not sorted?!?");
writeV0Header(channel, columnNameBytes);
writeInternal(smoosher, dictionaryWriter, STRING_DICTIONARY_FILE_NAME);
writeInternal(smoosher, encodedValueSerializer, ENCODED_VALUE_COLUMN_FILE_NAME);
writeInternal(smoosher, bitmapIndexWriter, BITMAP_INDEX_FILE_NAME);
log.info("Column [%s] serialized successfully.", name);
} }
} }

View File

@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import org.apache.druid.math.expr.ExprEval; import org.apache.druid.math.expr.ExprEval;
import org.apache.druid.math.expr.ExprType;
import org.apache.druid.math.expr.ExpressionType; import org.apache.druid.math.expr.ExpressionType;
import org.apache.druid.query.BitmapResultFactory; import org.apache.druid.query.BitmapResultFactory;
import org.apache.druid.segment.column.ColumnBuilder; import org.apache.druid.segment.column.ColumnBuilder;
@ -47,13 +48,16 @@ import org.apache.druid.segment.data.FrontCodedIntArrayIndexed;
import org.apache.druid.segment.data.GenericIndexed; import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.Indexed; import org.apache.druid.segment.data.Indexed;
import org.apache.druid.segment.data.VByte; import org.apache.druid.segment.data.VByte;
import org.apache.druid.segment.index.AllFalseBitmapColumnIndex;
import org.apache.druid.segment.index.BitmapColumnIndex; import org.apache.druid.segment.index.BitmapColumnIndex;
import org.apache.druid.segment.index.SimpleBitmapColumnIndex; import org.apache.druid.segment.index.SimpleBitmapColumnIndex;
import org.apache.druid.segment.index.SimpleImmutableBitmapIndex; import org.apache.druid.segment.index.SimpleImmutableBitmapIndex;
import org.apache.druid.segment.index.semantic.ArrayElementIndexes;
import org.apache.druid.segment.index.semantic.NullValueIndex; import org.apache.druid.segment.index.semantic.NullValueIndex;
import org.apache.druid.segment.index.semantic.ValueIndexes; import org.apache.druid.segment.index.semantic.ValueIndexes;
import org.apache.druid.segment.serde.NestedCommonFormatColumnPartSerde; import org.apache.druid.segment.serde.NestedCommonFormatColumnPartSerde;
import javax.annotation.Nonnull;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -93,6 +97,7 @@ public class VariantColumnAndIndexSupplier implements Supplier<NestedCommonForma
final Supplier<FixedIndexed<Long>> longDictionarySupplier; final Supplier<FixedIndexed<Long>> longDictionarySupplier;
final Supplier<FixedIndexed<Double>> doubleDictionarySupplier; final Supplier<FixedIndexed<Double>> doubleDictionarySupplier;
final Supplier<FrontCodedIntArrayIndexed> arrayDictionarySupplier; final Supplier<FrontCodedIntArrayIndexed> arrayDictionarySupplier;
final Supplier<FixedIndexed<Integer>> arrayElementDictionarySupplier;
final ByteBuffer stringDictionaryBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile( final ByteBuffer stringDictionaryBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile(
mapper, mapper,
@ -147,6 +152,11 @@ public class VariantColumnAndIndexSupplier implements Supplier<NestedCommonForma
columnName, columnName,
NestedCommonFormatColumnSerializer.DOUBLE_DICTIONARY_FILE_NAME NestedCommonFormatColumnSerializer.DOUBLE_DICTIONARY_FILE_NAME
); );
final ByteBuffer arrayElementDictionaryBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile(
mapper,
columnName,
NestedCommonFormatColumnSerializer.ARRAY_ELEMENT_DICTIONARY_FILE_NAME
);
final ByteBuffer valueIndexBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile( final ByteBuffer valueIndexBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile(
mapper, mapper,
columnName, columnName,
@ -180,6 +190,7 @@ public class VariantColumnAndIndexSupplier implements Supplier<NestedCommonForma
byteOrder, byteOrder,
Double.BYTES Double.BYTES
); );
final ByteBuffer arrayDictionarybuffer = NestedCommonFormatColumnPartSerde.loadInternalFile( final ByteBuffer arrayDictionarybuffer = NestedCommonFormatColumnPartSerde.loadInternalFile(
mapper, mapper,
columnName, columnName,
@ -193,6 +204,12 @@ public class VariantColumnAndIndexSupplier implements Supplier<NestedCommonForma
try (ColumnarInts throwAway = ints.get()) { try (ColumnarInts throwAway = ints.get()) {
size = throwAway.size(); size = throwAway.size();
} }
arrayElementDictionarySupplier = FixedIndexed.read(
arrayElementDictionaryBuffer,
CompressedNestedDataComplexColumn.INT_TYPE_STRATEGY,
byteOrder,
Integer.BYTES
);
return new VariantColumnAndIndexSupplier( return new VariantColumnAndIndexSupplier(
logicalType, logicalType,
variantTypeByte, variantTypeByte,
@ -201,6 +218,7 @@ public class VariantColumnAndIndexSupplier implements Supplier<NestedCommonForma
longDictionarySupplier, longDictionarySupplier,
doubleDictionarySupplier, doubleDictionarySupplier,
arrayDictionarySupplier, arrayDictionarySupplier,
arrayElementDictionarySupplier,
ints, ints,
valueIndexes, valueIndexes,
arrayElementIndexes, arrayElementIndexes,
@ -222,11 +240,11 @@ public class VariantColumnAndIndexSupplier implements Supplier<NestedCommonForma
@Nullable @Nullable
private final Byte variantTypeSetByte; private final Byte variantTypeSetByte;
private final BitmapFactory bitmapFactory; private final BitmapFactory bitmapFactory;
private final GenericIndexed<ByteBuffer> stringDictionary; private final Supplier<? extends Indexed<ByteBuffer>> stringDictionarySupplier;
private final Supplier<FrontCodedIndexed> frontCodedStringDictionarySupplier;
private final Supplier<FixedIndexed<Long>> longDictionarySupplier; private final Supplier<FixedIndexed<Long>> longDictionarySupplier;
private final Supplier<FixedIndexed<Double>> doubleDictionarySupplier; private final Supplier<FixedIndexed<Double>> doubleDictionarySupplier;
private final Supplier<FrontCodedIntArrayIndexed> arrayDictionarySupplier; private final Supplier<FrontCodedIntArrayIndexed> arrayDictionarySupplier;
private final Supplier<FixedIndexed<Integer>> arrayElementDictionarySupplier;
private final Supplier<ColumnarInts> encodedValueColumnSupplier; private final Supplier<ColumnarInts> encodedValueColumnSupplier;
@SuppressWarnings("unused") @SuppressWarnings("unused")
private final GenericIndexed<ImmutableBitmap> valueIndexes; private final GenericIndexed<ImmutableBitmap> valueIndexes;
@ -242,6 +260,7 @@ public class VariantColumnAndIndexSupplier implements Supplier<NestedCommonForma
Supplier<FixedIndexed<Long>> longDictionarySupplier, Supplier<FixedIndexed<Long>> longDictionarySupplier,
Supplier<FixedIndexed<Double>> doubleDictionarySupplier, Supplier<FixedIndexed<Double>> doubleDictionarySupplier,
Supplier<FrontCodedIntArrayIndexed> arrayDictionarySupplier, Supplier<FrontCodedIntArrayIndexed> arrayDictionarySupplier,
Supplier<FixedIndexed<Integer>> arrayElementDictionarySupplier,
Supplier<ColumnarInts> encodedValueColumnSupplier, Supplier<ColumnarInts> encodedValueColumnSupplier,
GenericIndexed<ImmutableBitmap> valueIndexes, GenericIndexed<ImmutableBitmap> valueIndexes,
GenericIndexed<ImmutableBitmap> elementIndexes, GenericIndexed<ImmutableBitmap> elementIndexes,
@ -252,11 +271,13 @@ public class VariantColumnAndIndexSupplier implements Supplier<NestedCommonForma
{ {
this.logicalType = logicalType; this.logicalType = logicalType;
this.variantTypeSetByte = variantTypeSetByte; this.variantTypeSetByte = variantTypeSetByte;
this.stringDictionary = stringDictionary; stringDictionarySupplier = frontCodedStringDictionarySupplier != null
this.frontCodedStringDictionarySupplier = frontCodedStringDictionarySupplier; ? frontCodedStringDictionarySupplier
: stringDictionary::singleThreaded;
this.longDictionarySupplier = longDictionarySupplier; this.longDictionarySupplier = longDictionarySupplier;
this.doubleDictionarySupplier = doubleDictionarySupplier; this.doubleDictionarySupplier = doubleDictionarySupplier;
this.arrayDictionarySupplier = arrayDictionarySupplier; this.arrayDictionarySupplier = arrayDictionarySupplier;
this.arrayElementDictionarySupplier = arrayElementDictionarySupplier;
this.encodedValueColumnSupplier = encodedValueColumnSupplier; this.encodedValueColumnSupplier = encodedValueColumnSupplier;
this.valueIndexes = valueIndexes; this.valueIndexes = valueIndexes;
this.arrayElementIndexes = elementIndexes; this.arrayElementIndexes = elementIndexes;
@ -273,20 +294,8 @@ public class VariantColumnAndIndexSupplier implements Supplier<NestedCommonForma
@Override @Override
public NestedCommonFormatColumn get() public NestedCommonFormatColumn get()
{ {
if (frontCodedStringDictionarySupplier != null) {
return new VariantColumn<>( return new VariantColumn<>(
frontCodedStringDictionarySupplier.get(), stringDictionarySupplier.get(),
longDictionarySupplier.get(),
doubleDictionarySupplier.get(),
arrayDictionarySupplier.get(),
encodedValueColumnSupplier.get(),
nullValueBitmap,
logicalType,
variantTypeSetByte
);
}
return new VariantColumn<>(
stringDictionary.singleThreaded(),
longDictionarySupplier.get(), longDictionarySupplier.get(),
doubleDictionarySupplier.get(), doubleDictionarySupplier.get(),
arrayDictionarySupplier.get(), arrayDictionarySupplier.get(),
@ -306,6 +315,8 @@ public class VariantColumnAndIndexSupplier implements Supplier<NestedCommonForma
return (T) (NullValueIndex) () -> nullIndex; return (T) (NullValueIndex) () -> nullIndex;
} else if (clazz.equals(ValueIndexes.class) && variantTypeSetByte == null && logicalType.isArray()) { } else if (clazz.equals(ValueIndexes.class) && variantTypeSetByte == null && logicalType.isArray()) {
return (T) new ArrayValueIndexes(); return (T) new ArrayValueIndexes();
} else if (clazz.equals(ArrayElementIndexes.class) && variantTypeSetByte == null && logicalType.isArray()) {
return (T) new VariantArrayElementIndexes();
} }
return null; return null;
} }
@ -320,30 +331,42 @@ public class VariantColumnAndIndexSupplier implements Supplier<NestedCommonForma
return bitmap == null ? bitmapFactory.makeEmptyImmutableBitmap() : bitmap; return bitmap == null ? bitmapFactory.makeEmptyImmutableBitmap() : bitmap;
} }
private ImmutableBitmap getElementBitmap(int idx)
{
if (idx < 0) {
return bitmapFactory.makeEmptyImmutableBitmap();
}
final int elementDictionaryIndex = arrayElementDictionarySupplier.get().indexOf(idx);
if (elementDictionaryIndex < 0) {
return bitmapFactory.makeEmptyImmutableBitmap();
}
final ImmutableBitmap bitmap = arrayElementIndexes.get(elementDictionaryIndex);
return bitmap == null ? bitmapFactory.makeEmptyImmutableBitmap() : bitmap;
}
private class ArrayValueIndexes implements ValueIndexes private class ArrayValueIndexes implements ValueIndexes
{ {
@Nullable @Nullable
@Override @Override
public BitmapColumnIndex forValue(Object value, TypeSignature<ValueType> valueType) public BitmapColumnIndex forValue(@Nonnull Object value, TypeSignature<ValueType> valueType)
{ {
final ExprEval<?> eval = ExprEval.ofType(ExpressionType.fromColumnTypeStrict(valueType), value) final ExprEval<?> eval = ExprEval.ofType(ExpressionType.fromColumnTypeStrict(valueType), value)
.castTo(ExpressionType.fromColumnTypeStrict(logicalType)); .castTo(ExpressionType.fromColumnTypeStrict(logicalType));
if (eval.value() == null) {
return null;
}
final Object[] arrayToMatch = eval.asArray(); final Object[] arrayToMatch = eval.asArray();
Indexed elements; Indexed elements;
final int elementOffset;
switch (logicalType.getElementType().getType()) { switch (logicalType.getElementType().getType()) {
case STRING: case STRING:
elements = frontCodedStringDictionarySupplier != null elements = stringDictionarySupplier.get();
? frontCodedStringDictionarySupplier.get() elementOffset = 0;
: stringDictionary.singleThreaded();
break; break;
case LONG: case LONG:
elements = longDictionarySupplier.get(); elements = longDictionarySupplier.get();
elementOffset = stringDictionarySupplier.get().size();
break; break;
case DOUBLE: case DOUBLE:
elements = doubleDictionarySupplier.get(); elements = doubleDictionarySupplier.get();
elementOffset = stringDictionarySupplier.get().size() + longDictionarySupplier.get().size();
break; break;
default: default:
throw DruidException.defensive( throw DruidException.defensive(
@ -353,30 +376,29 @@ public class VariantColumnAndIndexSupplier implements Supplier<NestedCommonForma
} }
final int[] ids = new int[arrayToMatch.length]; final int[] ids = new int[arrayToMatch.length];
boolean hasMissingElement = false; final int arrayOffset = stringDictionarySupplier.get().size() + longDictionarySupplier.get().size() + doubleDictionarySupplier.get().size();
for (int i = 0; i < arrayToMatch.length; i++) { for (int i = 0; i < arrayToMatch.length; i++) {
if (logicalType.getElementType().is(ValueType.STRING)) { if (arrayToMatch[i] == null) {
ids[i] = 0;
} else if (logicalType.getElementType().is(ValueType.STRING)) {
ids[i] = elements.indexOf(StringUtils.toUtf8ByteBuffer((String) arrayToMatch[i])); ids[i] = elements.indexOf(StringUtils.toUtf8ByteBuffer((String) arrayToMatch[i]));
} else { } else {
ids[i] = elements.indexOf(arrayToMatch[i]); ids[i] = elements.indexOf(arrayToMatch[i]) + elementOffset;
} }
if (ids[i] < 0) { if (ids[i] < 0) {
hasMissingElement = true; if (value == null) {
break; return new AllFalseBitmapColumnIndex(bitmapFactory);
}
} }
} }
final boolean noMatch = hasMissingElement;
final FrontCodedIntArrayIndexed dictionary = arrayDictionarySupplier.get(); final FrontCodedIntArrayIndexed dictionary = arrayDictionarySupplier.get();
return new SimpleBitmapColumnIndex() return new SimpleBitmapColumnIndex()
{ {
@Override @Override
public double estimateSelectivity(int totalRows) public double estimateSelectivity(int totalRows)
{ {
if (noMatch) { final int id = dictionary.indexOf(ids) + arrayOffset;
return 0.0;
}
final int id = dictionary.indexOf(ids);
if (id < 0) { if (id < 0) {
return 0.0; return 0.0;
} }
@ -386,10 +408,7 @@ public class VariantColumnAndIndexSupplier implements Supplier<NestedCommonForma
@Override @Override
public <T> T computeBitmapResult(BitmapResultFactory<T> bitmapResultFactory) public <T> T computeBitmapResult(BitmapResultFactory<T> bitmapResultFactory)
{ {
if (noMatch) { final int id = dictionary.indexOf(ids) + arrayOffset;
return bitmapResultFactory.wrapDimensionValue(bitmapFactory.makeEmptyImmutableBitmap());
}
final int id = dictionary.indexOf(ids);
if (id < 0) { if (id < 0) {
return bitmapResultFactory.wrapDimensionValue(bitmapFactory.makeEmptyImmutableBitmap()); return bitmapResultFactory.wrapDimensionValue(bitmapFactory.makeEmptyImmutableBitmap());
} }
@ -398,4 +417,73 @@ public class VariantColumnAndIndexSupplier implements Supplier<NestedCommonForma
}; };
} }
} }
private class VariantArrayElementIndexes implements ArrayElementIndexes
{
@Nullable
@Override
public BitmapColumnIndex containsValue(@Nullable Object value, TypeSignature<ValueType> elementValueType)
{
final ExprEval<?> eval = ExprEval.ofType(ExpressionType.fromColumnTypeStrict(elementValueType), value)
.castTo(ExpressionType.fromColumnTypeStrict(logicalType.getElementType()));
Indexed elements;
final int elementOffset;
switch (logicalType.getElementType().getType()) {
case STRING:
elements = stringDictionarySupplier.get();
elementOffset = 0;
break;
case LONG:
elements = longDictionarySupplier.get();
elementOffset = stringDictionarySupplier.get().size();
break;
case DOUBLE:
elements = doubleDictionarySupplier.get();
elementOffset = stringDictionarySupplier.get().size() + longDictionarySupplier.get().size();
break;
default:
throw DruidException.defensive(
"Unhandled array type [%s] how did this happen?",
logicalType.getElementType()
);
}
return new SimpleBitmapColumnIndex()
{
@Override
public double estimateSelectivity(int totalRows)
{
final int elementId = getElementId();
if (elementId < 0) {
return 0.0;
}
return (double) getElementBitmap(elementId).size() / totalRows;
}
@Override
public <T> T computeBitmapResult(BitmapResultFactory<T> bitmapResultFactory)
{
final int elementId = getElementId();
if (elementId < 0) {
return bitmapResultFactory.wrapDimensionValue(bitmapFactory.makeEmptyImmutableBitmap());
}
return bitmapResultFactory.wrapDimensionValue(getElementBitmap(elementId));
}
private int getElementId()
{
if (eval.value() == null) {
return 0;
} else if (eval.type().is(ExprType.STRING)) {
return elements.indexOf(StringUtils.toUtf8ByteBuffer(eval.asString()));
} else {
return elements.indexOf(eval.value()) + elementOffset;
}
}
};
}
}
} }

View File

@ -22,6 +22,7 @@ package org.apache.druid.segment.nested;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap; import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;
import it.unimi.dsi.fastutil.ints.IntIterator;
import org.apache.druid.collections.bitmap.ImmutableBitmap; import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.collections.bitmap.MutableBitmap; import org.apache.druid.collections.bitmap.MutableBitmap;
import org.apache.druid.common.config.NullHandling; import org.apache.druid.common.config.NullHandling;
@ -71,15 +72,12 @@ public class VariantColumnSerializer extends NestedCommonFormatColumnSerializer
private FixedIndexedWriter<Double> doubleDictionaryWriter; private FixedIndexedWriter<Double> doubleDictionaryWriter;
private FrontCodedIntArrayIndexedWriter arrayDictionaryWriter; private FrontCodedIntArrayIndexedWriter arrayDictionaryWriter;
private FixedIndexedIntWriter arrayElementDictionaryWriter; private FixedIndexedIntWriter arrayElementDictionaryWriter;
private int rowCount = 0;
private boolean closedForWrite = false; private boolean closedForWrite = false;
private boolean dictionarySerialized = false; private boolean dictionarySerialized = false;
private SingleValueColumnarIntsSerializer encodedValueSerializer; private FixedIndexedIntWriter intermediateValueWriter;
private GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter;
private GenericIndexedWriter<ImmutableBitmap> arrayElementIndexWriter;
private MutableBitmap[] bitmaps;
private ByteBuffer columnNameBytes = null; private ByteBuffer columnNameBytes = null;
private final Int2ObjectRBTreeMap<MutableBitmap> arrayElements = new Int2ObjectRBTreeMap<>(); private boolean hasNulls;
@Nullable @Nullable
private final Byte variantTypeSetByte; private final Byte variantTypeSetByte;
@ -114,7 +112,7 @@ public class VariantColumnSerializer extends NestedCommonFormatColumnSerializer
@Override @Override
public boolean hasNulls() public boolean hasNulls()
{ {
return !bitmaps[0].isEmpty(); return hasNulls;
} }
@Override @Override
@ -161,45 +159,8 @@ public class VariantColumnSerializer extends NestedCommonFormatColumnSerializer
if (!dictionarySerialized) { if (!dictionarySerialized) {
throw new IllegalStateException("Dictionary not serialized, cannot open value serializer"); throw new IllegalStateException("Dictionary not serialized, cannot open value serializer");
} }
String filenameBase = StringUtils.format("%s.forward_dim", name); intermediateValueWriter = new FixedIndexedIntWriter(segmentWriteOutMedium, false);
final int cardinality = dictionaryWriter.getCardinality() intermediateValueWriter.open();
+ longDictionaryWriter.getCardinality()
+ doubleDictionaryWriter.getCardinality()
+ arrayDictionaryWriter.getCardinality();
final CompressionStrategy compression = indexSpec.getDimensionCompression();
final CompressionStrategy compressionToUse;
if (compression != CompressionStrategy.UNCOMPRESSED && compression != CompressionStrategy.NONE) {
compressionToUse = compression;
} else {
compressionToUse = CompressionStrategy.LZ4;
}
encodedValueSerializer = CompressedVSizeColumnarIntsSerializer.create(
name,
segmentWriteOutMedium,
filenameBase,
cardinality,
compressionToUse
);
encodedValueSerializer.open();
bitmapIndexWriter = new GenericIndexedWriter<>(
segmentWriteOutMedium,
name,
indexSpec.getBitmapSerdeFactory().getObjectStrategy()
);
bitmapIndexWriter.open();
bitmapIndexWriter.setObjectsNotSorted();
bitmaps = new MutableBitmap[cardinality];
for (int i = 0; i < bitmaps.length; i++) {
bitmaps[i] = indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
}
arrayElementIndexWriter = new GenericIndexedWriter<>(
segmentWriteOutMedium,
name + "_arrays",
indexSpec.getBitmapSerdeFactory().getObjectStrategy()
);
arrayElementIndexWriter.open();
arrayElementIndexWriter.setObjectsNotSorted();
} }
@Override @Override
@ -211,7 +172,7 @@ public class VariantColumnSerializer extends NestedCommonFormatColumnSerializer
) throws IOException ) throws IOException
{ {
if (dictionarySerialized) { if (dictionarySerialized) {
throw new ISE("String dictionary already serialized for column [%s], cannot serialize again", name); throw new ISE("Value dictionaries already serialized for column [%s], cannot serialize again", name);
} }
// null is always 0 // null is always 0
@ -277,14 +238,10 @@ public class VariantColumnSerializer extends NestedCommonFormatColumnSerializer
globalIds[i] = -1; globalIds[i] = -1;
} }
Preconditions.checkArgument(globalIds[i] >= 0, "unknown global id [%s] for value [%s]", globalIds[i], array[i]); Preconditions.checkArgument(globalIds[i] >= 0, "unknown global id [%s] for value [%s]", globalIds[i], array[i]);
arrayElements.computeIfAbsent(
globalIds[i],
(id) -> indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap()
).add(rowCount);
} }
final int dictId = dictionaryIdLookup.lookupArray(globalIds); final int dictId = dictionaryIdLookup.lookupArray(globalIds);
encodedValueSerializer.addValue(dictId); intermediateValueWriter.write(dictId);
bitmaps[dictId].add(rowCount); hasNulls = hasNulls || dictId == 0;
} else { } else {
final Object o = eval.value(); final Object o = eval.value();
final int dictId; final int dictId;
@ -300,43 +257,21 @@ public class VariantColumnSerializer extends NestedCommonFormatColumnSerializer
dictId = -1; dictId = -1;
} }
Preconditions.checkArgument(dictId >= 0, "unknown global id [%s] for value [%s]", dictId, o); Preconditions.checkArgument(dictId >= 0, "unknown global id [%s] for value [%s]", dictId, o);
if (dictId != 0) { intermediateValueWriter.write(dictId);
// treat as single element array hasNulls = hasNulls || dictId == 0;
arrayElements.computeIfAbsent(
dictId,
(id) -> indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap()
).add(rowCount);
} }
encodedValueSerializer.addValue(dictId);
bitmaps[dictId].add(rowCount);
} }
rowCount++; private void closeForWrite()
}
private void closeForWrite() throws IOException
{ {
if (!closedForWrite) { if (!closedForWrite) {
for (int i = 0; i < bitmaps.length; i++) {
final MutableBitmap bitmap = bitmaps[i];
bitmapIndexWriter.write(
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(bitmap)
);
bitmaps[i] = null; // Reclaim memory
}
for (Int2ObjectMap.Entry<MutableBitmap> arrayElement : arrayElements.int2ObjectEntrySet()) {
arrayElementDictionaryWriter.write(arrayElement.getIntKey());
arrayElementIndexWriter.write(
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(arrayElement.getValue())
);
}
columnNameBytes = computeFilenameBytes(); columnNameBytes = computeFilenameBytes();
closedForWrite = true; closedForWrite = true;
} }
} }
@Override @Override
public long getSerializedSize() throws IOException public long getSerializedSize()
{ {
closeForWrite(); closeForWrite();
@ -357,6 +292,87 @@ public class VariantColumnSerializer extends NestedCommonFormatColumnSerializer
Preconditions.checkState(closedForWrite, "Not closed yet!"); Preconditions.checkState(closedForWrite, "Not closed yet!");
Preconditions.checkArgument(dictionaryWriter.isSorted(), "Dictionary not sorted?!?"); Preconditions.checkArgument(dictionaryWriter.isSorted(), "Dictionary not sorted?!?");
// write out compressed dictionaryId int column, bitmap indexes, and array element bitmap indexes
// by iterating intermediate value column the intermediate value column should be replaced someday by a cooler
// compressed int column writer that allows easy iteration of the values it writes out, so that we could just
// build the bitmap indexes here instead of doing both things
String filenameBase = StringUtils.format("%s.forward_dim", name);
final int cardinality = dictionaryWriter.getCardinality()
+ longDictionaryWriter.getCardinality()
+ doubleDictionaryWriter.getCardinality()
+ arrayDictionaryWriter.getCardinality();
final CompressionStrategy compression = indexSpec.getDimensionCompression();
final CompressionStrategy compressionToUse;
if (compression != CompressionStrategy.UNCOMPRESSED && compression != CompressionStrategy.NONE) {
compressionToUse = compression;
} else {
compressionToUse = CompressionStrategy.LZ4;
}
final SingleValueColumnarIntsSerializer encodedValueSerializer = CompressedVSizeColumnarIntsSerializer.create(
name,
segmentWriteOutMedium,
filenameBase,
cardinality,
compressionToUse
);
encodedValueSerializer.open();
final GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter = new GenericIndexedWriter<>(
segmentWriteOutMedium,
name,
indexSpec.getBitmapSerdeFactory().getObjectStrategy()
);
bitmapIndexWriter.open();
bitmapIndexWriter.setObjectsNotSorted();
final MutableBitmap[] bitmaps = new MutableBitmap[cardinality];
final Int2ObjectRBTreeMap<MutableBitmap> arrayElements = new Int2ObjectRBTreeMap<>();
for (int i = 0; i < bitmaps.length; i++) {
bitmaps[i] = indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
}
final GenericIndexedWriter<ImmutableBitmap> arrayElementIndexWriter = new GenericIndexedWriter<>(
segmentWriteOutMedium,
name + "_arrays",
indexSpec.getBitmapSerdeFactory().getObjectStrategy()
);
arrayElementIndexWriter.open();
arrayElementIndexWriter.setObjectsNotSorted();
final IntIterator rows = intermediateValueWriter.getIterator();
int rowCount = 0;
final int arrayBaseId = dictionaryWriter.getCardinality()
+ longDictionaryWriter.getCardinality()
+ doubleDictionaryWriter.getCardinality();
while (rows.hasNext()) {
final int dictId = rows.nextInt();
encodedValueSerializer.addValue(dictId);
bitmaps[dictId].add(rowCount);
if (dictId >= arrayBaseId) {
int[] array = arrayDictionaryWriter.get(dictId - arrayBaseId);
for (int elementId : array) {
arrayElements.computeIfAbsent(
elementId,
(id) -> indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap()
).add(rowCount);
}
}
rowCount++;
}
for (int i = 0; i < bitmaps.length; i++) {
final MutableBitmap bitmap = bitmaps[i];
bitmapIndexWriter.write(
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(bitmap)
);
bitmaps[i] = null; // Reclaim memory
}
for (Int2ObjectMap.Entry<MutableBitmap> arrayElement : arrayElements.int2ObjectEntrySet()) {
arrayElementDictionaryWriter.write(arrayElement.getIntKey());
arrayElementIndexWriter.write(
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(arrayElement.getValue())
);
}
writeV0Header(channel, columnNameBytes); writeV0Header(channel, columnNameBytes);
if (variantTypeSetByte != null) { if (variantTypeSetByte != null) {
channel.write(ByteBuffer.wrap(new byte[]{variantTypeSetByte})); channel.write(ByteBuffer.wrap(new byte[]{variantTypeSetByte}));

View File

@ -47,6 +47,7 @@ import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
import org.apache.druid.segment.index.semantic.DruidPredicateIndexes; import org.apache.druid.segment.index.semantic.DruidPredicateIndexes;
import org.apache.druid.segment.index.semantic.NullValueIndex; import org.apache.druid.segment.index.semantic.NullValueIndex;
import org.apache.druid.segment.index.semantic.StringValueSetIndexes; import org.apache.druid.segment.index.semantic.StringValueSetIndexes;
import org.apache.druid.segment.index.semantic.ValueIndexes;
import org.apache.druid.segment.vector.NoFilterVectorOffset; import org.apache.druid.segment.vector.NoFilterVectorOffset;
import org.apache.druid.segment.vector.VectorValueSelector; import org.apache.druid.segment.vector.VectorValueSelector;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
@ -248,6 +249,7 @@ public class ScalarDoubleColumnSupplierTest extends InitializedNullHandlingTest
ColumnValueSelector<?> valueSelector = column.makeColumnValueSelector(offset); ColumnValueSelector<?> valueSelector = column.makeColumnValueSelector(offset);
VectorValueSelector vectorValueSelector = column.makeVectorValueSelector(vectorOffset); VectorValueSelector vectorValueSelector = column.makeVectorValueSelector(vectorOffset);
ValueIndexes valueIndexes = supplier.as(ValueIndexes.class);
StringValueSetIndexes valueSetIndex = supplier.as(StringValueSetIndexes.class); StringValueSetIndexes valueSetIndex = supplier.as(StringValueSetIndexes.class);
DruidPredicateIndexes predicateIndex = supplier.as(DruidPredicateIndexes.class); DruidPredicateIndexes predicateIndex = supplier.as(DruidPredicateIndexes.class);
NullValueIndex nullValueIndex = supplier.as(NullValueIndex.class); NullValueIndex nullValueIndex = supplier.as(NullValueIndex.class);
@ -279,6 +281,7 @@ public class ScalarDoubleColumnSupplierTest extends InitializedNullHandlingTest
} }
Assert.assertTrue(valueSetIndex.forValue(String.valueOf(row)).computeBitmapResult(resultFactory).get(i)); Assert.assertTrue(valueSetIndex.forValue(String.valueOf(row)).computeBitmapResult(resultFactory).get(i));
Assert.assertTrue(valueIndexes.forValue(row, ColumnType.DOUBLE).computeBitmapResult(resultFactory).get(i));
Assert.assertTrue(valueSetIndex.forSortedValues(new TreeSet<>(ImmutableSet.of(String.valueOf(row)))) Assert.assertTrue(valueSetIndex.forSortedValues(new TreeSet<>(ImmutableSet.of(String.valueOf(row))))
.computeBitmapResult(resultFactory) .computeBitmapResult(resultFactory)
.get(i)); .get(i));

View File

@ -47,6 +47,7 @@ import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
import org.apache.druid.segment.index.semantic.DruidPredicateIndexes; import org.apache.druid.segment.index.semantic.DruidPredicateIndexes;
import org.apache.druid.segment.index.semantic.NullValueIndex; import org.apache.druid.segment.index.semantic.NullValueIndex;
import org.apache.druid.segment.index.semantic.StringValueSetIndexes; import org.apache.druid.segment.index.semantic.StringValueSetIndexes;
import org.apache.druid.segment.index.semantic.ValueIndexes;
import org.apache.druid.segment.vector.NoFilterVectorOffset; import org.apache.druid.segment.vector.NoFilterVectorOffset;
import org.apache.druid.segment.vector.VectorValueSelector; import org.apache.druid.segment.vector.VectorValueSelector;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
@ -248,6 +249,7 @@ public class ScalarLongColumnSupplierTest extends InitializedNullHandlingTest
ColumnValueSelector<?> valueSelector = column.makeColumnValueSelector(offset); ColumnValueSelector<?> valueSelector = column.makeColumnValueSelector(offset);
VectorValueSelector vectorValueSelector = column.makeVectorValueSelector(vectorOffset); VectorValueSelector vectorValueSelector = column.makeVectorValueSelector(vectorOffset);
ValueIndexes valueIndexes = supplier.as(ValueIndexes.class);
StringValueSetIndexes valueSetIndex = supplier.as(StringValueSetIndexes.class); StringValueSetIndexes valueSetIndex = supplier.as(StringValueSetIndexes.class);
DruidPredicateIndexes predicateIndex = supplier.as(DruidPredicateIndexes.class); DruidPredicateIndexes predicateIndex = supplier.as(DruidPredicateIndexes.class);
NullValueIndex nullValueIndex = supplier.as(NullValueIndex.class); NullValueIndex nullValueIndex = supplier.as(NullValueIndex.class);
@ -279,6 +281,7 @@ public class ScalarLongColumnSupplierTest extends InitializedNullHandlingTest
} }
Assert.assertTrue(valueSetIndex.forValue(String.valueOf(row)).computeBitmapResult(resultFactory).get(i)); Assert.assertTrue(valueSetIndex.forValue(String.valueOf(row)).computeBitmapResult(resultFactory).get(i));
Assert.assertTrue(valueIndexes.forValue(row, ColumnType.LONG).computeBitmapResult(resultFactory).get(i));
Assert.assertTrue(valueSetIndex.forSortedValues(new TreeSet<>(ImmutableSet.of(String.valueOf(row)))) Assert.assertTrue(valueSetIndex.forSortedValues(new TreeSet<>(ImmutableSet.of(String.valueOf(row))))
.computeBitmapResult(resultFactory) .computeBitmapResult(resultFactory)
.get(i)); .get(i));

View File

@ -49,6 +49,7 @@ import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
import org.apache.druid.segment.index.semantic.DruidPredicateIndexes; import org.apache.druid.segment.index.semantic.DruidPredicateIndexes;
import org.apache.druid.segment.index.semantic.NullValueIndex; import org.apache.druid.segment.index.semantic.NullValueIndex;
import org.apache.druid.segment.index.semantic.StringValueSetIndexes; import org.apache.druid.segment.index.semantic.StringValueSetIndexes;
import org.apache.druid.segment.index.semantic.ValueIndexes;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
import org.apache.druid.testing.InitializedNullHandlingTest; import org.apache.druid.testing.InitializedNullHandlingTest;
@ -248,6 +249,7 @@ public class ScalarStringColumnSupplierTest extends InitializedNullHandlingTest
ColumnValueSelector<?> valueSelector = column.makeColumnValueSelector(offset); ColumnValueSelector<?> valueSelector = column.makeColumnValueSelector(offset);
DimensionSelector dimSelector = column.makeDimensionSelector(offset, null); DimensionSelector dimSelector = column.makeDimensionSelector(offset, null);
ValueIndexes valueIndexes = supplier.as(ValueIndexes.class);
StringValueSetIndexes valueSetIndex = supplier.as(StringValueSetIndexes.class); StringValueSetIndexes valueSetIndex = supplier.as(StringValueSetIndexes.class);
DruidPredicateIndexes predicateIndex = supplier.as(DruidPredicateIndexes.class); DruidPredicateIndexes predicateIndex = supplier.as(DruidPredicateIndexes.class);
NullValueIndex nullValueIndex = supplier.as(NullValueIndex.class); NullValueIndex nullValueIndex = supplier.as(NullValueIndex.class);
@ -270,6 +272,7 @@ public class ScalarStringColumnSupplierTest extends InitializedNullHandlingTest
Assert.assertEquals(dimSelector.idLookup().lookupId(dimSelectorLookupVal), dimSelector.getRow().get(0)); Assert.assertEquals(dimSelector.idLookup().lookupId(dimSelectorLookupVal), dimSelector.getRow().get(0));
Assert.assertTrue(valueSetIndex.forValue(row).computeBitmapResult(resultFactory).get(i)); Assert.assertTrue(valueSetIndex.forValue(row).computeBitmapResult(resultFactory).get(i));
Assert.assertTrue(valueIndexes.forValue(row, ColumnType.STRING).computeBitmapResult(resultFactory).get(i));
Assert.assertTrue(valueSetIndex.forSortedValues(new TreeSet<>(ImmutableSet.of(row))) Assert.assertTrue(valueSetIndex.forSortedValues(new TreeSet<>(ImmutableSet.of(row)))
.computeBitmapResult(resultFactory) .computeBitmapResult(resultFactory)
.get(i)); .get(i));

View File

@ -47,9 +47,11 @@ import org.apache.druid.segment.data.BitmapSerdeFactory;
import org.apache.druid.segment.data.CompressionFactory; import org.apache.druid.segment.data.CompressionFactory;
import org.apache.druid.segment.data.FrontCodedIndexed; import org.apache.druid.segment.data.FrontCodedIndexed;
import org.apache.druid.segment.data.RoaringBitmapSerdeFactory; import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
import org.apache.druid.segment.index.semantic.ArrayElementIndexes;
import org.apache.druid.segment.index.semantic.DruidPredicateIndexes; import org.apache.druid.segment.index.semantic.DruidPredicateIndexes;
import org.apache.druid.segment.index.semantic.NullValueIndex; import org.apache.druid.segment.index.semantic.NullValueIndex;
import org.apache.druid.segment.index.semantic.StringValueSetIndexes; import org.apache.druid.segment.index.semantic.StringValueSetIndexes;
import org.apache.druid.segment.index.semantic.ValueIndexes;
import org.apache.druid.segment.vector.NoFilterVectorOffset; import org.apache.druid.segment.vector.NoFilterVectorOffset;
import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector; import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
import org.apache.druid.segment.vector.VectorObjectSelector; import org.apache.druid.segment.vector.VectorObjectSelector;
@ -377,6 +379,15 @@ public class VariantColumnSupplierTest extends InitializedNullHandlingTest
Assert.assertNull(predicateIndex); Assert.assertNull(predicateIndex);
NullValueIndex nullValueIndex = supplier.as(NullValueIndex.class); NullValueIndex nullValueIndex = supplier.as(NullValueIndex.class);
Assert.assertNotNull(nullValueIndex); Assert.assertNotNull(nullValueIndex);
ValueIndexes valueIndexes = supplier.as(ValueIndexes.class);
ArrayElementIndexes arrayElementIndexes = supplier.as(ArrayElementIndexes.class);
if (expectedType.getSingleType() != null && expectedType.getSingleType().isArray()) {
Assert.assertNotNull(valueIndexes);
Assert.assertNotNull(arrayElementIndexes);
} else {
Assert.assertNull(valueIndexes);
Assert.assertNull(arrayElementIndexes);
}
SortedMap<String, FieldTypeInfo.MutableTypeSet> fields = column.getFieldTypeInfo(); SortedMap<String, FieldTypeInfo.MutableTypeSet> fields = column.getFieldTypeInfo();
Assert.assertEquals(1, fields.size()); Assert.assertEquals(1, fields.size());
@ -397,6 +408,10 @@ public class VariantColumnSupplierTest extends InitializedNullHandlingTest
Assert.assertArrayEquals(((List) row).toArray(), (Object[]) valueSelector.getObject()); Assert.assertArrayEquals(((List) row).toArray(), (Object[]) valueSelector.getObject());
if (expectedType.getSingleType() != null) { if (expectedType.getSingleType() != null) {
Assert.assertArrayEquals(((List) row).toArray(), (Object[]) vectorObjectSelector.getObjectVector()[0]); Assert.assertArrayEquals(((List) row).toArray(), (Object[]) vectorObjectSelector.getObjectVector()[0]);
Assert.assertTrue(valueIndexes.forValue(row, expectedType.getSingleType()).computeBitmapResult(resultFactory).get(i));
for (Object o : ((List) row)) {
Assert.assertTrue("Failed on row: " + row, arrayElementIndexes.containsValue(o, expectedType.getSingleType().getElementType()).computeBitmapResult(resultFactory).get(i));
}
} else { } else {
// mixed type vector object selector coerces to the most common type // mixed type vector object selector coerces to the most common type
Assert.assertArrayEquals(ExprEval.ofType(expressionType, row).asArray(), (Object[]) vectorObjectSelector.getObjectVector()[0]); Assert.assertArrayEquals(ExprEval.ofType(expressionType, row).asArray(), (Object[]) vectorObjectSelector.getObjectVector()[0]);
@ -440,6 +455,9 @@ public class VariantColumnSupplierTest extends InitializedNullHandlingTest
} }
} }
Assert.assertTrue(nullValueIndex.get().computeBitmapResult(resultFactory).get(i)); Assert.assertTrue(nullValueIndex.get().computeBitmapResult(resultFactory).get(i));
if (expectedType.getSingleType() != null) {
Assert.assertFalse(arrayElementIndexes.containsValue(null, expectedType.getSingleType()).computeBitmapResult(resultFactory).get(i));
}
} }
offset.increment(); offset.increment();