mirror of https://github.com/apache/druid.git
improve nested column serializer (#13051)

Changes:
* long and double value columns are now written directly, at the same time as writing out the 'intermediary' dictionary id column with unsorted ids
* remove the reverse value lookup from GlobalDictionaryIdLookup, since it is no longer needed
This commit is contained in:
parent d57557d51d
commit 6438f4198d
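In short, each field writer now exposes a no-op writeValue() hook that serialize() calls while it records the unsorted local dictionary id, and the long and double writers override it to append to their value column immediately; that is what lets the reverse id-to-value maps in GlobalDictionaryIdLookup go away. A minimal, self-contained sketch of that shape, with all class and field names as illustrative stand-ins rather than the real Druid types:

import java.util.ArrayList;
import java.util.List;

// Sketch of the write-path change: values go to their value column in the same
// pass that records the unsorted local id. All names here are stand-ins.
abstract class SketchFieldWriter<T>
{
  final List<Integer> intermediateIds = new ArrayList<>(); // stand-in for the intermediary id writer

  void writeValue(T value)
  {
    // no-op unless the writer has a value column; numeric writers override this
  }

  abstract int lookupGlobalId(T value);

  void serialize(T value)
  {
    final int localId = lookupGlobalId(value);   // simplified: no local dictionary in this sketch
    intermediateIds.add(localId);                // unsorted id, remapped to sorted ids later
    writeValue(value);                           // value column written directly, in the same pass
  }
}

class SketchLongFieldWriter extends SketchFieldWriter<Long>
{
  final List<Long> longColumn = new ArrayList<>(); // stand-in for the long column serializer

  @Override
  int lookupGlobalId(Long value)
  {
    return value == null ? 0 : 1 + value.intValue(); // placeholder id scheme; id 0 stands for null
  }

  @Override
  void writeValue(Long value)
  {
    longColumn.add(value == null ? 0L : value);      // null rows store a 0L placeholder, as in the diff
  }
}

With this shape, the later writeTo() pass only needs the unsorted-to-sorted id remapping; it never has to look a value back up by its dictionary id.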
CompressedBlockReader.java
@@ -36,7 +36,7 @@ import java.util.function.Supplier;
  * Reader for a virtual contiguous address range backed by compressed blocks of data.
  *
  * Format:
- * | version (byte) | compression (byte) | num blocks (int) | block size (int) | end offsets | compressed data |
+ * | version (byte) | compression (byte) | block size (int) | num blocks (int) | end offsets | compressed data |
  *
  * This mechanism supports two modes of use, the first where callers may ask for a range of data from the underlying
  * blocks, provided by {@link #getRange(long, int)}. The {@link ByteBuffer} provided by this method may or may not
@@ -48,6 +48,8 @@ import java.util.function.Supplier;
  * {@link #seekBlock(int)} to change which block is currently loaded.
+ *
+ * {@link #getRange(long, int)} uses these same mechanisms internally to supply data.
  *
  * @see CompressedBlockSerializer for writer
  */
 public final class CompressedBlockReader implements Closeable
 {
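The javadoc above describes a virtual contiguous address range backed by fixed-size compressed blocks. One way to picture how a flat offset lands in that layout is plain block arithmetic; this is an illustration of the idea only, not CompressedBlockReader internals, and the 64KB block size is just an example:

// Arithmetic behind mapping a flat offset onto fixed-size blocks; illustration only.
final class BlockAddressSketch
{
  static int blockNumber(long offset, int blockSize)
  {
    return (int) (offset / blockSize);   // which compressed block holds this offset
  }

  static int offsetInBlock(long offset, int blockSize)
  {
    return (int) (offset % blockSize);   // position within the decompressed block
  }

  public static void main(String[] args)
  {
    final int blockSize = 1 << 16;       // 64KB, an arbitrary example block size
    final long offset = 200_000L;
    // prints "block 3, offset 3392"; a read that crosses the block end continues in block 4
    System.out.println("block " + blockNumber(offset, blockSize) + ", offset " + offsetInBlock(offset, blockSize));
  }
}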
GlobalDictionaryIdLookup.java
@@ -22,10 +22,6 @@ package org.apache.druid.segment.nested;
 import com.google.common.base.Preconditions;
 import it.unimi.dsi.fastutil.doubles.Double2IntLinkedOpenHashMap;
 import it.unimi.dsi.fastutil.doubles.Double2IntMap;
-import it.unimi.dsi.fastutil.ints.Int2DoubleLinkedOpenHashMap;
-import it.unimi.dsi.fastutil.ints.Int2DoubleMap;
-import it.unimi.dsi.fastutil.ints.Int2LongLinkedOpenHashMap;
-import it.unimi.dsi.fastutil.ints.Int2LongMap;
 import it.unimi.dsi.fastutil.longs.Long2IntLinkedOpenHashMap;
 import it.unimi.dsi.fastutil.longs.Long2IntMap;
 import it.unimi.dsi.fastutil.objects.Object2IntLinkedOpenHashMap;
@@ -35,18 +31,15 @@ import javax.annotation.Nullable;

 /**
  * Ingestion time dictionary identifier lookup, used by {@link NestedDataColumnSerializer} to build a global dictionary
- * id to value mapping for the 'stacked' global value dictionaries. Also provides reverse lookup for numeric values,
- * since they serialize a value column
+ * id to value mapping for the 'stacked' global value dictionaries.
  */
 public class GlobalDictionaryIdLookup
 {
   private final Object2IntMap<String> stringLookup;

   private final Long2IntMap longLookup;
-  private final Int2LongMap reverseLongLookup;

   private final Double2IntMap doubleLookup;
-  private final Int2DoubleMap reverseDoubleLookup;

   private int dictionarySize;

@@ -54,9 +47,7 @@ public class GlobalDictionaryIdLookup
   {
     this.stringLookup = new Object2IntLinkedOpenHashMap<>();
     this.longLookup = new Long2IntLinkedOpenHashMap();
-    this.reverseLongLookup = new Int2LongLinkedOpenHashMap();
     this.doubleLookup = new Double2IntLinkedOpenHashMap();
-    this.reverseDoubleLookup = new Int2DoubleLinkedOpenHashMap();
   }

   public void addString(@Nullable String value)
@@ -82,7 +73,6 @@ public class GlobalDictionaryIdLookup
     );
     int id = dictionarySize++;
     longLookup.put(value, id);
-    reverseLongLookup.put(id, value);
   }

   public int lookupLong(@Nullable Long value)
@@ -93,20 +83,10 @@ public class GlobalDictionaryIdLookup
     return longLookup.get(value.longValue());
   }

-  @Nullable
-  public Long lookupLong(int id)
-  {
-    if (id == 0) {
-      return null;
-    }
-    return reverseLongLookup.get(id);
-  }
-
   public void addDouble(double value)
   {
     int id = dictionarySize++;
     doubleLookup.put(value, id);
-    reverseDoubleLookup.put(id, value);
   }

   public int lookupDouble(@Nullable Double value)
@@ -116,13 +96,4 @@ public class GlobalDictionaryIdLookup
     }
     return doubleLookup.get(value.doubleValue());
   }
-
-  @Nullable
-  public Double lookupDouble(int id)
-  {
-    if (id == 0) {
-      return null;
-    }
-    return reverseDoubleLookup.get(id);
-  }
 }
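After this change GlobalDictionaryIdLookup only maps values to ids. A toy illustration of the 'stacked' id assignment it performs, using plain JDK maps instead of the fastutil primitive maps in the real class; id 0 stands for the null entry, matching the removed reverse lookups above:

import java.util.HashMap;
import java.util.Map;

// Toy 'stacked' global dictionary: one shared counter hands out ids across the
// string, long, and double sections, so ids from the three sections never collide.
// Simplified; not the real fastutil-backed class.
final class StackedDictionarySketch
{
  private final Map<String, Integer> stringLookup = new HashMap<>();
  private final Map<Long, Integer> longLookup = new HashMap<>();
  private final Map<Double, Integer> doubleLookup = new HashMap<>();
  private int dictionarySize;   // next id to hand out; id 0 is conventionally the null entry

  void addString(String value)
  {
    stringLookup.put(value, dictionarySize++);
  }

  void addLong(long value)
  {
    longLookup.put(value, dictionarySize++);
  }

  void addDouble(double value)
  {
    doubleLookup.put(value, dictionarySize++);
  }

  int lookupLong(Long value)
  {
    return value == null ? 0 : longLookup.get(value);
  }
}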
NestedDataColumnSerializer.java
@@ -63,6 +63,7 @@ import org.apache.druid.segment.serde.DictionaryEncodedColumnPartSerde;
 import org.apache.druid.segment.serde.Serializer;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;

+import javax.annotation.Nullable;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -196,7 +197,7 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
       } else {
         writer = new VariantLiteralFieldColumnWriter();
       }
-      writer.open();
+      writer.open(field.getKey());
       fieldWriters.put(field.getKey(), writer);
     }
   }
@@ -376,6 +377,12 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
     {
       return (T) value;
     }
+
+    void writeValue(@Nullable T value) throws IOException
+    {
+      // do nothing, if a value column is present this method should be overridden to write the value to the serializer
+    }
+
     abstract int lookupGlobalId(T value);

     abstract void writeColumnTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException;
@@ -398,17 +405,12 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
       encodedValueSerializer.open();
     }

-    void serializeRow(int globalId, int localId) throws IOException
-    {
-      encodedValueSerializer.addValue(localId);
-    }
-
     long getSerializedColumnSize() throws IOException
     {
       return Integer.BYTES + Integer.BYTES + encodedValueSerializer.getSerializedSize();
     }

-    public void open() throws IOException
+    public void open(String field) throws IOException
     {
       intermediateValueWriter = new FixedIndexedIntWriter(segmentWriteOutMedium, false);
       intermediateValueWriter.open();
@@ -435,6 +437,7 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
       final int globalId = lookupGlobalId(value);
       final int localId = localDictionary.add(globalId);
       intermediateValueWriter.write(localId);
+      writeValue(value);
     }

     public void writeTo(String field, FileSmoosher smoosher) throws IOException
@@ -472,10 +475,8 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
       int rowCount = 0;
       while (rows.hasNext()) {
         final int unsortedLocalId = rows.nextInt();
-        final int globalId = unsortedToGlobal[unsortedLocalId];
         final int sortedLocalId = unsortedToSorted[unsortedLocalId];
-
-        serializeRow(globalId, sortedLocalId);
+        encodedValueSerializer.addValue(sortedLocalId);
         bitmaps[sortedLocalId].add(rowCount++);
       }

@@ -551,12 +552,12 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
     }

     @Override
-    void openColumnSerializer(String field, SegmentWriteOutMedium medium, int maxId) throws IOException
+    public void open(String field) throws IOException
     {
-      super.openColumnSerializer(field, medium, maxId);
+      super.open(field);
       longsSerializer = CompressionFactory.getLongSerializer(
           field,
-          medium,
+          segmentWriteOutMedium,
           StringUtils.format("%s.long_column", name),
           ByteOrder.nativeOrder(),
           indexSpec.getLongEncoding(),
@@ -566,14 +567,12 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
     }

     @Override
-    void serializeRow(int globalId, int localId) throws IOException
+    void writeValue(@Nullable Long value) throws IOException
     {
-      super.serializeRow(globalId, localId);
-      Long l = globalDictionaryIdLookup.lookupLong(globalId);
-      if (l == null) {
+      if (value == null) {
         longsSerializer.add(0L);
       } else {
-        longsSerializer.add(l);
+        longsSerializer.add(value);
       }
     }

@@ -603,12 +602,12 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
     }

     @Override
-    void openColumnSerializer(String field, SegmentWriteOutMedium medium, int maxId) throws IOException
+    public void open(String field) throws IOException
     {
-      super.openColumnSerializer(field, medium, maxId);
+      super.open(field);
       doublesSerializer = CompressionFactory.getDoubleSerializer(
           field,
-          medium,
+          segmentWriteOutMedium,
           StringUtils.format("%s.double_column", name),
           ByteOrder.nativeOrder(),
           indexSpec.getDimensionCompression()
@@ -617,14 +616,12 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
     }

     @Override
-    void serializeRow(int globalId, int localId) throws IOException
+    void writeValue(@Nullable Double value) throws IOException
     {
-      super.serializeRow(globalId, localId);
-      Double d = globalDictionaryIdLookup.lookupDouble(globalId);
-      if (d == null) {
+      if (value == null) {
         doublesSerializer.add(0.0);
       } else {
-        doublesSerializer.add(d);
+        doublesSerializer.add(value);
       }
     }

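The writeTo() loop above is the part that still needs the buffered unsorted ids: it replays them, remaps each to its sorted local id, writes the dictionary-id column, and marks the row in the per-value bitmap. A simplified stand-alone version of that remap pass, with types as stand-ins for the real serializer and bitmap writers:

import java.util.BitSet;
import java.util.List;

// Simplified second pass: replay unsorted local ids, remap to sorted ids,
// and build one row bitmap per dictionary entry. Types are stand-ins.
final class SortedIdRemapSketch
{
  static void remap(int[] unsortedLocalIds, int[] unsortedToSorted, List<Integer> encodedColumn, BitSet[] bitmaps)
  {
    int rowCount = 0;
    for (int unsortedLocalId : unsortedLocalIds) {
      final int sortedLocalId = unsortedToSorted[unsortedLocalId];
      encodedColumn.add(sortedLocalId);         // dictionary-id column in sorted-id space
      bitmaps[sortedLocalId].set(rowCount++);   // row index for this value's bitmap index
    }
  }
}

Because writeValue() already wrote the raw longs and doubles during ingest, this pass no longer calls back into the global dictionary.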