diff --git a/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryEncodedFieldColumnWriter.java b/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryEncodedFieldColumnWriter.java index ed30c84b792..a6d40471cd7 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryEncodedFieldColumnWriter.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryEncodedFieldColumnWriter.java @@ -48,12 +48,12 @@ import java.nio.channels.WritableByteChannel; /** * Base class for writer of global dictionary encoded nested literal columns for {@link NestedDataColumnSerializer}. - * {@link NestedDataColumnSerializer} while processing the 'raw' nested data will call {@link #addValue(Object)} for - * all literal writers, which for this type of writer entails building a local dictionary to map into to the global + * {@link NestedDataColumnSerializer} while processing the 'raw' nested data will call {@link #addValue(int, Object)} + * for all literal writers, which for this type of writer entails building a local dictionary to map into to the global * dictionary ({@link #localDictionary}) and writes this unsorted localId to an intermediate integer column, * {@link #intermediateValueWriter}. * - * When processing the 'raw' value column is complete, the {@link #writeTo(FileSmoosher)} method will sort the + * When processing the 'raw' value column is complete, the {@link #writeTo(int, FileSmoosher)} method will sort the * local ids and write them out to a local sorted dictionary, iterate over {@link #intermediateValueWriter} swapping * the unsorted local ids with the sorted ids and writing to the compressed id column writer * {@link #encodedValueSerializer} building the bitmap indexes along the way. @@ -75,6 +75,8 @@ public abstract class GlobalDictionaryEncodedFieldColumnWriter protected DictionaryEncodedColumnPartSerde.VERSION version = null; protected SingleValueColumnarIntsSerializer encodedValueSerializer; + protected int cursorPosition; + protected GlobalDictionaryEncodedFieldColumnWriter( String columnName, String fieldName, @@ -99,7 +101,7 @@ public abstract class GlobalDictionaryEncodedFieldColumnWriter } /** - * Hook to allow implementors the chance to do additional operations during {@link #addValue(Object)}, such as + * Hook to allow implementors the chance to do additional operations during {@link #addValue(int, Object)}, such as * writing an additional value column */ void writeValue(@Nullable T value) throws IOException @@ -113,24 +115,40 @@ public abstract class GlobalDictionaryEncodedFieldColumnWriter abstract int lookupGlobalId(T value); /** - * Open the writer so that {@link #addValue(Object)} can be called + * Open the writer so that {@link #addValue(int, Object)} can be called */ public void open() throws IOException { intermediateValueWriter = new FixedIndexedIntWriter(segmentWriteOutMedium, false); intermediateValueWriter.open(); + cursorPosition = 0; } /** * Add a value to the unsorted local dictionary and write to an intermediate column */ - public void addValue(Object val) throws IOException + public void addValue(int row, Object val) throws IOException { + if (row > cursorPosition) { + fillNull(row); + } final T value = processValue(val); final int globalId = lookupGlobalId(value); final int localId = localDictionary.add(globalId); intermediateValueWriter.write(localId); writeValue(value); + cursorPosition++; + } + + private void fillNull(int row) throws IOException + { + final T value = processValue(null); + final int localId = localDictionary.add(0); + while (cursorPosition < row) { + intermediateValueWriter.write(localId); + writeValue(value); + cursorPosition++; + } } @@ -148,8 +166,11 @@ public abstract class GlobalDictionaryEncodedFieldColumnWriter */ abstract void writeColumnTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException; - public void writeTo(FileSmoosher smoosher) throws IOException + public void writeTo(int finalRowCount, FileSmoosher smoosher) throws IOException { + if (finalRowCount > cursorPosition) { + fillNull(finalRowCount); + } // use a child writeout medium so that we can close them when we are finished and don't leave temporary files // hanging out until the entire segment is done final SegmentWriteOutMedium tmpWriteoutMedium = segmentWriteOutMedium.makeChildWriteOutMedium(); diff --git a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java index 485abf14c35..521740aadac 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java @@ -55,7 +55,6 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.WritableByteChannel; import java.util.Map; -import java.util.Set; import java.util.SortedMap; public class NestedDataColumnSerializer implements GenericColumnSerializer @@ -83,7 +82,7 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer eval = ExprEval.bestEffortOf(fieldValue); - writer.addValue(eval.value()); + writer.addValue(rowCount, eval.value()); // serializer doesn't use size estimate return 0; } @@ -266,17 +265,7 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer set = processed.getLiteralFields(); - for (String field : fields.keySet()) { - if (!set.contains(field)) { - fieldWriters.get(field).addValue(null); - } - } - } else { - for (String field : fields.keySet()) { - fieldWriters.get(field).addValue(null); - } + fieldProcessor.processFields(data.getValue()); } rowCount++; } @@ -349,7 +338,7 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer field : fields.entrySet()) { // remove writer so that it can be collected when we are done with it GlobalDictionaryEncodedFieldColumnWriter writer = fieldWriters.remove(field.getKey()); - writer.writeTo(smoosher); + writer.writeTo(rowCount, smoosher); } log.info("Column [%s] serialized successfully with [%d] nested columns.", name, fields.size()); }