move long and double nested field serialization to later phase of serialization (#16769)

changes:
* moves value column serializer initialization and the call to the `writeValue` method from `GlobalDictionaryEncodedFieldColumnWriter.addValue` to `GlobalDictionaryEncodedFieldColumnWriter.writeTo`. This shift means these numeric value columns are now serialized in the per-field section that happens after the nested column raw data, so only a single compression buffer and temp file are needed at a time instead of one per nested literal field present in the column. This should be especially helpful for complicated nested structures with thousands of fields, since even 64KB compression buffers can add up quickly to a sizeable chunk of direct memory.
This commit is contained in:
Clint Wylie 2024-07-22 21:14:30 -07:00 committed by GitHub
parent 934c10b1cd
commit b645d09c5d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 133 additions and 97 deletions

View File

@ -99,42 +99,27 @@ public final class DictionaryIdLookup implements Closeable
this.arrayDictionaryWriter = arrayDictionaryWriter;
}
/**
 * Looks up the value for a global dictionary id. Global ids are partitioned by type in the order
 * strings, longs, doubles, arrays, so the id is translated into a type-local id by subtracting the
 * offset of the section it falls in.
 */
@Nullable
public Object getDictionaryValue(int id)
{
  // all dictionaries must be mapped before the id can be translated
  ensureStringDictionaryLoaded();
  ensureLongDictionaryLoaded();
  ensureDoubleDictionaryLoaded();
  ensureArrayDictionaryLoaded();
  // check sections from the highest offset down; offsets are cumulative cardinalities
  if (id >= arrayOffset()) {
    return arrayDictionary.get(id - arrayOffset());
  }
  if (id >= doubleOffset()) {
    return doubleDictionary.get(id - doubleOffset());
  }
  if (id >= longOffset()) {
    return longDictionary.get(id - longOffset());
  }
  return StringUtils.fromUtf8Nullable(stringDictionary.get(id));
}
public int lookupString(@Nullable String value)
{
if (stringDictionary == null) {
// GenericIndexed v2 can write to multiple files if the dictionary is larger than 2gb, so we use a smooshfile
// for strings because of this. if other type dictionary writers could potentially use multiple internal files
// in the future, we should transition them to using this approach as well (or build a combination smoosher and
// mapper so that we can have a mutable smoosh)
File stringSmoosh = FileUtils.createTempDirInLocation(tempBasePath, StringUtils.urlEncode(name) + "__stringTempSmoosh");
stringDictionaryFile = stringSmoosh.toPath();
final String fileName = NestedCommonFormatColumnSerializer.getInternalFileName(
name,
NestedCommonFormatColumnSerializer.STRING_DICTIONARY_FILE_NAME
);
try (
final FileSmoosher smoosher = new FileSmoosher(stringSmoosh);
final SmooshedWriter writer = smoosher.addWithSmooshedWriter(
fileName,
stringDictionaryWriter.getSerializedSize()
)
) {
stringDictionaryWriter.writeTo(writer, smoosher);
writer.close();
smoosher.close();
stringBufferMapper = SmooshedFileMapper.load(stringSmoosh);
final ByteBuffer stringBuffer = stringBufferMapper.mapFile(fileName);
stringDictionary = StringEncodingStrategies.getStringDictionarySupplier(
stringBufferMapper,
stringBuffer,
ByteOrder.nativeOrder()
).get();
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
ensureStringDictionaryLoaded();
final byte[] bytes = StringUtils.toUtf8Nullable(value);
final int index = stringDictionary.indexOf(bytes == null ? null : ByteBuffer.wrap(bytes));
if (index < 0) {
@ -145,13 +130,7 @@ public final class DictionaryIdLookup implements Closeable
public int lookupLong(@Nullable Long value)
{
if (longDictionary == null) {
longDictionaryFile = makeTempFile(name + NestedCommonFormatColumnSerializer.LONG_DICTIONARY_FILE_NAME);
longBuffer = mapWriter(longDictionaryFile, longDictionaryWriter);
longDictionary = FixedIndexed.read(longBuffer, TypeStrategies.LONG, ByteOrder.nativeOrder(), Long.BYTES).get();
// reset position
longBuffer.position(0);
}
ensureLongDictionaryLoaded();
final int index = longDictionary.indexOf(value);
if (index < 0) {
throw DruidException.defensive("Value not found in column[%s] long dictionary", name);
@ -161,18 +140,7 @@ public final class DictionaryIdLookup implements Closeable
public int lookupDouble(@Nullable Double value)
{
if (doubleDictionary == null) {
doubleDictionaryFile = makeTempFile(name + NestedCommonFormatColumnSerializer.DOUBLE_DICTIONARY_FILE_NAME);
doubleBuffer = mapWriter(doubleDictionaryFile, doubleDictionaryWriter);
doubleDictionary = FixedIndexed.read(
doubleBuffer,
TypeStrategies.DOUBLE,
ByteOrder.nativeOrder(),
Double.BYTES
).get();
// reset position
doubleBuffer.position(0);
}
ensureDoubleDictionaryLoaded();
final int index = doubleDictionary.indexOf(value);
if (index < 0) {
throw DruidException.defensive("Value not found in column[%s] double dictionary", name);
@ -182,13 +150,7 @@ public final class DictionaryIdLookup implements Closeable
public int lookupArray(@Nullable int[] value)
{
if (arrayDictionary == null) {
arrayDictionaryFile = makeTempFile(name + NestedCommonFormatColumnSerializer.ARRAY_DICTIONARY_FILE_NAME);
arrayBuffer = mapWriter(arrayDictionaryFile, arrayDictionaryWriter);
arrayDictionary = FrontCodedIntArrayIndexed.read(arrayBuffer, ByteOrder.nativeOrder()).get();
// reset position
arrayBuffer.position(0);
}
ensureArrayDictionaryLoaded();
final int index = arrayDictionary.indexOf(value);
if (index < 0) {
throw DruidException.defensive("Value not found in column[%s] array dictionary", name);
@ -256,6 +218,82 @@ public final class DictionaryIdLookup implements Closeable
return doubleOffset() + (doubleDictionaryWriter != null ? doubleDictionaryWriter.getCardinality() : 0);
}
/**
 * Lazily loads {@link DictionaryIdLookup#getDictionaryValue} string lookups: on first use, writes the string
 * dictionary writer's contents into a temporary smoosh directory and memory-maps the result. No-op when the
 * string dictionary has already been loaded.
 */
private void ensureStringDictionaryLoaded()
{
  if (stringDictionary == null) {
    // GenericIndexed v2 can write to multiple files if the dictionary is larger than 2gb, so we use a smooshfile
    // for strings because of this. if other type dictionary writers could potentially use multiple internal files
    // in the future, we should transition them to using this approach as well (or build a combination smoosher and
    // mapper so that we can have a mutable smoosh)
    File stringSmoosh = FileUtils.createTempDirInLocation(tempBasePath, StringUtils.urlEncode(name) + "__stringTempSmoosh");
    stringDictionaryFile = stringSmoosh.toPath();
    final String fileName = NestedCommonFormatColumnSerializer.getInternalFileName(
        name,
        NestedCommonFormatColumnSerializer.STRING_DICTIONARY_FILE_NAME
    );
    try (
        final FileSmoosher smoosher = new FileSmoosher(stringSmoosh);
        final SmooshedWriter writer = smoosher.addWithSmooshedWriter(
            fileName,
            stringDictionaryWriter.getSerializedSize()
        )
    ) {
      stringDictionaryWriter.writeTo(writer, smoosher);
      // close explicitly before mapping so the smoosh files are fully written out; the try-with-resources close
      // afterwards is then a no-op
      writer.close();
      smoosher.close();
      stringBufferMapper = SmooshedFileMapper.load(stringSmoosh);
      final ByteBuffer stringBuffer = stringBufferMapper.mapFile(fileName);
      stringDictionary = StringEncodingStrategies.getStringDictionarySupplier(
          stringBufferMapper,
          stringBuffer,
          ByteOrder.nativeOrder()
      ).get();
    }
    catch (IOException e) {
      throw new RuntimeException(e);
    }
  }
}
/**
 * Lazily maps the temp file backing the long dictionary on first use; no-op when already loaded.
 */
private void ensureLongDictionaryLoaded()
{
  if (longDictionary != null) {
    return;
  }
  longDictionaryFile = makeTempFile(name + NestedCommonFormatColumnSerializer.LONG_DICTIONARY_FILE_NAME);
  longBuffer = mapWriter(longDictionaryFile, longDictionaryWriter);
  longDictionary = FixedIndexed.read(longBuffer, TypeStrategies.LONG, ByteOrder.nativeOrder(), Long.BYTES).get();
  // rewind so later readers of the mapped buffer start from the beginning
  longBuffer.position(0);
}
/**
 * Lazily maps the temp file backing the double dictionary on first use; no-op when already loaded.
 */
private void ensureDoubleDictionaryLoaded()
{
  if (doubleDictionary != null) {
    return;
  }
  doubleDictionaryFile = makeTempFile(name + NestedCommonFormatColumnSerializer.DOUBLE_DICTIONARY_FILE_NAME);
  doubleBuffer = mapWriter(doubleDictionaryFile, doubleDictionaryWriter);
  doubleDictionary = FixedIndexed.read(
      doubleBuffer,
      TypeStrategies.DOUBLE,
      ByteOrder.nativeOrder(),
      Double.BYTES
  ).get();
  // rewind so later readers of the mapped buffer start from the beginning
  doubleBuffer.position(0);
}
/**
 * Lazily maps the temp file backing the array dictionary on first use. No-op when already loaded, or when
 * there is no array dictionary writer at all (unlike the other dictionary types, the array writer may be null).
 */
private void ensureArrayDictionaryLoaded()
{
  if (arrayDictionary != null || arrayDictionaryWriter == null) {
    return;
  }
  arrayDictionaryFile = makeTempFile(name + NestedCommonFormatColumnSerializer.ARRAY_DICTIONARY_FILE_NAME);
  arrayBuffer = mapWriter(arrayDictionaryFile, arrayDictionaryWriter);
  arrayDictionary = FrontCodedIntArrayIndexed.read(arrayBuffer, ByteOrder.nativeOrder()).get();
  // rewind so later readers of the mapped buffer start from the beginning
  arrayBuffer.position(0);
}
private Path makeTempFile(String name)
{
try {

View File

@ -117,8 +117,8 @@ public abstract class GlobalDictionaryEncodedFieldColumnWriter<T>
}
/**
* Hook to allow implementors the chance to do additional operations during {@link #addValue(int, Object)}, such as
* writing an additional value column
* Hook to allow implementors the chance to do additional operations during {@link #writeTo(int, FileSmoosher)}, such
* as writing an additional value column
*/
void writeValue(@Nullable T value) throws IOException
{
@ -159,7 +159,6 @@ public abstract class GlobalDictionaryEncodedFieldColumnWriter<T>
localId = localDictionary.add(globalId);
}
intermediateValueWriter.write(localId);
writeValue(value);
cursorPosition++;
}
@ -168,11 +167,9 @@ public abstract class GlobalDictionaryEncodedFieldColumnWriter<T>
*/
private void fillNull(int row) throws IOException
{
final T value = processValue(row, null);
final int localId = localDictionary.add(0);
while (cursorPosition < row) {
intermediateValueWriter.write(localId);
writeValue(value);
cursorPosition++;
}
}
@ -252,6 +249,7 @@ public abstract class GlobalDictionaryEncodedFieldColumnWriter<T>
final int unsortedLocalId = rows.nextInt();
final int sortedLocalId = unsortedToSorted[unsortedLocalId];
encodedValueSerializer.addValue(sortedLocalId);
writeValue((T) globalDictionaryIdLookup.getDictionaryValue(unsortedToGlobal[unsortedLocalId]));
bitmaps[sortedLocalId].add(rowCount++);
}
@ -307,7 +305,7 @@ public abstract class GlobalDictionaryEncodedFieldColumnWriter<T>
}
}
private void openColumnSerializer(SegmentWriteOutMedium medium, int maxId) throws IOException
public void openColumnSerializer(SegmentWriteOutMedium medium, int maxId) throws IOException
{
if (indexSpec.getDimensionCompression() != CompressionStrategy.UNCOMPRESSED) {
this.version = DictionaryEncodedColumnPartSerde.VERSION.COMPRESSED;

View File

@ -58,21 +58,6 @@ public final class ScalarDoubleFieldColumnWriter extends GlobalDictionaryEncoded
return globalDictionaryIdLookup.lookupDouble(value);
}
@Override
public void open() throws IOException
{
super.open();
doublesSerializer = CompressionFactory.getDoubleSerializer(
fieldName,
segmentWriteOutMedium,
StringUtils.format("%s.double_column", fieldName),
ByteOrder.nativeOrder(),
indexSpec.getDimensionCompression(),
fieldResourceCloser
);
doublesSerializer.open();
}
@Override
void writeValue(@Nullable Double value) throws IOException
{
@ -83,6 +68,21 @@ public final class ScalarDoubleFieldColumnWriter extends GlobalDictionaryEncoded
}
}
/**
 * Opens the dictionary id column serializer and the double value column serializer. Creating the value
 * serializer here (rather than in open()) defers its compression buffer and temp file until this field is
 * actually being serialized.
 */
@Override
public void openColumnSerializer(SegmentWriteOutMedium medium, int maxId) throws IOException
{
  super.openColumnSerializer(medium, maxId);
  final String writeOutName = StringUtils.format("%s.double_column", fieldName);
  doublesSerializer = CompressionFactory.getDoubleSerializer(
      fieldName,
      medium,
      writeOutName,
      ByteOrder.nativeOrder(),
      indexSpec.getDimensionCompression(),
      fieldResourceCloser
  );
  doublesSerializer.open();
}
@Override
void writeColumnTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
{

View File

@ -58,22 +58,6 @@ public final class ScalarLongFieldColumnWriter extends GlobalDictionaryEncodedFi
return globalDictionaryIdLookup.lookupLong(value);
}
@Override
public void open() throws IOException
{
super.open();
longsSerializer = CompressionFactory.getLongSerializer(
fieldName,
segmentWriteOutMedium,
StringUtils.format("%s.long_column", fieldName),
ByteOrder.nativeOrder(),
indexSpec.getLongEncoding(),
indexSpec.getDimensionCompression(),
fieldResourceCloser
);
longsSerializer.open();
}
@Override
void writeValue(@Nullable Long value) throws IOException
{
@ -84,6 +68,22 @@ public final class ScalarLongFieldColumnWriter extends GlobalDictionaryEncodedFi
}
}
/**
 * Opens the dictionary id column serializer and the long value column serializer. Creating the value
 * serializer here (rather than in open()) defers its compression buffer and temp file until this field is
 * actually being serialized.
 */
@Override
public void openColumnSerializer(SegmentWriteOutMedium medium, int maxId) throws IOException
{
  super.openColumnSerializer(medium, maxId);
  final String writeOutName = StringUtils.format("%s.long_column", fieldName);
  longsSerializer = CompressionFactory.getLongSerializer(
      fieldName,
      medium,
      writeOutName,
      ByteOrder.nativeOrder(),
      indexSpec.getLongEncoding(),
      indexSpec.getDimensionCompression(),
      fieldResourceCloser
  );
  longsSerializer.open();
}
@Override
void writeColumnTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
{