From b645d09c5dc5791e538532efbd44e203e005a9fb Mon Sep 17 00:00:00 2001
From: Clint Wylie
Date: Mon, 22 Jul 2024 21:14:30 -0700
Subject: [PATCH] move long and double nested field serialization to later phase of serialization (#16769)

changes:
* moves value column serializer initialization and the call to the `writeValue`
  method into `GlobalDictionaryEncodedFieldColumnWriter.writeTo` instead of
  `GlobalDictionaryEncodedFieldColumnWriter.addValue`. This shift means these
  numeric value columns are now written in the per-field phase that happens
  after the nested column raw data has been serialized, so only a single
  compression buffer and temp file is needed at a time, instead of one per
  nested literal field present in the column. This should be especially helpful
  for complicated nested structures with thousands of columns, since even those
  64k compression buffers can quickly add up to a sizeable chunk of direct
  memory.
---
 .../segment/nested/DictionaryIdLookup.java    | 158 +++++++++++-------
 ...balDictionaryEncodedFieldColumnWriter.java |  10 +-
 .../nested/ScalarDoubleFieldColumnWriter.java |  30 ++--
 .../nested/ScalarLongFieldColumnWriter.java   |  32 ++--
 4 files changed, 133 insertions(+), 97 deletions(-)

diff --git a/processing/src/main/java/org/apache/druid/segment/nested/DictionaryIdLookup.java b/processing/src/main/java/org/apache/druid/segment/nested/DictionaryIdLookup.java
index f4176db220c..6827497f7a6 100644
--- a/processing/src/main/java/org/apache/druid/segment/nested/DictionaryIdLookup.java
+++ b/processing/src/main/java/org/apache/druid/segment/nested/DictionaryIdLookup.java
@@ -99,42 +99,27 @@ public final class DictionaryIdLookup implements Closeable
     this.arrayDictionaryWriter = arrayDictionaryWriter;
   }
 
