mirror of https://github.com/apache/druid.git
nested column serializer performance improvement for sparse columns (#13101)
This commit is contained in:
parent
8ce03eb094
commit
a0e0fbe1b3
|
@ -48,12 +48,12 @@ import java.nio.channels.WritableByteChannel;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Base class for writer of global dictionary encoded nested literal columns for {@link NestedDataColumnSerializer}.
|
* Base class for writer of global dictionary encoded nested literal columns for {@link NestedDataColumnSerializer}.
|
||||||
* {@link NestedDataColumnSerializer} while processing the 'raw' nested data will call {@link #addValue(Object)} for
|
* {@link NestedDataColumnSerializer} while processing the 'raw' nested data will call {@link #addValue(int, Object)}
|
||||||
* all literal writers, which for this type of writer entails building a local dictionary to map into the global
|
* for all literal writers, which for this type of writer entails building a local dictionary to map into the global
|
||||||
* dictionary ({@link #localDictionary}) and writes this unsorted localId to an intermediate integer column,
|
* dictionary ({@link #localDictionary}) and writes this unsorted localId to an intermediate integer column,
|
||||||
* {@link #intermediateValueWriter}.
|
* {@link #intermediateValueWriter}.
|
||||||
*
|
*
|
||||||
* When processing the 'raw' value column is complete, the {@link #writeTo(FileSmoosher)} method will sort the
|
* When processing the 'raw' value column is complete, the {@link #writeTo(int, FileSmoosher)} method will sort the
|
||||||
* local ids and write them out to a local sorted dictionary, iterate over {@link #intermediateValueWriter} swapping
|
* local ids and write them out to a local sorted dictionary, iterate over {@link #intermediateValueWriter} swapping
|
||||||
* the unsorted local ids with the sorted ids and writing to the compressed id column writer
|
* the unsorted local ids with the sorted ids and writing to the compressed id column writer
|
||||||
* {@link #encodedValueSerializer} building the bitmap indexes along the way.
|
* {@link #encodedValueSerializer} building the bitmap indexes along the way.
|
||||||
|
@ -75,6 +75,8 @@ public abstract class GlobalDictionaryEncodedFieldColumnWriter<T>
|
||||||
protected DictionaryEncodedColumnPartSerde.VERSION version = null;
|
protected DictionaryEncodedColumnPartSerde.VERSION version = null;
|
||||||
protected SingleValueColumnarIntsSerializer encodedValueSerializer;
|
protected SingleValueColumnarIntsSerializer encodedValueSerializer;
|
||||||
|
|
||||||
|
protected int cursorPosition;
|
||||||
|
|
||||||
protected GlobalDictionaryEncodedFieldColumnWriter(
|
protected GlobalDictionaryEncodedFieldColumnWriter(
|
||||||
String columnName,
|
String columnName,
|
||||||
String fieldName,
|
String fieldName,
|
||||||
|
@ -99,7 +101,7 @@ public abstract class GlobalDictionaryEncodedFieldColumnWriter<T>
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Hook to allow implementors the chance to do additional operations during {@link #addValue(Object)}, such as
|
* Hook to allow implementors the chance to do additional operations during {@link #addValue(int, Object)}, such as
|
||||||
* writing an additional value column
|
* writing an additional value column
|
||||||
*/
|
*/
|
||||||
void writeValue(@Nullable T value) throws IOException
|
void writeValue(@Nullable T value) throws IOException
|
||||||
|
@ -113,24 +115,40 @@ public abstract class GlobalDictionaryEncodedFieldColumnWriter<T>
|
||||||
abstract int lookupGlobalId(T value);
|
abstract int lookupGlobalId(T value);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Open the writer so that {@link #addValue(Object)} can be called
|
* Open the writer so that {@link #addValue(int, Object)} can be called
|
||||||
*/
|
*/
|
||||||
public void open() throws IOException
|
public void open() throws IOException
|
||||||
{
|
{
|
||||||
intermediateValueWriter = new FixedIndexedIntWriter(segmentWriteOutMedium, false);
|
intermediateValueWriter = new FixedIndexedIntWriter(segmentWriteOutMedium, false);
|
||||||
intermediateValueWriter.open();
|
intermediateValueWriter.open();
|
||||||
|
cursorPosition = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add a value to the unsorted local dictionary and write to an intermediate column
|
* Add a value to the unsorted local dictionary and write to an intermediate column
|
||||||
*/
|
*/
|
||||||
public void addValue(Object val) throws IOException
|
public void addValue(int row, Object val) throws IOException
|
||||||
{
|
{
|
||||||
|
if (row > cursorPosition) {
|
||||||
|
fillNull(row);
|
||||||
|
}
|
||||||
final T value = processValue(val);
|
final T value = processValue(val);
|
||||||
final int globalId = lookupGlobalId(value);
|
final int globalId = lookupGlobalId(value);
|
||||||
final int localId = localDictionary.add(globalId);
|
final int localId = localDictionary.add(globalId);
|
||||||
intermediateValueWriter.write(localId);
|
intermediateValueWriter.write(localId);
|
||||||
writeValue(value);
|
writeValue(value);
|
||||||
|
cursorPosition++;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void fillNull(int row) throws IOException
|
||||||
|
{
|
||||||
|
final T value = processValue(null);
|
||||||
|
final int localId = localDictionary.add(0);
|
||||||
|
while (cursorPosition < row) {
|
||||||
|
intermediateValueWriter.write(localId);
|
||||||
|
writeValue(value);
|
||||||
|
cursorPosition++;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -148,8 +166,11 @@ public abstract class GlobalDictionaryEncodedFieldColumnWriter<T>
|
||||||
*/
|
*/
|
||||||
abstract void writeColumnTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException;
|
abstract void writeColumnTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException;
|
||||||
|
|
||||||
public void writeTo(FileSmoosher smoosher) throws IOException
|
public void writeTo(int finalRowCount, FileSmoosher smoosher) throws IOException
|
||||||
{
|
{
|
||||||
|
if (finalRowCount > cursorPosition) {
|
||||||
|
fillNull(finalRowCount);
|
||||||
|
}
|
||||||
// use a child writeout medium so that we can close them when we are finished and don't leave temporary files
|
// use a child writeout medium so that we can close them when we are finished and don't leave temporary files
|
||||||
// hanging out until the entire segment is done
|
// hanging out until the entire segment is done
|
||||||
final SegmentWriteOutMedium tmpWriteoutMedium = segmentWriteOutMedium.makeChildWriteOutMedium();
|
final SegmentWriteOutMedium tmpWriteoutMedium = segmentWriteOutMedium.makeChildWriteOutMedium();
|
||||||
|
|
|
@ -55,7 +55,6 @@ import java.nio.ByteBuffer;
|
||||||
import java.nio.ByteOrder;
|
import java.nio.ByteOrder;
|
||||||
import java.nio.channels.WritableByteChannel;
|
import java.nio.channels.WritableByteChannel;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
|
||||||
import java.util.SortedMap;
|
import java.util.SortedMap;
|
||||||
|
|
||||||
public class NestedDataColumnSerializer implements GenericColumnSerializer<StructuredData>
|
public class NestedDataColumnSerializer implements GenericColumnSerializer<StructuredData>
|
||||||
|
@ -83,7 +82,7 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
|
||||||
if (writer != null) {
|
if (writer != null) {
|
||||||
try {
|
try {
|
||||||
ExprEval<?> eval = ExprEval.bestEffortOf(fieldValue);
|
ExprEval<?> eval = ExprEval.bestEffortOf(fieldValue);
|
||||||
writer.addValue(eval.value());
|
writer.addValue(rowCount, eval.value());
|
||||||
// serializer doesn't use size estimate
|
// serializer doesn't use size estimate
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -266,17 +265,7 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
|
||||||
}
|
}
|
||||||
rawWriter.addValue(NestedDataComplexTypeSerde.INSTANCE.toBytes(data));
|
rawWriter.addValue(NestedDataComplexTypeSerde.INSTANCE.toBytes(data));
|
||||||
if (data != null) {
|
if (data != null) {
|
||||||
StructuredDataProcessor.ProcessResults processed = fieldProcessor.processFields(data.getValue());
|
fieldProcessor.processFields(data.getValue());
|
||||||
Set<String> set = processed.getLiteralFields();
|
|
||||||
for (String field : fields.keySet()) {
|
|
||||||
if (!set.contains(field)) {
|
|
||||||
fieldWriters.get(field).addValue(null);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
for (String field : fields.keySet()) {
|
|
||||||
fieldWriters.get(field).addValue(null);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
rowCount++;
|
rowCount++;
|
||||||
}
|
}
|
||||||
|
@ -349,7 +338,7 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
|
||||||
for (Map.Entry<String, NestedLiteralTypeInfo.MutableTypeSet> field : fields.entrySet()) {
|
for (Map.Entry<String, NestedLiteralTypeInfo.MutableTypeSet> field : fields.entrySet()) {
|
||||||
// remove writer so that it can be collected when we are done with it
|
// remove writer so that it can be collected when we are done with it
|
||||||
GlobalDictionaryEncodedFieldColumnWriter<?> writer = fieldWriters.remove(field.getKey());
|
GlobalDictionaryEncodedFieldColumnWriter<?> writer = fieldWriters.remove(field.getKey());
|
||||||
writer.writeTo(smoosher);
|
writer.writeTo(rowCount, smoosher);
|
||||||
}
|
}
|
||||||
log.info("Column [%s] serialized successfully with [%d] nested columns.", name, fields.size());
|
log.info("Column [%s] serialized successfully with [%d] nested columns.", name, fields.size());
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue