From 23b78c0f955860d51add780178ef8a4526b334db Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Tue, 12 Sep 2023 21:01:18 -0700 Subject: [PATCH] use mmap for nested column value to dictionary id lookup for more chill heap usage during serialization (#14919) --- .../druid/segment/AutoTypeColumnMerger.java | 3 +- .../segment/NestedDataColumnMergerV4.java | 13 +- .../column/StringEncodingStrategies.java | 37 ++ .../segment/data/FixedIndexedWriter.java | 12 +- .../segment/data/FrontCodedIndexedWriter.java | 1 + .../segment/data/GenericIndexedWriter.java | 12 +- .../segment/nested/DictionaryIdLookup.java | 352 ++++++++++++++---- .../NestedCommonFormatColumnSerializer.java | 8 + .../nested/NestedDataColumnSerializer.java | 45 ++- .../nested/NestedDataColumnSerializerV4.java | 15 +- .../nested/NestedDataColumnSupplier.java | 73 +--- .../nested/NestedDataColumnSupplierV4.java | 112 +----- .../nested/ScalarDoubleColumnSerializer.java | 24 +- .../nested/ScalarLongColumnSerializer.java | 24 +- ...larNestedCommonFormatColumnSerializer.java | 8 +- .../ScalarStringColumnAndIndexSupplier.java | 43 +-- .../nested/ScalarStringColumnSerializer.java | 27 +- .../nested/VariantColumnAndIndexSupplier.java | 51 +-- .../nested/VariantColumnSerializer.java | 44 ++- .../DictionaryEncodedColumnPartSerde.java | 40 +- .../apache/druid/query/DoubleStorageTest.java | 3 +- .../druid/segment/data/FixedIndexedTest.java | 29 ++ 22 files changed, 571 insertions(+), 405 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnMerger.java b/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnMerger.java index 7aadc5cd53c..7c978f63fec 100644 --- a/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnMerger.java +++ b/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnMerger.java @@ -228,13 +228,12 @@ public class AutoTypeColumnMerger implements DimensionMergerV9 } else { // all the bells and whistles logicalType = ColumnType.NESTED_DATA; - final NestedDataColumnSerializer defaultSerializer = new NestedDataColumnSerializer( + serializer = new NestedDataColumnSerializer( name, indexSpec, segmentWriteOutMedium, closer ); - serializer = defaultSerializer; } serializer.openDictionaryWriter(); diff --git a/processing/src/main/java/org/apache/druid/segment/NestedDataColumnMergerV4.java b/processing/src/main/java/org/apache/druid/segment/NestedDataColumnMergerV4.java index 9a0dc139fbf..6006dbdab2e 100644 --- a/processing/src/main/java/org/apache/druid/segment/NestedDataColumnMergerV4.java +++ b/processing/src/main/java/org/apache/druid/segment/NestedDataColumnMergerV4.java @@ -57,7 +57,7 @@ public class NestedDataColumnMergerV4 implements DimensionMergerV9 private final Closer closer; private ColumnDescriptor.Builder descriptorBuilder; - private GenericColumnSerializer serializer; + private NestedDataColumnSerializerV4 serializer; public NestedDataColumnMergerV4( String name, @@ -111,13 +111,12 @@ public class NestedDataColumnMergerV4 implements DimensionMergerV9 descriptorBuilder = new ColumnDescriptor.Builder(); - final NestedDataColumnSerializerV4 defaultSerializer = new NestedDataColumnSerializerV4( + serializer = new NestedDataColumnSerializerV4( name, indexSpec, segmentWriteOutMedium, closer ); - serializer = defaultSerializer; final ComplexColumnPartSerde partSerde = ComplexColumnPartSerde.serializerBuilder() .withTypeName(NestedDataComplexTypeSerde.TYPE_NAME) @@ -127,14 +126,14 @@ public class NestedDataColumnMergerV4 implements DimensionMergerV9 
                     .setHasMultipleValues(false)
                     .addSerde(partSerde);
 
-    defaultSerializer.open();
-    defaultSerializer.serializeFields(mergedFields);
+    serializer.open();
+    serializer.serializeFields(mergedFields);
 
     int stringCardinality;
     int longCardinality;
     int doubleCardinality;
     if (numMergeIndex == 1) {
-      defaultSerializer.serializeDictionaries(
+      serializer.serializeDictionaries(
           sortedLookup.getSortedStrings(),
           sortedLookup.getSortedLongs(),
           sortedLookup.getSortedDoubles()
@@ -155,7 +154,7 @@ public class NestedDataColumnMergerV4 implements DimensionMergerV9
           sortedDoubleLookups,
           DOUBLE_MERGING_COMPARATOR
       );
-      defaultSerializer.serializeDictionaries(
+      serializer.serializeDictionaries(
           () -> stringIterator,
           () -> longIterator,
           () -> doubleIterator
diff --git a/processing/src/main/java/org/apache/druid/segment/column/StringEncodingStrategies.java b/processing/src/main/java/org/apache/druid/segment/column/StringEncodingStrategies.java
index c4a9b0661a9..bf95f1ed70c 100644
--- a/processing/src/main/java/org/apache/druid/segment/column/StringEncodingStrategies.java
+++ b/processing/src/main/java/org/apache/druid/segment/column/StringEncodingStrategies.java
@@ -19,12 +19,15 @@
 package org.apache.druid.segment.column;
 
+import com.google.common.base.Supplier;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
 import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.data.DictionaryWriter;
 import org.apache.druid.segment.data.EncodedStringDictionaryWriter;
+import org.apache.druid.segment.data.FrontCodedIndexed;
 import org.apache.druid.segment.data.FrontCodedIndexedWriter;
 import org.apache.druid.segment.data.GenericIndexed;
 import org.apache.druid.segment.data.GenericIndexedWriter;
@@ -33,6 +36,7 @@ import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 
 import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.util.Iterator;
 
 public class StringEncodingStrategies
@@ -67,6 +71,39 @@ public class StringEncodingStrategies
     }
   }
 
+  public static Supplier<Indexed<ByteBuffer>> getStringDictionarySupplier(
+      SmooshedFileMapper mapper,
+      ByteBuffer stringDictionaryBuffer,
+      ByteOrder byteOrder
+  )
+  {
+    final int dictionaryStartPosition = stringDictionaryBuffer.position();
+    final byte dictionaryVersion = stringDictionaryBuffer.get();
+
+    if (dictionaryVersion == EncodedStringDictionaryWriter.VERSION) {
+      final byte encodingId = stringDictionaryBuffer.get();
+      if (encodingId == StringEncodingStrategy.FRONT_CODED_ID) {
+        return FrontCodedIndexed.read(
+            stringDictionaryBuffer,
+            byteOrder
+        );
+      } else if (encodingId == StringEncodingStrategy.UTF8_ID) {
+        // this cannot happen naturally right now since generic indexed is written in the 'legacy' format, but
+        // this provides backwards compatibility should we switch at some point in the future to always
+        // writing dictionaryVersion
+        return GenericIndexed.read(stringDictionaryBuffer, GenericIndexed.UTF8_STRATEGY, mapper)::singleThreaded;
+      } else {
+        throw new ISE("impossible, unknown encoding strategy id: %s", encodingId);
+      }
+    } else {
+      // legacy format that only supports plain utf8 encoding stored in GenericIndexed and the byte we are reading
+      // as dictionaryVersion is actually also the GenericIndexed version, so we reset start position so the
+      // GenericIndexed version can be correctly read
+
stringDictionaryBuffer.position(dictionaryStartPosition); + return GenericIndexed.read(stringDictionaryBuffer, GenericIndexed.UTF8_STRATEGY, mapper)::singleThreaded; + } + } + /** * Adapter to convert {@link Indexed} with utf8 encoded bytes into {@link Indexed} to be friendly * to consumers. diff --git a/processing/src/main/java/org/apache/druid/segment/data/FixedIndexedWriter.java b/processing/src/main/java/org/apache/druid/segment/data/FixedIndexedWriter.java index 01468566574..b1b473b3419 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/FixedIndexedWriter.java +++ b/processing/src/main/java/org/apache/druid/segment/data/FixedIndexedWriter.java @@ -155,7 +155,7 @@ public class FixedIndexedWriter implements DictionaryWriter if (index == 0 && hasNulls) { return null; } - int startOffset = index * width; + int startOffset = (hasNulls ? index - 1 : index) * width; readBuffer.clear(); valuesOut.readFully(startOffset, readBuffer); readBuffer.clear(); @@ -197,14 +197,14 @@ public class FixedIndexedWriter implements DictionaryWriter { iteratorBuffer.clear(); try { - if (totalCount - pos < PAGE_SIZE) { - int size = (totalCount - pos) * width; + if (numWritten - (pos - startPos) < PAGE_SIZE) { + int size = (numWritten - (pos - startPos)) * width; iteratorBuffer.limit(size); - valuesOut.readFully((long) pos * width, iteratorBuffer); + valuesOut.readFully((long) (pos - startPos) * width, iteratorBuffer); } else { - valuesOut.readFully((long) pos * width, iteratorBuffer); + valuesOut.readFully((long) (pos - startPos) * width, iteratorBuffer); } - iteratorBuffer.flip(); + iteratorBuffer.clear(); } catch (IOException e) { throw new RuntimeException(e); diff --git a/processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexedWriter.java b/processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexedWriter.java index efdf7336d16..c24d2e55d71 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexedWriter.java +++ b/processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexedWriter.java @@ -234,6 +234,7 @@ public class FrontCodedIndexedWriter implements DictionaryWriter { getOffsetBuffer.clear(); headerOut.readFully(index * (long) Integer.BYTES, getOffsetBuffer); + getOffsetBuffer.clear(); return getOffsetBuffer.getInt(0); } diff --git a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java index 394fccba06b..a69d645be39 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java +++ b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java @@ -24,6 +24,7 @@ import com.google.common.base.Preconditions; import com.google.common.primitives.Ints; import it.unimi.dsi.fastutil.longs.LongArrayList; import it.unimi.dsi.fastutil.longs.LongList; +import org.apache.druid.common.config.NullHandling; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; @@ -293,7 +294,16 @@ public class GenericIndexedWriter implements DictionaryWriter long endOffset = getOffset(index); int valueSize = checkedCastNonnegativeLongToInt(endOffset - startOffset); if (valueSize == 0) { - return null; + if (NullHandling.replaceWithDefault()) { + return null; + } + ByteBuffer bb = ByteBuffer.allocate(Integer.BYTES); + valuesOut.readFully(startOffset - Integer.BYTES, bb); + bb.flip(); + if (bb.getInt() < 
0) {
+      return null;
+    }
+    return strategy.fromByteBuffer(bb, 0);
     }
     ByteBuffer bb = ByteBuffer.allocate(valueSize);
     valuesOut.readFully(startOffset, bb);
diff --git a/processing/src/main/java/org/apache/druid/segment/nested/DictionaryIdLookup.java b/processing/src/main/java/org/apache/druid/segment/nested/DictionaryIdLookup.java
index dc6a95e7006..a4fd0907066 100644
--- a/processing/src/main/java/org/apache/druid/segment/nested/DictionaryIdLookup.java
+++ b/processing/src/main/java/org/apache/druid/segment/nested/DictionaryIdLookup.java
@@ -19,114 +19,308 @@
 package org.apache.druid.segment.nested;
 
-import com.google.common.base.Preconditions;
-import it.unimi.dsi.fastutil.doubles.Double2IntLinkedOpenHashMap;
-import it.unimi.dsi.fastutil.doubles.Double2IntMap;
-import it.unimi.dsi.fastutil.longs.Long2IntLinkedOpenHashMap;
-import it.unimi.dsi.fastutil.longs.Long2IntMap;
-import it.unimi.dsi.fastutil.objects.Object2IntAVLTreeMap;
-import it.unimi.dsi.fastutil.objects.Object2IntLinkedOpenHashMap;
-import it.unimi.dsi.fastutil.objects.Object2IntMap;
-import org.apache.druid.segment.data.FrontCodedIntArrayIndexedWriter;
+import com.google.common.primitives.Ints;
+import org.apache.druid.annotations.SuppressFBWarnings;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.ByteBufferUtils;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
+import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter;
+import org.apache.druid.segment.column.StringEncodingStrategies;
+import org.apache.druid.segment.column.TypeStrategies;
+import org.apache.druid.segment.data.DictionaryWriter;
+import org.apache.druid.segment.data.FixedIndexed;
+import org.apache.druid.segment.data.FrontCodedIntArrayIndexed;
+import org.apache.druid.segment.data.Indexed;
 
 import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.GatheringByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.EnumSet;
 
 /**
- * Ingestion time dictionary identifier lookup, used by {@link NestedCommonFormatColumnSerializer} to build a global
- * dictionary id to value mapping for the 'stacked' global value dictionaries.
+ * Value to dictionary id lookup, backed with memory mapped dictionaries populated lazily by the supplied
+ * {@link DictionaryWriter}.
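+ * <p>
+ * Dictionaries are mapped lazily, on the first lookup of each type: the corresponding writer is spilled to a
+ * temporary file (strings go through a {@link FileSmoosher}, since a GenericIndexed dictionary larger than 2gb
+ * spans multiple internal files), the file is memory mapped, and ids are resolved with indexOf against the mapped
+ * dictionary instead of a heap resident map. Ids are 'stacked' into a single global space: long ids are offset by
+ * the string cardinality, double ids by strings + longs, and array ids by strings + longs + doubles.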
 */
-public class DictionaryIdLookup
+public final class DictionaryIdLookup implements Closeable
 {
-  private final Object2IntMap<String> stringLookup;
+  private final String name;
+  @Nullable
+  private final DictionaryWriter<String> stringDictionaryWriter;
+  private SmooshedFileMapper stringBufferMapper = null;
+  private Indexed<ByteBuffer> stringDictionary = null;
 
-  private final Long2IntMap longLookup;
+  @Nullable
+  private final DictionaryWriter<Long> longDictionaryWriter;
+  private MappedByteBuffer longBuffer = null;
+  private FixedIndexed<Long> longDictionary = null;
 
-  private final Double2IntMap doubleLookup;
+  @Nullable
+  private final DictionaryWriter<Double> doubleDictionaryWriter;
+  MappedByteBuffer doubleBuffer = null;
+  FixedIndexed<Double> doubleDictionary = null;
 
-  private final Object2IntMap<int[]> arrayLookup;
+  @Nullable
+  private final DictionaryWriter<int[]> arrayDictionaryWriter;
+  private MappedByteBuffer arrayBuffer = null;
+  private FrontCodedIntArrayIndexed arrayDictionary = null;
 
-  private int dictionarySize;
-
-  public DictionaryIdLookup()
+  public DictionaryIdLookup(
+      String name,
+      @Nullable DictionaryWriter<String> stringDictionaryWriter,
+      @Nullable DictionaryWriter<Long> longDictionaryWriter,
+      @Nullable DictionaryWriter<Double> doubleDictionaryWriter,
+      @Nullable DictionaryWriter<int[]> arrayDictionaryWriter
+  )
   {
-    this.stringLookup = new Object2IntLinkedOpenHashMap<>();
-    stringLookup.defaultReturnValue(-1);
-    this.longLookup = new Long2IntLinkedOpenHashMap();
-    longLookup.defaultReturnValue(-1);
-    this.doubleLookup = new Double2IntLinkedOpenHashMap();
-    doubleLookup.defaultReturnValue(-1);
-    this.arrayLookup = new Object2IntAVLTreeMap<>(FrontCodedIntArrayIndexedWriter.ARRAY_COMPARATOR);
-    this.arrayLookup.defaultReturnValue(-1);
-  }
-
-  public void addString(@Nullable String value)
-  {
-    Preconditions.checkState(
-        longLookup.size() == 0 && doubleLookup.size() == 0,
-        "All string values must be inserted to the lookup before long and double types"
-    );
-    int id = dictionarySize++;
-    stringLookup.put(value, id);
-  }
-
-  // used when there are no string values to ensure that 0 is used for the null value
-  public void addNumericNull()
-  {
-    Preconditions.checkState(
-        stringLookup.size() == 0 && longLookup.size() == 0 && doubleLookup.size() == 0,
-        "Lookup must be empty to add implicit null"
-    );
-    dictionarySize++;
+    this.name = name;
+    this.stringDictionaryWriter = stringDictionaryWriter;
+    this.longDictionaryWriter = longDictionaryWriter;
+    this.doubleDictionaryWriter = doubleDictionaryWriter;
+    this.arrayDictionaryWriter = arrayDictionaryWriter;
   }
 
   public int lookupString(@Nullable String value)
   {
-    return stringLookup.getInt(value);
-  }
-
-  public void addLong(long value)
-  {
-    Preconditions.checkState(
-        doubleLookup.size() == 0,
-        "All long values must be inserted to the lookup before double types"
-    );
-    int id = dictionarySize++;
-    longLookup.put(value, id);
+    if (stringDictionary == null) {
+      // GenericIndexed v2 can write to multiple files if the dictionary is larger than 2gb, so we use a smooshfile
+      // for strings because of this. 
if other type dictionary writers could potentially use multiple internal files + // in the future, we should transition them to using this approach as well (or build a combination smoosher and + // mapper so that we can have a mutable smoosh) + File stringSmoosh = FileUtils.createTempDir(name + "__stringTempSmoosh"); + final String fileName = NestedCommonFormatColumnSerializer.getInternalFileName( + name, + NestedCommonFormatColumnSerializer.STRING_DICTIONARY_FILE_NAME + ); + final FileSmoosher smoosher = new FileSmoosher(stringSmoosh); + try (final SmooshedWriter writer = smoosher.addWithSmooshedWriter( + fileName, + stringDictionaryWriter.getSerializedSize() + )) { + stringDictionaryWriter.writeTo(writer, smoosher); + writer.close(); + smoosher.close(); + stringBufferMapper = SmooshedFileMapper.load(stringSmoosh); + final ByteBuffer stringBuffer = stringBufferMapper.mapFile(fileName); + stringDictionary = StringEncodingStrategies.getStringDictionarySupplier( + stringBufferMapper, + stringBuffer, + ByteOrder.nativeOrder() + ).get(); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + final byte[] bytes = StringUtils.toUtf8Nullable(value); + final int index = stringDictionary.indexOf(bytes == null ? null : ByteBuffer.wrap(bytes)); + if (index < 0) { + throw DruidException.defensive("Value not found in string dictionary"); + } + return index; } public int lookupLong(@Nullable Long value) { - if (value == null) { - return 0; + if (longDictionary == null) { + Path longFile = makeTempFile(name + NestedCommonFormatColumnSerializer.LONG_DICTIONARY_FILE_NAME); + longBuffer = mapWriter(longFile, longDictionaryWriter); + longDictionary = FixedIndexed.read(longBuffer, TypeStrategies.LONG, ByteOrder.nativeOrder(), Long.BYTES).get(); + // reset position + longBuffer.position(0); } - return longLookup.get(value.longValue()); - } - - public void addDouble(double value) - { - int id = dictionarySize++; - doubleLookup.put(value, id); + final int index = longDictionary.indexOf(value); + if (index < 0) { + throw DruidException.defensive("Value not found in long dictionary"); + } + return index + longOffset(); } public int lookupDouble(@Nullable Double value) { - if (value == null) { - return 0; + if (doubleDictionary == null) { + Path doubleFile = makeTempFile(name + NestedCommonFormatColumnSerializer.DOUBLE_DICTIONARY_FILE_NAME); + doubleBuffer = mapWriter(doubleFile, doubleDictionaryWriter); + doubleDictionary = FixedIndexed.read(doubleBuffer, TypeStrategies.DOUBLE, ByteOrder.nativeOrder(), Double.BYTES).get(); + // reset position + doubleBuffer.position(0); } - return doubleLookup.get(value.doubleValue()); - } - - public void addArray(int[] value) - { - int id = dictionarySize++; - arrayLookup.put(value, id); + final int index = doubleDictionary.indexOf(value); + if (index < 0) { + throw DruidException.defensive("Value not found in double dictionary"); + } + return index + doubleOffset(); } public int lookupArray(@Nullable int[] value) { - if (value == null) { - return 0; + if (arrayDictionary == null) { + Path arrayFile = makeTempFile(name + NestedCommonFormatColumnSerializer.ARRAY_DICTIONARY_FILE_NAME); + arrayBuffer = mapWriter(arrayFile, arrayDictionaryWriter); + arrayDictionary = FrontCodedIntArrayIndexed.read(arrayBuffer, ByteOrder.nativeOrder()).get(); + // reset position + arrayBuffer.position(0); } - return arrayLookup.getInt(value); + final int index = arrayDictionary.indexOf(value); + if (index < 0) { + throw DruidException.defensive("Value not found in array dictionary"); 
+ } + return index + arrayOffset(); + } + + @Nullable + public SmooshedFileMapper getStringBufferMapper() + { + return stringBufferMapper; + } + + @Nullable + public ByteBuffer getLongBuffer() + { + return longBuffer; + } + + @Nullable + public ByteBuffer getDoubleBuffer() + { + return doubleBuffer; + } + + @Nullable + public ByteBuffer getArrayBuffer() + { + return arrayBuffer; + } + + @Override + public void close() + { + if (stringBufferMapper != null) { + stringBufferMapper.close(); + } + if (longBuffer != null) { + ByteBufferUtils.unmap(longBuffer); + } + if (doubleBuffer != null) { + ByteBufferUtils.unmap(doubleBuffer); + } + if (arrayBuffer != null) { + ByteBufferUtils.unmap(arrayBuffer); + } + } + + private int longOffset() + { + return stringDictionaryWriter != null ? stringDictionaryWriter.getCardinality() : 0; + } + + private int doubleOffset() + { + return longOffset() + (longDictionaryWriter != null ? longDictionaryWriter.getCardinality() : 0); + } + + private int arrayOffset() + { + return doubleOffset() + (doubleDictionaryWriter != null ? doubleDictionaryWriter.getCardinality() : 0); + } + + private Path makeTempFile(String name) + { + try { + return Files.createTempFile(name, ".tmp"); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + + @SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION") + private MappedByteBuffer mapWriter(Path path, DictionaryWriter writer) + { + final EnumSet options = EnumSet.of( + StandardOpenOption.READ, + StandardOpenOption.WRITE, + StandardOpenOption.CREATE, + StandardOpenOption.TRUNCATE_EXISTING + ); + + try (FileChannel fileChannel = FileChannel.open(path, options); + GatheringByteChannel smooshChannel = makeWriter(fileChannel, writer.getSerializedSize())) { + //noinspection DataFlowIssue + writer.writeTo(smooshChannel, null); + return fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, writer.getSerializedSize()); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + + private GatheringByteChannel makeWriter(FileChannel channel, long size) + { + // basically same code as smooshed writer, can't use channel directly because copying between channels + // doesn't handle size of source channel correctly + return new GatheringByteChannel() + { + private boolean isClosed = false; + private long currOffset = 0; + + @Override + public boolean isOpen() + { + return !isClosed; + } + + @Override + public void close() throws IOException + { + channel.close(); + isClosed = true; + } + + public int bytesLeft() + { + return (int) (size - currOffset); + } + + @Override + public int write(ByteBuffer buffer) throws IOException + { + return addToOffset(channel.write(buffer)); + } + + @Override + public long write(ByteBuffer[] srcs, int offset, int length) throws IOException + { + return addToOffset(channel.write(srcs, offset, length)); + } + + @Override + public long write(ByteBuffer[] srcs) throws IOException + { + return addToOffset(channel.write(srcs)); + } + + public int addToOffset(long numBytesWritten) + { + if (numBytesWritten > bytesLeft()) { + throw new ISE("Wrote more bytes[%,d] than available[%,d]. 
Don't do that.", numBytesWritten, bytesLeft()); + } + currOffset += numBytesWritten; + + return Ints.checkedCast(numBytesWritten); + } + }; } } diff --git a/processing/src/main/java/org/apache/druid/segment/nested/NestedCommonFormatColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/nested/NestedCommonFormatColumnSerializer.java index b0c18d445a8..59c7da7fd21 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/NestedCommonFormatColumnSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/NestedCommonFormatColumnSerializer.java @@ -92,6 +92,14 @@ public abstract class NestedCommonFormatColumnSerializer implements GenericColum } } + protected void writeInternal(FileSmoosher smoosher, ByteBuffer buffer, String fileName) throws IOException + { + final String internalName = getInternalFileName(getColumnName(), fileName); + try (SmooshedWriter smooshChannel = smoosher.addWithSmooshedWriter(internalName, buffer.capacity())) { + smooshChannel.write(buffer); + } + } + protected void writeV0Header(WritableByteChannel channel, ByteBuffer columnNameBuffer) throws IOException { channel.write(ByteBuffer.wrap(new byte[]{NestedCommonFormatColumnSerializer.V0})); diff --git a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java index 3e49edc32a0..098b6103495 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java @@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; +import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.math.expr.ExprEval; @@ -170,7 +171,6 @@ public class NestedDataColumnSerializer extends NestedCommonFormatColumnSerializ this.segmentWriteOutMedium = segmentWriteOutMedium; this.indexSpec = indexSpec; this.closer = closer; - this.globalDictionaryIdLookup = new DictionaryIdLookup(); } @Override @@ -231,6 +231,15 @@ public class NestedDataColumnSerializer extends NestedCommonFormatColumnSerializ 4 ); arrayDictionaryWriter.open(); + globalDictionaryIdLookup = closer.register( + new DictionaryIdLookup( + name, + dictionaryWriter, + longDictionaryWriter, + doubleDictionaryWriter, + arrayDictionaryWriter + ) + ); } @Override @@ -329,7 +338,6 @@ public class NestedDataColumnSerializer extends NestedCommonFormatColumnSerializ // null is always 0 dictionaryWriter.write(null); - globalDictionaryIdLookup.addString(null); for (String value : strings) { value = NullHandling.emptyToNullIfNeeded(value); if (value == null) { @@ -337,7 +345,6 @@ public class NestedDataColumnSerializer extends NestedCommonFormatColumnSerializ } dictionaryWriter.write(value); - globalDictionaryIdLookup.addString(value); } dictionarySerialized = true; @@ -346,7 +353,6 @@ public class NestedDataColumnSerializer extends NestedCommonFormatColumnSerializ continue; } longDictionaryWriter.write(value); - globalDictionaryIdLookup.addLong(value); } for (Double value : doubles) { @@ -354,7 +360,6 @@ public class NestedDataColumnSerializer extends NestedCommonFormatColumnSerializ 
continue; } doubleDictionaryWriter.write(value); - globalDictionaryIdLookup.addDouble(value); } for (int[] value : arrays) { @@ -362,7 +367,6 @@ public class NestedDataColumnSerializer extends NestedCommonFormatColumnSerializ continue; } arrayDictionaryWriter.write(value); - globalDictionaryIdLookup.addArray(value); } dictionarySerialized = true; } @@ -433,10 +437,31 @@ public class NestedDataColumnSerializer extends NestedCommonFormatColumnSerializ writeV0Header(channel, columnNameBytes); fieldsWriter.writeTo(channel, smoosher); fieldsInfoWriter.writeTo(channel, smoosher); - writeInternal(smoosher, dictionaryWriter, STRING_DICTIONARY_FILE_NAME); - writeInternal(smoosher, longDictionaryWriter, LONG_DICTIONARY_FILE_NAME); - writeInternal(smoosher, doubleDictionaryWriter, DOUBLE_DICTIONARY_FILE_NAME); - writeInternal(smoosher, arrayDictionaryWriter, ARRAY_DICTIONARY_FILE_NAME); + + + if (globalDictionaryIdLookup.getStringBufferMapper() != null) { + SmooshedFileMapper fileMapper = globalDictionaryIdLookup.getStringBufferMapper(); + for (String internalName : fileMapper.getInternalFilenames()) { + smoosher.add(internalName, fileMapper.mapFile(internalName)); + } + } else { + writeInternal(smoosher, dictionaryWriter, STRING_DICTIONARY_FILE_NAME); + } + if (globalDictionaryIdLookup.getLongBuffer() != null) { + writeInternal(smoosher, globalDictionaryIdLookup.getLongBuffer(), LONG_DICTIONARY_FILE_NAME); + } else { + writeInternal(smoosher, longDictionaryWriter, LONG_DICTIONARY_FILE_NAME); + } + if (globalDictionaryIdLookup.getDoubleBuffer() != null) { + writeInternal(smoosher, globalDictionaryIdLookup.getDoubleBuffer(), DOUBLE_DICTIONARY_FILE_NAME); + } else { + writeInternal(smoosher, doubleDictionaryWriter, DOUBLE_DICTIONARY_FILE_NAME); + } + if (globalDictionaryIdLookup.getArrayBuffer() != null) { + writeInternal(smoosher, globalDictionaryIdLookup.getArrayBuffer(), ARRAY_DICTIONARY_FILE_NAME); + } else { + writeInternal(smoosher, arrayDictionaryWriter, ARRAY_DICTIONARY_FILE_NAME); + } writeInternal(smoosher, rawWriter, RAW_FILE_NAME); if (!nullRowsBitmap.isEmpty()) { writeInternal(smoosher, nullBitmapWriter, NULL_BITMAP_FILE_NAME); diff --git a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializerV4.java b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializerV4.java index f10efd2d241..ceab8dcf55b 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializerV4.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializerV4.java @@ -144,7 +144,6 @@ public class NestedDataColumnSerializerV4 implements GenericColumnSerializer fields) throws IOException @@ -263,7 +272,6 @@ public class NestedDataColumnSerializerV4 implements GenericColumnSerializer stringDictionary; - final Supplier frontCodedStringDictionarySupplier; + final Supplier> stringDictionarySupplier; final Supplier> longDictionarySupplier; final Supplier> doubleDictionarySupplier; final Supplier arrayDictionarySupplier; @@ -82,34 +79,12 @@ public class NestedDataColumnSupplier implements Supplier stringDictionary; - private final Supplier frontCodedStringDictionarySupplier; + private final Supplier> stringDictionarySupplier; private final Supplier> longDictionarySupplier; private final Supplier> doubleDictionarySupplier; private final Supplier arrayDictionarySupplier; @@ -217,8 +190,7 @@ public class NestedDataColumnSupplier implements Supplier stringDictionary, - Supplier frontCodedStringDictionarySupplier, + 
Supplier> stringDictionarySupplier, Supplier> longDictionarySupplier, Supplier> doubleDictionarySupplier, Supplier arrayDictionarySupplier, @@ -234,8 +206,7 @@ public class NestedDataColumnSupplier implements Supplier( - columnName, - getLogicalType(), - columnConfig, - compressedRawColumnSupplier, - nullValues, - fields, - fieldInfo, - frontCodedStringDictionarySupplier, - longDictionarySupplier, - doubleDictionarySupplier, - arrayDictionarySupplier, - fileMapper, - bitmapSerdeFactory, - byteOrder - ); - } return new NestedDataColumnV5<>( columnName, getLogicalType(), @@ -275,7 +228,7 @@ public class NestedDataColumnSupplier implements Supplier final FieldTypeInfo fieldInfo; final CompressedVariableSizedBlobColumnSupplier compressedRawColumnSupplier; final ImmutableBitmap nullValues; - final GenericIndexed stringDictionary; - final Supplier frontCodedStringDictionarySupplier; + final Supplier> stringDictionarySupplier; final Supplier> longDictionarySupplier; final Supplier> doubleDictionarySupplier; final Supplier arrayDictionarySupplier; @@ -118,34 +115,11 @@ public class NestedDataColumnSupplierV4 implements Supplier NestedCommonFormatColumnSerializer.STRING_DICTIONARY_FILE_NAME ); - final int dictionaryStartPosition = stringDictionaryBuffer.position(); - final byte dictionaryVersion = stringDictionaryBuffer.get(); - - if (dictionaryVersion == EncodedStringDictionaryWriter.VERSION) { - final byte encodingId = stringDictionaryBuffer.get(); - if (encodingId == StringEncodingStrategy.FRONT_CODED_ID) { - frontCodedStringDictionarySupplier = FrontCodedIndexed.read( - stringDictionaryBuffer, - metadata.getByteOrder() - ); - stringDictionary = null; - } else if (encodingId == StringEncodingStrategy.UTF8_ID) { - // this cannot happen naturally right now since generic indexed is written in the 'legacy' format, but - // this provides backwards compatibility should we switch at some point in the future to always - // writing dictionaryVersion - stringDictionary = GenericIndexed.read(stringDictionaryBuffer, GenericIndexed.UTF8_STRATEGY, mapper); - frontCodedStringDictionarySupplier = null; - } else { - throw new ISE("impossible, unknown encoding strategy id: %s", encodingId); - } - } else { - // legacy format that only supports plain utf8 enoding stored in GenericIndexed and the byte we are reading - // as dictionaryVersion is actually also the GenericIndexed version, so we reset start position so the - // GenericIndexed version can be correctly read - stringDictionaryBuffer.position(dictionaryStartPosition); - stringDictionary = GenericIndexed.read(stringDictionaryBuffer, GenericIndexed.UTF8_STRATEGY, mapper); - frontCodedStringDictionarySupplier = null; - } + stringDictionarySupplier = StringEncodingStrategies.getStringDictionarySupplier( + mapper, + stringDictionaryBuffer, + metadata.getByteOrder() + ); final ByteBuffer longDictionaryBuffer = loadInternalFile( mapper, metadata, @@ -210,8 +184,7 @@ public class NestedDataColumnSupplierV4 implements Supplier fieldInfo, compressedRawColumnSupplier, nullValues, - stringDictionary, - frontCodedStringDictionarySupplier, + stringDictionarySupplier, longDictionarySupplier, doubleDictionarySupplier, arrayDictionarySupplier, @@ -236,8 +209,7 @@ public class NestedDataColumnSupplierV4 implements Supplier private final FieldTypeInfo fieldInfo; private final CompressedVariableSizedBlobColumnSupplier compressedRawColumnSupplier; private final ImmutableBitmap nullValues; - private final GenericIndexed stringDictionary; - private final Supplier 
frontCodedStringDictionarySupplier; + private final Supplier> stringDictionarySupplier; private final Supplier> longDictionarySupplier; private final Supplier> doubleDictionarySupplier; private final Supplier arrayDictionarySupplier; @@ -257,8 +229,7 @@ public class NestedDataColumnSupplierV4 implements Supplier FieldTypeInfo fieldInfo, CompressedVariableSizedBlobColumnSupplier compressedRawColumnSupplier, ImmutableBitmap nullValues, - GenericIndexed stringDictionary, - Supplier frontCodedStringDictionarySupplier, + Supplier> stringDictionarySupplier, Supplier> longDictionarySupplier, Supplier> doubleDictionarySupplier, Supplier arrayDictionarySupplier, @@ -275,8 +246,7 @@ public class NestedDataColumnSupplierV4 implements Supplier this.fieldInfo = fieldInfo; this.compressedRawColumnSupplier = compressedRawColumnSupplier; this.nullValues = nullValues; - this.stringDictionary = stringDictionary; - this.frontCodedStringDictionarySupplier = frontCodedStringDictionarySupplier; + this.stringDictionarySupplier = stringDictionarySupplier; this.longDictionarySupplier = longDictionarySupplier; this.doubleDictionarySupplier = doubleDictionarySupplier; this.arrayDictionarySupplier = arrayDictionarySupplier; @@ -307,23 +277,6 @@ public class NestedDataColumnSupplierV4 implements Supplier private NestedDataColumnV3 makeV3() { - if (frontCodedStringDictionarySupplier != null) { - return new NestedDataColumnV3<>( - columnName, - logicalType, - columnConfig, - compressedRawColumnSupplier, - nullValues, - fields, - fieldInfo, - frontCodedStringDictionarySupplier, - longDictionarySupplier, - doubleDictionarySupplier, - fileMapper, - bitmapSerdeFactory, - byteOrder - ); - } return new NestedDataColumnV3<>( columnName, logicalType, @@ -332,7 +285,7 @@ public class NestedDataColumnSupplierV4 implements Supplier nullValues, fields, fieldInfo, - stringDictionary::singleThreaded, + stringDictionarySupplier, longDictionarySupplier, doubleDictionarySupplier, fileMapper, @@ -343,23 +296,6 @@ public class NestedDataColumnSupplierV4 implements Supplier private NestedDataColumnV4 makeV4() { - if (frontCodedStringDictionarySupplier != null) { - return new NestedDataColumnV4<>( - columnName, - logicalType, - columnConfig, - compressedRawColumnSupplier, - nullValues, - fields, - fieldInfo, - frontCodedStringDictionarySupplier, - longDictionarySupplier, - doubleDictionarySupplier, - fileMapper, - bitmapSerdeFactory, - byteOrder - ); - } return new NestedDataColumnV4<>( columnName, logicalType, @@ -368,7 +304,7 @@ public class NestedDataColumnSupplierV4 implements Supplier nullValues, fields, fieldInfo, - stringDictionary::singleThreaded, + stringDictionarySupplier, longDictionarySupplier, doubleDictionarySupplier, fileMapper, @@ -379,24 +315,6 @@ public class NestedDataColumnSupplierV4 implements Supplier private NestedDataColumnV5 makeV5() { - if (frontCodedStringDictionarySupplier != null) { - return new NestedDataColumnV5<>( - columnName, - logicalType, - columnConfig, - compressedRawColumnSupplier, - nullValues, - fields, - fieldInfo, - frontCodedStringDictionarySupplier, - longDictionarySupplier, - doubleDictionarySupplier, - arrayDictionarySupplier, - fileMapper, - bitmapSerdeFactory, - byteOrder - ); - } return new NestedDataColumnV5<>( columnName, logicalType, @@ -405,7 +323,7 @@ public class NestedDataColumnSupplierV4 implements Supplier nullValues, fields, fieldInfo, - stringDictionary::singleThreaded, + stringDictionarySupplier, longDictionarySupplier, doubleDictionarySupplier, arrayDictionarySupplier, diff 
--git a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnSerializer.java index c3a23ac5e6e..c1139f58be7 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnSerializer.java @@ -49,7 +49,7 @@ public class ScalarDoubleColumnSerializer extends ScalarNestedCommonFormatColumn Closer closer ) { - super(name, DOUBLE_DICTIONARY_FILE_NAME, indexSpec, segmentWriteOutMedium, closer); + super(name, indexSpec, segmentWriteOutMedium, closer); } @Override @@ -73,6 +73,15 @@ public class ScalarDoubleColumnSerializer extends ScalarNestedCommonFormatColumn true ); dictionaryWriter.open(); + dictionaryIdLookup = closer.register( + new DictionaryIdLookup( + name, + null, + null, + dictionaryWriter, + null + ) + ); } @Override @@ -102,16 +111,15 @@ public class ScalarDoubleColumnSerializer extends ScalarNestedCommonFormatColumn // null is always 0 dictionaryWriter.write(null); - dictionaryIdLookup.addNumericNull(); for (Double value : doubles) { if (value == null) { continue; } dictionaryWriter.write(value); - dictionaryIdLookup.addDouble(value); } dictionarySerialized = true; + } @Override @@ -119,4 +127,14 @@ public class ScalarDoubleColumnSerializer extends ScalarNestedCommonFormatColumn { writeInternal(smoosher, doublesSerializer, DOUBLE_VALUE_COLUMN_FILE_NAME); } + + @Override + protected void writeDictionaryFile(FileSmoosher smoosher) throws IOException + { + if (dictionaryIdLookup.getDoubleBuffer() != null) { + writeInternal(smoosher, dictionaryIdLookup.getDoubleBuffer(), DOUBLE_DICTIONARY_FILE_NAME); + } else { + writeInternal(smoosher, dictionaryWriter, DOUBLE_DICTIONARY_FILE_NAME); + } + } } diff --git a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnSerializer.java index 4d5604851dc..268a9c90e26 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnSerializer.java @@ -49,7 +49,7 @@ public class ScalarLongColumnSerializer extends ScalarNestedCommonFormatColumnSe Closer closer ) { - super(name, LONG_DICTIONARY_FILE_NAME, indexSpec, segmentWriteOutMedium, closer); + super(name, indexSpec, segmentWriteOutMedium, closer); } @Override @@ -74,6 +74,15 @@ public class ScalarLongColumnSerializer extends ScalarNestedCommonFormatColumnSe true ); dictionaryWriter.open(); + dictionaryIdLookup = closer.register( + new DictionaryIdLookup( + name, + null, + dictionaryWriter, + null, + null + ) + ); } @Override @@ -104,14 +113,11 @@ public class ScalarLongColumnSerializer extends ScalarNestedCommonFormatColumnSe // null is always 0 dictionaryWriter.write(null); - dictionaryIdLookup.addNumericNull(); - for (Long value : longs) { if (value == null) { continue; } dictionaryWriter.write(value); - dictionaryIdLookup.addLong(value); } dictionarySerialized = true; } @@ -121,4 +127,14 @@ public class ScalarLongColumnSerializer extends ScalarNestedCommonFormatColumnSe { writeInternal(smoosher, longsSerializer, LONG_VALUE_COLUMN_FILE_NAME); } + + @Override + protected void writeDictionaryFile(FileSmoosher smoosher) throws IOException + { + if (dictionaryIdLookup.getLongBuffer() != null) { + writeInternal(smoosher, 
dictionaryIdLookup.getLongBuffer(), LONG_DICTIONARY_FILE_NAME); + } else { + writeInternal(smoosher, dictionaryWriter, LONG_DICTIONARY_FILE_NAME); + } + } } diff --git a/processing/src/main/java/org/apache/druid/segment/nested/ScalarNestedCommonFormatColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/nested/ScalarNestedCommonFormatColumnSerializer.java index f3ed96942eb..2caa19ad8d6 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/ScalarNestedCommonFormatColumnSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/ScalarNestedCommonFormatColumnSerializer.java @@ -52,7 +52,6 @@ public abstract class ScalarNestedCommonFormatColumnSerializer extends Nested protected final IndexSpec indexSpec; @SuppressWarnings("unused") protected final Closer closer; - protected final String dictionaryFileName; protected DictionaryIdLookup dictionaryIdLookup; protected DictionaryWriter dictionaryWriter; @@ -66,18 +65,15 @@ public abstract class ScalarNestedCommonFormatColumnSerializer extends Nested public ScalarNestedCommonFormatColumnSerializer( String name, - String dictionaryFileName, IndexSpec indexSpec, SegmentWriteOutMedium segmentWriteOutMedium, Closer closer ) { this.name = name; - this.dictionaryFileName = dictionaryFileName; this.segmentWriteOutMedium = segmentWriteOutMedium; this.indexSpec = indexSpec; this.closer = closer; - this.dictionaryIdLookup = new DictionaryIdLookup(); } /** @@ -99,6 +95,8 @@ public abstract class ScalarNestedCommonFormatColumnSerializer extends Nested */ protected abstract void writeValueColumn(FileSmoosher smoosher) throws IOException; + protected abstract void writeDictionaryFile(FileSmoosher smoosher) throws IOException; + @Override public String getColumnName() { @@ -220,7 +218,7 @@ public abstract class ScalarNestedCommonFormatColumnSerializer extends Nested } writeV0Header(channel, columnNameBytes); - writeInternal(smoosher, dictionaryWriter, dictionaryFileName); + writeDictionaryFile(smoosher); writeInternal(smoosher, encodedValueSerializer, ENCODED_VALUE_COLUMN_FILE_NAME); writeValueColumn(smoosher); writeInternal(smoosher, bitmapIndexWriter, BITMAP_INDEX_FILE_NAME); diff --git a/processing/src/main/java/org/apache/druid/segment/nested/ScalarStringColumnAndIndexSupplier.java b/processing/src/main/java/org/apache/druid/segment/nested/ScalarStringColumnAndIndexSupplier.java index 6d4cebc2900..d9ba9ee4e5d 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/ScalarStringColumnAndIndexSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/ScalarStringColumnAndIndexSupplier.java @@ -21,20 +21,17 @@ package org.apache.druid.segment.nested; import com.google.common.base.Supplier; import org.apache.druid.collections.bitmap.ImmutableBitmap; -import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; import org.apache.druid.segment.column.ColumnBuilder; import org.apache.druid.segment.column.ColumnConfig; import org.apache.druid.segment.column.ColumnIndexSupplier; -import org.apache.druid.segment.column.StringEncodingStrategy; +import org.apache.druid.segment.column.StringEncodingStrategies; import org.apache.druid.segment.column.StringUtf8DictionaryEncodedColumn; import org.apache.druid.segment.data.BitmapSerdeFactory; import org.apache.druid.segment.data.ColumnarInts; import 
org.apache.druid.segment.data.CompressedVSizeColumnarIntsSupplier; -import org.apache.druid.segment.data.EncodedStringDictionaryWriter; -import org.apache.druid.segment.data.FrontCodedIndexed; import org.apache.druid.segment.data.GenericIndexed; import org.apache.druid.segment.data.Indexed; import org.apache.druid.segment.data.VByte; @@ -71,39 +68,11 @@ public class ScalarStringColumnAndIndexSupplier implements Supplier stringDictionary; - final Supplier frontCodedStringDictionarySupplier; + final Supplier> stringDictionarySupplier; final Supplier> longDictionarySupplier; final Supplier> doubleDictionarySupplier; final Supplier arrayDictionarySupplier; @@ -105,34 +101,11 @@ public class VariantColumnAndIndexSupplier implements Supplier stringDictionary, - Supplier frontCodedStringDictionarySupplier, + Supplier> stringDictionarySupplier, Supplier> longDictionarySupplier, Supplier> doubleDictionarySupplier, Supplier arrayDictionarySupplier, @@ -271,9 +242,7 @@ public class VariantColumnAndIndexSupplier implements Supplier> dictionarySupplier; - - if (dictionaryVersion == EncodedStringDictionaryWriter.VERSION) { - final byte encodingId = buffer.get(); - if (encodingId == StringEncodingStrategy.FRONT_CODED_ID) { - dictionarySupplier = FrontCodedIndexed.read(buffer, byteOrder); - } else if (encodingId == StringEncodingStrategy.UTF8_ID) { - // this cannot happen naturally right now since generic indexed is written in the 'legacy' format, but - // this provides backwards compatibility should we switch at some point in the future to always - // writing dictionaryVersion - dictionarySupplier = GenericIndexed.read( + final Supplier> dictionarySupplier = + StringEncodingStrategies.getStringDictionarySupplier( + builder.getFileMapper(), buffer, - GenericIndexed.UTF8_STRATEGY, - builder.getFileMapper() - )::singleThreaded; - } else { - throw new ISE("impossible, unknown encoding strategy id: %s", encodingId); - } - } else { - // legacy format that only supports plain utf8 enoding stored in GenericIndexed and the byte we are reading - // as dictionaryVersion is actually also the GenericIndexed version, so we reset start position so the - // GenericIndexed version can be correctly read - buffer.position(dictionaryStartPosition); - dictionarySupplier = GenericIndexed.read( - buffer, - GenericIndexed.UTF8_STRATEGY, - builder.getFileMapper() - )::singleThreaded; - } + byteOrder + ); final WritableSupplier rSingleValuedColumn; final WritableSupplier rMultiValuedColumn; diff --git a/processing/src/test/java/org/apache/druid/query/DoubleStorageTest.java b/processing/src/test/java/org/apache/druid/query/DoubleStorageTest.java index f71a6511aa0..44d5725ba3d 100644 --- a/processing/src/test/java/org/apache/druid/query/DoubleStorageTest.java +++ b/processing/src/test/java/org/apache/druid/query/DoubleStorageTest.java @@ -59,6 +59,7 @@ import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.incremental.IndexSizeExceededException; import org.apache.druid.segment.incremental.OnheapIncrementalIndex; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; +import org.apache.druid.testing.InitializedNullHandlingTest; import org.apache.druid.timeline.SegmentId; import org.joda.time.Interval; import org.junit.After; @@ -81,7 +82,7 @@ import java.util.stream.IntStream; import java.util.stream.Stream; @RunWith(Parameterized.class) -public class DoubleStorageTest +public class DoubleStorageTest extends InitializedNullHandlingTest { private static final 
SegmentMetadataQueryRunnerFactory METADATA_QR_FACTORY = new SegmentMetadataQueryRunnerFactory( diff --git a/processing/src/test/java/org/apache/druid/segment/data/FixedIndexedTest.java b/processing/src/test/java/org/apache/druid/segment/data/FixedIndexedTest.java index 19303c81955..b79d5058303 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/FixedIndexedTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/FixedIndexedTest.java @@ -141,6 +141,35 @@ public class FixedIndexedTest extends InitializedNullHandlingTest for (Long aLong : LONGS) { writer.write(aLong); } + Iterator longIterator = writer.getIterator(); + int ctr = 0; + int totalCount = withNull ? 1 + LONGS.length : LONGS.length; + for (int i = 0; i < totalCount; i++) { + if (withNull) { + if (i == 0) { + Assert.assertNull(writer.get(i)); + } else { + Assert.assertEquals(" index: " + i, LONGS[i - 1], writer.get(i)); + } + } else { + Assert.assertEquals(" index: " + i, LONGS[i], writer.get(i)); + } + } + while (longIterator.hasNext()) { + if (withNull) { + if (ctr == 0) { + Assert.assertNull(longIterator.next()); + Assert.assertNull(writer.get(ctr)); + } else { + Assert.assertEquals(LONGS[ctr - 1], longIterator.next()); + Assert.assertEquals(LONGS[ctr - 1], writer.get(ctr)); + } + } else { + Assert.assertEquals(LONGS[ctr], longIterator.next()); + Assert.assertEquals(LONGS[ctr], writer.get(ctr)); + } + ctr++; + } WritableByteChannel channel = new WritableByteChannel() { @Override
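
The sketches below are editorial illustrations of the patterns in this patch; class names and on-disk layouts are simplified stand-ins, not Druid's actual formats.

The core of the change: DictionaryIdLookup no longer builds heap-resident value-to-id maps (Object2IntLinkedOpenHashMap and friends). Instead, each dictionary writer is spilled to a temporary file on first lookup, memory mapped, and binary searched via indexOf. A minimal self-contained model of that pattern for a fixed-width long dictionary, assuming sorted input:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Illustrative only: a sorted, fixed-width long dictionary spilled to a temp file and
// memory mapped, so value -> id lookups need no heap-resident map.
public class MmapLongDictionary
{
  private final MappedByteBuffer buffer;
  private final int cardinality;

  public MmapLongDictionary(long[] sortedValues) throws IOException
  {
    Path file = Files.createTempFile("long-dictionary", ".tmp");
    try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
      ByteBuffer scratch = ByteBuffer.allocate(Long.BYTES).order(ByteOrder.nativeOrder());
      for (long value : sortedValues) {
        scratch.clear();
        scratch.putLong(value);
        scratch.flip();
        channel.write(scratch);
      }
      // the mapping stays valid after the channel is closed
      buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, (long) sortedValues.length * Long.BYTES);
      buffer.order(ByteOrder.nativeOrder());
    }
    cardinality = sortedValues.length;
  }

  // binary search over the mapped values; returns the dictionary id, or -1 if absent
  public int indexOf(long value)
  {
    int lo = 0;
    int hi = cardinality - 1;
    while (lo <= hi) {
      int mid = (lo + hi) >>> 1;
      long midValue = buffer.getLong(mid * Long.BYTES);
      if (midValue < value) {
        lo = mid + 1;
      } else if (midValue > value) {
        hi = mid - 1;
      } else {
        return mid;
      }
    }
    return -1;
  }
}

DictionaryIdLookup.lookupLong() plays the same role during serialization: it maps the spilled FixedIndexed dictionary once, and every subsequent lookup is an indexOf against the mapping, so heap usage stays flat no matter how large the dictionary grows.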
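The id stacking in longOffset()/doubleOffset()/arrayOffset() keeps all four dictionaries in one global id space, with null at id 0 of the string range. Worked arithmetic, with assumed cardinalities:

// Stacked global id space, mirroring DictionaryIdLookup's offset methods.
// Assumed cardinalities for the example: 1000 strings (including null), 100 longs, 10 doubles.
public class StackedIdExample
{
  public static void main(String[] args)
  {
    int stringCardinality = 1000;
    int longCardinality = 100;
    int doubleCardinality = 10;

    int longOffset = stringCardinality;                 // long ids start at 1000
    int doubleOffset = longOffset + longCardinality;    // double ids start at 1100
    int arrayOffset = doubleOffset + doubleCardinality; // array ids start at 1110

    // a long at position 7 of the long dictionary gets global id 1007
    int globalLongId = 7 + longOffset;
    System.out.println(globalLongId); // 1007
  }
}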
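The FixedIndexedWriter.getValue() fix concerns the implicit null at id 0: when hasNulls is set, the values buffer stores nothing for id 0, so byte offsets must be computed from index - 1 (the old index * width read one slot too far). A one-method model with a couple of worked cases:

public class FixedIndexedOffsetExample
{
  // mirrors the fixed line: int startOffset = (hasNulls ? index - 1 : index) * width;
  static long valueOffset(int index, int width, boolean hasNulls)
  {
    // id 0 is the implicit null and occupies no bytes in the values buffer, so all
    // physical positions shift down by one when nulls are present
    return (long) (hasNulls ? index - 1 : index) * width;
  }

  public static void main(String[] args)
  {
    // with nulls: id 1 is the first stored value, at byte offset 0
    System.out.println(valueOffset(1, Long.BYTES, true));  // 0
    // without nulls: id 1 is the second stored value, at byte offset 8
    System.out.println(valueOffset(1, Long.BYTES, false)); // 8
  }
}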