+  @Nullable
+  public Object getDictionaryValue(int id)
+  {
+    ensureStringDictionaryLoaded();
+    ensureLongDictionaryLoaded();
+    ensureDoubleDictionaryLoaded();
+    ensureArrayDictionaryLoaded();
+    if (id < longOffset()) {
+      return StringUtils.fromUtf8Nullable(stringDictionary.get(id));
+    } else if (id < doubleOffset()) {
+      return longDictionary.get(id - longOffset());
+    } else if (id < arrayOffset()) {
+      return doubleDictionary.get(id - doubleOffset());
+    } else {
+      return arrayDictionary.get(id - arrayOffset());
+    }
+  }
+
   public int lookupString(@Nullable String value)
   {
-    if (stringDictionary == null) {
-      // GenericIndexed v2 can write to multiple files if the dictionary is larger than 2gb, so we use a smooshfile
-      // for strings because of this. if other type dictionary writers could potentially use multiple internal files
-      // in the future, we should transition them to using this approach as well (or build a combination smoosher and
-      // mapper so that we can have a mutable smoosh)
-      File stringSmoosh = FileUtils.createTempDirInLocation(tempBasePath, StringUtils.urlEncode(name) + "__stringTempSmoosh");
-      stringDictionaryFile = stringSmoosh.toPath();
-      final String fileName = NestedCommonFormatColumnSerializer.getInternalFileName(
-          name,
-          NestedCommonFormatColumnSerializer.STRING_DICTIONARY_FILE_NAME
-      );
-
-      try (
-          final FileSmoosher smoosher = new FileSmoosher(stringSmoosh);
-          final SmooshedWriter writer = smoosher.addWithSmooshedWriter(
-              fileName,
-              stringDictionaryWriter.getSerializedSize()
-          )
-      ) {
-        stringDictionaryWriter.writeTo(writer, smoosher);
-        writer.close();
-        smoosher.close();
-        stringBufferMapper = SmooshedFileMapper.load(stringSmoosh);
-        final ByteBuffer stringBuffer = stringBufferMapper.mapFile(fileName);
-        stringDictionary = StringEncodingStrategies.getStringDictionarySupplier(
-            stringBufferMapper,
-            stringBuffer,
-            ByteOrder.nativeOrder()
-        ).get();
-      }
-      catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
+    ensureStringDictionaryLoaded();
     final byte[] bytes = StringUtils.toUtf8Nullable(value);
     final int index = stringDictionary.indexOf(bytes == null ? null : ByteBuffer.wrap(bytes));
     if (index < 0) {
@@ -145,13 +130,7 @@ public final class DictionaryIdLookup implements Closeable
 
   public int lookupLong(@Nullable Long value)
   {
-    if (longDictionary == null) {
-      longDictionaryFile = makeTempFile(name + NestedCommonFormatColumnSerializer.LONG_DICTIONARY_FILE_NAME);
-      longBuffer = mapWriter(longDictionaryFile, longDictionaryWriter);
-      longDictionary = FixedIndexed.read(longBuffer, TypeStrategies.LONG, ByteOrder.nativeOrder(), Long.BYTES).get();
-      // reset position
-      longBuffer.position(0);
-    }
+    ensureLongDictionaryLoaded();
     final int index = longDictionary.indexOf(value);
     if (index < 0) {
       throw DruidException.defensive("Value not found in column[%s] long dictionary", name);
@@ -161,18 +140,7 @@ public final class DictionaryIdLookup implements Closeable
 
   public int lookupDouble(@Nullable Double value)
   {
-    if (doubleDictionary == null) {
-      doubleDictionaryFile = makeTempFile(name + NestedCommonFormatColumnSerializer.DOUBLE_DICTIONARY_FILE_NAME);
-      doubleBuffer = mapWriter(doubleDictionaryFile, doubleDictionaryWriter);
-      doubleDictionary = FixedIndexed.read(
-          doubleBuffer,
-          TypeStrategies.DOUBLE,
-          ByteOrder.nativeOrder(),
-          Double.BYTES
-      ).get();
-      // reset position
-      doubleBuffer.position(0);
-    }
+    ensureDoubleDictionaryLoaded();
     final int index = doubleDictionary.indexOf(value);
     if (index < 0) {
       throw DruidException.defensive("Value not found in column[%s] double dictionary", name);
@@ -182,13 +150,7 @@ public final class DictionaryIdLookup implements Closeable
 
   public int lookupArray(@Nullable int[] value)
   {
-    if (arrayDictionary == null) {
-      arrayDictionaryFile = makeTempFile(name + NestedCommonFormatColumnSerializer.ARRAY_DICTIONARY_FILE_NAME);
-      arrayBuffer = mapWriter(arrayDictionaryFile, arrayDictionaryWriter);
-      arrayDictionary = FrontCodedIntArrayIndexed.read(arrayBuffer, ByteOrder.nativeOrder()).get();
-      // reset position
-      arrayBuffer.position(0);
-    }
+    ensureArrayDictionaryLoaded();
    final int index = arrayDictionary.indexOf(value);
     if (index < 0) {
       throw DruidException.defensive("Value not found in column[%s] array dictionary", name);
@@ -256,6 +218,82 @@ public final class DictionaryIdLookup implements Closeable
     return doubleOffset() + (doubleDictionaryWriter != null ? doubleDictionaryWriter.getCardinality() : 0);
   }
 
+  private void ensureStringDictionaryLoaded()
+  {
+    if (stringDictionary == null) {
+      // GenericIndexed v2 can write to multiple files if the dictionary is larger than 2gb, so we use a smooshfile
+      // for strings because of this. if other type dictionary writers could potentially use multiple internal files
+      // in the future, we should transition them to using this approach as well (or build a combination smoosher and
+      // mapper so that we can have a mutable smoosh)
+      File stringSmoosh = FileUtils.createTempDirInLocation(tempBasePath, StringUtils.urlEncode(name) + "__stringTempSmoosh");
+      stringDictionaryFile = stringSmoosh.toPath();
+      final String fileName = NestedCommonFormatColumnSerializer.getInternalFileName(
+          name,
+          NestedCommonFormatColumnSerializer.STRING_DICTIONARY_FILE_NAME
+      );
+
+      try (
+          final FileSmoosher smoosher = new FileSmoosher(stringSmoosh);
+          final SmooshedWriter writer = smoosher.addWithSmooshedWriter(
+              fileName,
+              stringDictionaryWriter.getSerializedSize()
+          )
+      ) {
+        stringDictionaryWriter.writeTo(writer, smoosher);
+        writer.close();
+        smoosher.close();
+        stringBufferMapper = SmooshedFileMapper.load(stringSmoosh);
+        final ByteBuffer stringBuffer = stringBufferMapper.mapFile(fileName);
+        stringDictionary = StringEncodingStrategies.getStringDictionarySupplier(
+            stringBufferMapper,
+            stringBuffer,
+            ByteOrder.nativeOrder()
+        ).get();
+      }
+      catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  private void ensureLongDictionaryLoaded()
+  {
+    if (longDictionary == null) {
+      longDictionaryFile = makeTempFile(name + NestedCommonFormatColumnSerializer.LONG_DICTIONARY_FILE_NAME);
+      longBuffer = mapWriter(longDictionaryFile, longDictionaryWriter);
+      longDictionary = FixedIndexed.read(longBuffer, TypeStrategies.LONG, ByteOrder.nativeOrder(), Long.BYTES).get();
+      // reset position
+      longBuffer.position(0);
+    }
+  }
+
+  private void ensureDoubleDictionaryLoaded()
+  {
+    if (doubleDictionary == null) {
+      doubleDictionaryFile = makeTempFile(name + NestedCommonFormatColumnSerializer.DOUBLE_DICTIONARY_FILE_NAME);
+      doubleBuffer = mapWriter(doubleDictionaryFile, doubleDictionaryWriter);
+      doubleDictionary = FixedIndexed.read(
+          doubleBuffer,
+          TypeStrategies.DOUBLE,
+          ByteOrder.nativeOrder(),
+          Double.BYTES
+      ).get();
+      // reset position
+      doubleBuffer.position(0);
+    }
+  }
+
+  private void ensureArrayDictionaryLoaded()
+  {
+    if (arrayDictionary == null && arrayDictionaryWriter != null) {
+      arrayDictionaryFile = makeTempFile(name + NestedCommonFormatColumnSerializer.ARRAY_DICTIONARY_FILE_NAME);
+      arrayBuffer = mapWriter(arrayDictionaryFile, arrayDictionaryWriter);
+      arrayDictionary = FrontCodedIntArrayIndexed.read(arrayBuffer, ByteOrder.nativeOrder()).get();
+      // reset position
+      arrayBuffer.position(0);
+    }
+  }
+
   private Path makeTempFile(String name)
   {
     try {
diff --git a/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryEncodedFieldColumnWriter.java b/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryEncodedFieldColumnWriter.java
index aa6a71ae754..d9f00bb2321 100644
--- a/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryEncodedFieldColumnWriter.java
+++ b/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryEncodedFieldColumnWriter.java
@@ -117,8 +117,8 @@ public abstract class GlobalDictionaryEncodedFieldColumnWriter<T>
   }
 
   /**
-   * Hook to allow implementors the chance to do additional operations during {@link #addValue(int, Object)}, such as
-   * writing an additional value column
+   * Hook to allow implementors the chance to do additional operations during {@link #writeTo(int, FileSmoosher)}, such
+   * as writing an additional value column
    */
   void writeValue(@Nullable T value) throws IOException
   {
@@ -159,7 +159,6 @@ public abstract class GlobalDictionaryEncodedFieldColumnWriter<T>
       localId = localDictionary.add(globalId);
     }
     intermediateValueWriter.write(localId);
-    writeValue(value);
     cursorPosition++;
   }
 
@@ -168,11 +167,9 @@ public abstract class GlobalDictionaryEncodedFieldColumnWriter<T>
    */
   private void fillNull(int row) throws IOException
   {
-    final T value = processValue(row, null);
     final int localId = localDictionary.add(0);
     while (cursorPosition < row) {
       intermediateValueWriter.write(localId);
-      writeValue(value);
       cursorPosition++;
     }
   }
@@ -252,6 +249,7 @@ public abstract class GlobalDictionaryEncodedFieldColumnWriter<T>
         final int unsortedLocalId = rows.nextInt();
         final int sortedLocalId = unsortedToSorted[unsortedLocalId];
         encodedValueSerializer.addValue(sortedLocalId);
+        writeValue((T) globalDictionaryIdLookup.getDictionaryValue(unsortedToGlobal[unsortedLocalId]));
         bitmaps[sortedLocalId].add(rowCount++);
       }
 
@@ -307,7 +305,7 @@ public abstract class GlobalDictionaryEncodedFieldColumnWriter<T>
     }
   }
 
-  private void openColumnSerializer(SegmentWriteOutMedium medium, int maxId) throws IOException
+  public void openColumnSerializer(SegmentWriteOutMedium medium, int maxId) throws IOException
   {
     if (indexSpec.getDimensionCompression() != CompressionStrategy.UNCOMPRESSED) {
       this.version = DictionaryEncodedColumnPartSerde.VERSION.COMPRESSED;
diff --git a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleFieldColumnWriter.java b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleFieldColumnWriter.java
index 8ccd528715b..09e8dc121c8 100644
--- a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleFieldColumnWriter.java
+++ b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleFieldColumnWriter.java
@@ -58,21 +58,6 @@ public final class ScalarDoubleFieldColumnWriter extends GlobalDictionaryEncoded
     return globalDictionaryIdLookup.lookupDouble(value);
   }
 
-  @Override
-  public void open() throws IOException
-  {
-    super.open();
-    doublesSerializer = CompressionFactory.getDoubleSerializer(
-        fieldName,
-        segmentWriteOutMedium,
-        StringUtils.format("%s.double_column", fieldName),
-        ByteOrder.nativeOrder(),
-        indexSpec.getDimensionCompression(),
-        fieldResourceCloser
-    );
-    doublesSerializer.open();
-  }
-
   @Override
   void writeValue(@Nullable Double value) throws IOException
   {
@@ -83,6 +68,21 @@ public final class ScalarDoubleFieldColumnWriter extends GlobalDictionaryEncoded
     }
   }
 
+  @Override
+  public void openColumnSerializer(SegmentWriteOutMedium medium, int maxId) throws IOException
+  {
+    super.openColumnSerializer(medium, maxId);
+    doublesSerializer = CompressionFactory.getDoubleSerializer(
+        fieldName,
+        medium,
+        StringUtils.format("%s.double_column", fieldName),
+        ByteOrder.nativeOrder(),
+        indexSpec.getDimensionCompression(),
+        fieldResourceCloser
+    );
+    doublesSerializer.open();
+  }
+
   @Override
   void writeColumnTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
   {
diff --git a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongFieldColumnWriter.java b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongFieldColumnWriter.java
index 66b5eca18d9..d9191c4e805 100644
--- a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongFieldColumnWriter.java
+++ b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongFieldColumnWriter.java
@@ -58,22 +58,6 @@ public final class ScalarLongFieldColumnWriter extends GlobalDictionaryEncodedFi
     return globalDictionaryIdLookup.lookupLong(value);
   }
 
-  @Override
-  public void open() throws IOException
-  {
-    super.open();
-    longsSerializer = CompressionFactory.getLongSerializer(
-        fieldName,
-        segmentWriteOutMedium,
-        StringUtils.format("%s.long_column", fieldName),
-        ByteOrder.nativeOrder(),
-        indexSpec.getLongEncoding(),
-        indexSpec.getDimensionCompression(),
-        fieldResourceCloser
-    );
-    longsSerializer.open();
-  }
-
   @Override
   void writeValue(@Nullable Long value) throws IOException
   {
@@ -84,6 +68,22 @@ public final class ScalarLongFieldColumnWriter extends GlobalDictionaryEncodedFi
     }
   }
 
+  @Override
+  public void openColumnSerializer(SegmentWriteOutMedium medium, int maxId) throws IOException
+  {
+    super.openColumnSerializer(medium, maxId);
+    longsSerializer = CompressionFactory.getLongSerializer(
+        fieldName,
+        medium,
+        StringUtils.format("%s.long_column", fieldName),
+        ByteOrder.nativeOrder(),
+        indexSpec.getLongEncoding(),
+        indexSpec.getDimensionCompression(),
+        fieldResourceCloser
+    );
+    longsSerializer.open();
+  }
+
   @Override
   void writeColumnTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
   {