diff --git a/processing/src/main/java/org/apache/druid/frame/field/StringFieldReader.java b/processing/src/main/java/org/apache/druid/frame/field/StringFieldReader.java index 1c51a914e0d..be2263be138 100644 --- a/processing/src/main/java/org/apache/druid/frame/field/StringFieldReader.java +++ b/processing/src/main/java/org/apache/druid/frame/field/StringFieldReader.java @@ -480,9 +480,8 @@ public class StringFieldReader implements FieldReader public boolean isNull(int rowNum) { final long fieldPosition = coach.computeFieldPosition(rowNum); - byte[] nullBytes = new byte[3]; - dataRegion.getByteArray(fieldPosition, nullBytes, 0, 3); - return Arrays.equals(nullBytes, EXPECTED_BYTES_FOR_NULL); + return dataRegion.getByte(fieldPosition) == StringFieldWriter.NULL_ROW + && dataRegion.getByte(fieldPosition + 1) == StringFieldWriter.ROW_TERMINATOR; } @Override diff --git a/processing/src/main/java/org/apache/druid/frame/read/columnar/FrameColumnReaders.java b/processing/src/main/java/org/apache/druid/frame/read/columnar/FrameColumnReaders.java index 9b4dc85cb1e..6ceb6b72917 100644 --- a/processing/src/main/java/org/apache/druid/frame/read/columnar/FrameColumnReaders.java +++ b/processing/src/main/java/org/apache/druid/frame/read/columnar/FrameColumnReaders.java @@ -51,7 +51,7 @@ public class FrameColumnReaders return new DoubleFrameColumnReader(columnNumber); case STRING: - return new StringFrameColumnReader(columnNumber, false); + return new StringFrameColumnReader(columnNumber); case COMPLEX: return new ComplexFrameColumnReader(columnNumber); @@ -59,7 +59,7 @@ public class FrameColumnReaders case ARRAY: switch (columnType.getElementType().getType()) { case STRING: - return new StringFrameColumnReader(columnNumber, true); + return new StringArrayFrameColumnReader(columnNumber); case LONG: return new LongArrayFrameColumnReader(columnNumber); case FLOAT: diff --git a/processing/src/main/java/org/apache/druid/frame/read/columnar/StringArrayFrameColumnReader.java b/processing/src/main/java/org/apache/druid/frame/read/columnar/StringArrayFrameColumnReader.java new file mode 100644 index 00000000000..31c56bf38e7 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/frame/read/columnar/StringArrayFrameColumnReader.java @@ -0,0 +1,385 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.read.columnar; + +import com.google.common.primitives.Ints; +import it.unimi.dsi.fastutil.objects.ObjectArrays; +import org.apache.datasketches.memory.Memory; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.error.DruidException; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.read.FrameReaderUtils; +import org.apache.druid.frame.write.FrameWriterUtils; +import org.apache.druid.frame.write.columnar.FrameColumnWriters; +import org.apache.druid.frame.write.columnar.StringFrameColumnWriter; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.query.rowsandcols.column.Column; +import org.apache.druid.query.rowsandcols.column.ColumnAccessorBasedColumn; +import org.apache.druid.query.rowsandcols.column.accessor.ObjectColumnAccessorBase; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.ObjectColumnSelector; +import org.apache.druid.segment.column.BaseColumn; +import org.apache.druid.segment.column.ColumnCapabilitiesImpl; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.data.ReadableOffset; +import org.apache.druid.segment.vector.ReadableVectorInspector; +import org.apache.druid.segment.vector.ReadableVectorOffset; +import org.apache.druid.segment.vector.VectorObjectSelector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.util.Comparator; + +/** + * Reader for {@link ColumnType#STRING_ARRAY}. + * This is similar to {@link StringFrameColumnReader} reading mvds in reading bytes from frame + */ +public class StringArrayFrameColumnReader implements FrameColumnReader +{ + private final int columnNumber; + + /** + * Create a new reader. + * + * @param columnNumber column number + */ + StringArrayFrameColumnReader(int columnNumber) + { + this.columnNumber = columnNumber; + } + + @Override + public Column readRACColumn(Frame frame) + { + final Memory memory = frame.region(columnNumber); + validate(memory); + + final long positionOfLengths = getStartOfStringLengthSection(frame.numRows()); + final long positionOfPayloads = getStartOfStringDataSection(memory, frame.numRows()); + + StringArrayFrameColumn frameCol = new StringArrayFrameColumn( + frame, + memory, + positionOfLengths, + positionOfPayloads + ); + + return new ColumnAccessorBasedColumn(frameCol); + } + + @Override + public ColumnPlus readColumn(final Frame frame) + { + final Memory memory = frame.region(columnNumber); + validate(memory); + + final long startOfStringLengthSection = getStartOfStringLengthSection(frame.numRows()); + final long startOfStringDataSection = getStartOfStringDataSection(memory, frame.numRows()); + + final BaseColumn baseColumn = new StringArrayFrameColumn( + frame, + memory, + startOfStringLengthSection, + startOfStringDataSection + ); + + return new ColumnPlus( + baseColumn, + new ColumnCapabilitiesImpl().setType(ColumnType.STRING_ARRAY) + .setHasMultipleValues(false) + .setDictionaryEncoded(false), + frame.numRows() + ); + } + + private void validate(final Memory region) + { + // Check if column is big enough for a header + if (region.getCapacity() < StringFrameColumnWriter.DATA_OFFSET) { + throw DruidException.defensive("Column[%s] is not big enough for a header", columnNumber); + } + + final byte typeCode = region.getByte(0); + if (typeCode != FrameColumnWriters.TYPE_STRING_ARRAY) { + throw DruidException.defensive( + "Column[%s] does not have the correct type code; expected[%s], got[%s]", + columnNumber, + FrameColumnWriters.TYPE_STRING_ARRAY, + typeCode + ); + } + } + + private static long getStartOfCumulativeLengthSection() + { + return StringFrameColumnWriter.DATA_OFFSET; + } + + private static long getStartOfStringLengthSection(final int numRows) + { + return StringFrameColumnWriter.DATA_OFFSET + (long) Integer.BYTES * numRows; + } + + private long getStartOfStringDataSection( + final Memory memory, + final int numRows + ) + { + if (numRows < 0) { + throw DruidException.defensive("Encountered -ve numRows [%s] while reading frame", numRows); + } + final int totalNumValues = FrameColumnReaderUtils.getAdjustedCumulativeRowLength( + memory, + getStartOfCumulativeLengthSection(), + numRows - 1 + ); + + return getStartOfStringLengthSection(numRows) + (long) Integer.BYTES * totalNumValues; + } + + private static class StringArrayFrameColumn extends ObjectColumnAccessorBase implements BaseColumn + { + private final Frame frame; + private final Memory memory; + private final long startOfStringLengthSection; + private final long startOfStringDataSection; + + private StringArrayFrameColumn( + Frame frame, + Memory memory, + long startOfStringLengthSection, + long startOfStringDataSection + ) + { + this.frame = frame; + this.memory = memory; + this.startOfStringLengthSection = startOfStringLengthSection; + this.startOfStringDataSection = startOfStringDataSection; + } + + @Override + public ColumnValueSelector makeColumnValueSelector(ReadableOffset offset) + { + return new ObjectColumnSelector() + { + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + // Do nothing. + } + + @Nullable + @Override + public Object getObject() + { + return getRowAsObject(frame.physicalRow(offset.getOffset()), true); + } + + @Override + public Class classOfObject() + { + return Object[].class; + } + }; + } + + @Override + public VectorObjectSelector makeVectorObjectSelector(final ReadableVectorOffset offset) + { + class StringArrayFrameVectorObjectSelector implements VectorObjectSelector + { + private final Object[] vector = new Object[offset.getMaxVectorSize()]; + private int id = ReadableVectorInspector.NULL_ID; + + @Override + public Object[] getObjectVector() + { + computeVectorIfNeeded(); + return vector; + } + + @Override + public int getMaxVectorSize() + { + return offset.getMaxVectorSize(); + } + + @Override + public int getCurrentVectorSize() + { + return offset.getCurrentVectorSize(); + } + + private void computeVectorIfNeeded() + { + if (id == offset.getId()) { + return; + } + + if (offset.isContiguous()) { + final int start = offset.getStartOffset(); + + for (int i = 0; i < offset.getCurrentVectorSize(); i++) { + final int physicalRow = frame.physicalRow(i + start); + vector[i] = getRowAsObject(physicalRow, true); + } + } else { + final int[] offsets = offset.getOffsets(); + + for (int i = 0; i < offset.getCurrentVectorSize(); i++) { + final int physicalRow = frame.physicalRow(offsets[i]); + vector[i] = getRowAsObject(physicalRow, true); + } + } + + id = offset.getId(); + } + } + + return new StringArrayFrameVectorObjectSelector(); + } + + @Override + public void close() + { + // Do nothing. + } + + @Override + public ColumnType getType() + { + return ColumnType.STRING_ARRAY; + } + + @Override + public int numRows() + { + return frame.numRows(); + } + + @Override + protected Object getVal(int rowNum) + { + return getRowAsObject(frame.physicalRow(rowNum), true); + } + + @Override + protected Comparator getComparator() + { + return Comparator.nullsFirst(ColumnType.STRING_ARRAY.getStrategy()); + } + + /** + * Returns a ByteBuffer containing UTF-8 encoded string number {@code index}. The ByteBuffer is always newly + * created, so it is OK to change its position, limit, etc. However, it may point to shared memory, so it is + * not OK to write to its contents. + */ + @Nullable + private ByteBuffer getStringUtf8(final int index) + { + if (startOfStringLengthSection > Long.MAX_VALUE - (long) Integer.BYTES * index) { + throw DruidException.defensive("length index would overflow trying to read the frame memory!"); + } + + final int dataEndVariableIndex = memory.getInt(startOfStringLengthSection + (long) Integer.BYTES * index); + if (startOfStringDataSection > Long.MAX_VALUE - dataEndVariableIndex) { + throw DruidException.defensive("data end index would overflow trying to read the frame memory!"); + } + + final long dataStart; + final long dataEnd = startOfStringDataSection + dataEndVariableIndex; + + if (index == 0) { + dataStart = startOfStringDataSection; + } else { + final int dataStartVariableIndex = memory.getInt(startOfStringLengthSection + (long) Integer.BYTES * (index + - 1)); + if (startOfStringDataSection > Long.MAX_VALUE - dataStartVariableIndex) { + throw DruidException.defensive("data start index would overflow trying to read the frame memory!"); + } + dataStart = startOfStringDataSection + dataStartVariableIndex; + } + + final int dataLength = Ints.checkedCast(dataEnd - dataStart); + + if ((dataLength == 0 && NullHandling.replaceWithDefault()) || + (dataLength == 1 && memory.getByte(dataStart) == FrameWriterUtils.NULL_STRING_MARKER)) { + return null; + } + + return FrameReaderUtils.readByteBuffer(memory, dataStart, dataLength); + } + + @Nullable + private String getString(final int index) + { + final ByteBuffer stringUtf8 = getStringUtf8(index); + + if (stringUtf8 == null) { + return null; + } else { + return StringUtils.fromUtf8(stringUtf8); + } + } + + /** + * Returns the object at the given physical row number. + * + * @param physicalRow physical row number + * @param decode if true, return java.lang.String. If false, return UTF-8 ByteBuffer. + */ + @Nullable + private Object getRowAsObject(final int physicalRow, final boolean decode) + { + final int cumulativeRowLength = FrameColumnReaderUtils.getCumulativeRowLength( + memory, + getStartOfCumulativeLengthSection(), + physicalRow + ); + final int rowLength; + + if (FrameColumnReaderUtils.isNullRow(cumulativeRowLength)) { + return null; + } else if (physicalRow == 0) { + rowLength = cumulativeRowLength; + } else { + rowLength = cumulativeRowLength - FrameColumnReaderUtils.getAdjustedCumulativeRowLength( + memory, + getStartOfCumulativeLengthSection(), + physicalRow - 1 + ); + } + + if (rowLength == 0) { + return ObjectArrays.EMPTY_ARRAY; + } else { + final Object[] row = new Object[rowLength]; + + for (int i = 0; i < rowLength; i++) { + final int index = cumulativeRowLength - rowLength + i; + row[i] = decode ? getString(index) : getStringUtf8(index); + } + + return row; + } + } + } +} diff --git a/processing/src/main/java/org/apache/druid/frame/read/columnar/StringFrameColumnReader.java b/processing/src/main/java/org/apache/druid/frame/read/columnar/StringFrameColumnReader.java index d9fb9d83a9f..2385c431e5b 100644 --- a/processing/src/main/java/org/apache/druid/frame/read/columnar/StringFrameColumnReader.java +++ b/processing/src/main/java/org/apache/druid/frame/read/columnar/StringFrameColumnReader.java @@ -19,18 +19,16 @@ package org.apache.druid.frame.read.columnar; -import com.google.common.annotations.VisibleForTesting; import com.google.common.primitives.Ints; -import it.unimi.dsi.fastutil.objects.ObjectArrays; import org.apache.datasketches.memory.Memory; import org.apache.druid.common.config.NullHandling; import org.apache.druid.error.DruidException; +import org.apache.druid.error.InvalidInput; import org.apache.druid.frame.Frame; import org.apache.druid.frame.read.FrameReaderUtils; import org.apache.druid.frame.write.FrameWriterUtils; import org.apache.druid.frame.write.columnar.FrameColumnWriters; import org.apache.druid.frame.write.columnar.StringFrameColumnWriter; -import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.extraction.ExtractionFn; import org.apache.druid.query.filter.DruidPredicateFactory; @@ -40,13 +38,11 @@ import org.apache.druid.query.rowsandcols.column.Column; import org.apache.druid.query.rowsandcols.column.ColumnAccessorBasedColumn; import org.apache.druid.query.rowsandcols.column.accessor.ObjectColumnAccessorBase; import org.apache.druid.segment.BaseSingleValueDimensionSelector; -import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.DimensionDictionarySelector; import org.apache.druid.segment.DimensionSelector; import org.apache.druid.segment.DimensionSelectorUtils; import org.apache.druid.segment.IdLookup; import org.apache.druid.segment.column.BaseColumn; -import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnCapabilitiesImpl; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.DictionaryEncodedColumn; @@ -67,23 +63,20 @@ import java.util.Comparator; import java.util.List; /** - * Reader for {@link StringFrameColumnWriter}, types {@link ColumnType#STRING} and {@link ColumnType#STRING_ARRAY}. + * Reader for {@link StringFrameColumnWriter}, type {@link ColumnType#STRING}. */ public class StringFrameColumnReader implements FrameColumnReader { private final int columnNumber; - private final boolean asArray; /** * Create a new reader. * * @param columnNumber column number - * @param asArray true for {@link ColumnType#STRING_ARRAY}, false for {@link ColumnType#STRING} */ - StringFrameColumnReader(int columnNumber, boolean asArray) + StringFrameColumnReader(int columnNumber) { this.columnNumber = columnNumber; - this.asArray = asArray; } @Override @@ -92,18 +85,20 @@ public class StringFrameColumnReader implements FrameColumnReader final Memory memory = frame.region(columnNumber); validate(memory); + if (isMultiValue(memory)) { + throw InvalidInput.exception("Encountered a multi value column. Window processing does not support MVDs. " + + "Consider using UNNEST or MV_TO_ARRAY."); + } final long positionOfLengths = getStartOfStringLengthSection(frame.numRows(), false); final long positionOfPayloads = getStartOfStringDataSection(memory, frame.numRows(), false); - StringFrameColumn frameCol = - new StringFrameColumn( - frame, - false, - memory, - positionOfLengths, - positionOfPayloads, - asArray || isMultiValue(memory) // Read MVDs as String arrays - ); + StringFrameColumn frameCol = new StringFrameColumn( + frame, + false, + memory, + positionOfLengths, + positionOfPayloads + ); return new ColumnAccessorBasedColumn(frameCol); } @@ -118,35 +113,19 @@ public class StringFrameColumnReader implements FrameColumnReader final long startOfStringLengthSection = getStartOfStringLengthSection(frame.numRows(), multiValue); final long startOfStringDataSection = getStartOfStringDataSection(memory, frame.numRows(), multiValue); - final BaseColumn baseColumn; - - if (asArray) { - baseColumn = new StringArrayFrameColumn( - frame, - multiValue, - memory, - startOfStringLengthSection, - startOfStringDataSection - ); - } else { - baseColumn = new StringFrameColumn( - frame, - multiValue, - memory, - startOfStringLengthSection, - startOfStringDataSection, - false - ); - } + final BaseColumn baseColumn = new StringFrameColumn( + frame, + multiValue, + memory, + startOfStringLengthSection, + startOfStringDataSection + ); return new ColumnPlus( baseColumn, - new ColumnCapabilitiesImpl().setType(asArray ? ColumnType.STRING_ARRAY : ColumnType.STRING) - .setHasMultipleValues(!asArray && multiValue) - .setDictionaryEncoded(false) - .setHasBitmapIndexes(false) - .setHasSpatialIndexes(false) - .setHasNulls(ColumnCapabilities.Capable.UNKNOWN), + new ColumnCapabilitiesImpl().setType(ColumnType.STRING) + .setHasMultipleValues(multiValue) + .setDictionaryEncoded(false), frame.numRows() ); } @@ -159,12 +138,11 @@ public class StringFrameColumnReader implements FrameColumnReader } final byte typeCode = region.getByte(0); - final byte expectedTypeCode = asArray ? FrameColumnWriters.TYPE_STRING_ARRAY : FrameColumnWriters.TYPE_STRING; - if (typeCode != expectedTypeCode) { + if (typeCode != FrameColumnWriters.TYPE_STRING) { throw DruidException.defensive( "Column[%s] does not have the correct type code; expected[%s], got[%s]", columnNumber, - expectedTypeCode, + FrameColumnWriters.TYPE_STRING, typeCode ); } @@ -172,7 +150,7 @@ public class StringFrameColumnReader implements FrameColumnReader private static boolean isMultiValue(final Memory memory) { - return memory.getByte(1) == 1; + return memory.getByte(StringFrameColumnWriter.MULTI_VALUE_POSITION) == StringFrameColumnWriter.MULTI_VALUE_BYTE; } private static long getStartOfCumulativeLengthSection() @@ -213,8 +191,7 @@ public class StringFrameColumnReader implements FrameColumnReader return getStartOfStringLengthSection(numRows, multiValue) + (long) Integer.BYTES * totalNumValues; } - @VisibleForTesting - static class StringFrameColumn extends ObjectColumnAccessorBase implements DictionaryEncodedColumn + private static class StringFrameColumn extends ObjectColumnAccessorBase implements DictionaryEncodedColumn { private final Frame frame; private final Memory memory; @@ -226,18 +203,12 @@ public class StringFrameColumnReader implements FrameColumnReader */ private final boolean multiValue; - /** - * Whether the column is being read as {@link ColumnType#STRING_ARRAY} (true) or {@link ColumnType#STRING} (false). - */ - private final boolean asArray; - private StringFrameColumn( Frame frame, boolean multiValue, Memory memory, long startOfStringLengthSection, - long startOfStringDataSection, - final boolean asArray + long startOfStringDataSection ) { this.frame = frame; @@ -245,7 +216,6 @@ public class StringFrameColumnReader implements FrameColumnReader this.memory = memory; this.startOfStringLengthSection = startOfStringLengthSection; this.startOfStringDataSection = startOfStringDataSection; - this.asArray = asArray; } @Override @@ -292,247 +262,6 @@ public class StringFrameColumnReader implements FrameColumnReader @Override public DimensionSelector makeDimensionSelector(ReadableOffset offset, @Nullable ExtractionFn extractionFn) - { - if (asArray) { - throw new ISE("Cannot call makeDimensionSelector on field of type [%s]", ColumnType.STRING_ARRAY); - } - - return makeDimensionSelectorInternal(offset, extractionFn); - } - - @Override - public SingleValueDimensionVectorSelector makeSingleValueDimensionVectorSelector(ReadableVectorOffset offset) - { - // Callers should use object selectors, because we have no dictionary. - throw new UnsupportedOperationException(); - } - - @Override - public MultiValueDimensionVectorSelector makeMultiValueDimensionVectorSelector(ReadableVectorOffset vectorOffset) - { - // Callers should use object selectors, because we have no dictionary. - throw new UnsupportedOperationException(); - } - - @Override - public VectorObjectSelector makeVectorObjectSelector(final ReadableVectorOffset offset) - { - class StringFrameVectorObjectSelector implements VectorObjectSelector - { - private final Object[] vector = new Object[offset.getMaxVectorSize()]; - private int id = ReadableVectorInspector.NULL_ID; - - @Override - public Object[] getObjectVector() - { - computeVectorIfNeeded(); - return vector; - } - - @Override - public int getMaxVectorSize() - { - return offset.getMaxVectorSize(); - } - - @Override - public int getCurrentVectorSize() - { - return offset.getCurrentVectorSize(); - } - - private void computeVectorIfNeeded() - { - if (id == offset.getId()) { - return; - } - - if (offset.isContiguous()) { - final int start = offset.getStartOffset(); - - for (int i = 0; i < offset.getCurrentVectorSize(); i++) { - final int physicalRow = frame.physicalRow(i + start); - vector[i] = getRowAsObject(physicalRow, true); - } - } else { - final int[] offsets = offset.getOffsets(); - - for (int i = 0; i < offset.getCurrentVectorSize(); i++) { - final int physicalRow = frame.physicalRow(offsets[i]); - vector[i] = getRowAsObject(physicalRow, true); - } - } - - id = offset.getId(); - } - } - - return new StringFrameVectorObjectSelector(); - } - - @Override - public int length() - { - return frame.numRows(); - } - - @Override - public void close() - { - // Do nothing. - } - - @Override - public ColumnType getType() - { - return asArray ? ColumnType.STRING_ARRAY : ColumnType.STRING; - } - - @Override - public int numRows() - { - return length(); - } - - @Override - protected Object getVal(int rowNum) - { - return getString(frame.physicalRow(rowNum)); - } - - @Override - protected Comparator getComparator() - { - return Comparator.nullsFirst(Comparator.comparing(o -> ((String) o))); - } - - /** - * Returns a ByteBuffer containing UTF-8 encoded string number {@code index}. The ByteBuffer is always newly - * created, so it is OK to change its position, limit, etc. However, it may point to shared memory, so it is - * not OK to write to its contents. - */ - @Nullable - private ByteBuffer getStringUtf8(final int index) - { - final long dataStart; - final long dataEnd = - startOfStringDataSection + - memory.getInt(startOfStringLengthSection + (long) Integer.BYTES * index); - - if (index == 0) { - dataStart = startOfStringDataSection; - } else { - dataStart = - startOfStringDataSection + - memory.getInt(startOfStringLengthSection + (long) Integer.BYTES * (index - 1)); - } - - final int dataLength = Ints.checkedCast(dataEnd - dataStart); - - if ((dataLength == 0 && NullHandling.replaceWithDefault()) || - (dataLength == 1 && memory.getByte(dataStart) == FrameWriterUtils.NULL_STRING_MARKER)) { - return null; - } - - return FrameReaderUtils.readByteBuffer(memory, dataStart, dataLength); - } - - @Nullable - private String getString(final int index) - { - final ByteBuffer stringUtf8 = getStringUtf8(index); - - if (stringUtf8 == null) { - return null; - } else { - return StringUtils.fromUtf8(stringUtf8); - } - } - - /** - * Returns the object at the given physical row number. - * - * When {@link #asArray}, the return value is always of type {@code Object[]}. Otherwise, the return value - * is either an empty list (if the row is empty), a single String (if the row has one value), or a List - * of Strings (if the row has more than one value). - * - * @param physicalRow physical row number - * @param decode if true, return java.lang.String. If false, return UTF-8 ByteBuffer. - */ - @Nullable - private Object getRowAsObject(final int physicalRow, final boolean decode) - { - if (multiValue) { - final int cumulativeRowLength = FrameColumnReaderUtils.getCumulativeRowLength( - memory, - getStartOfCumulativeLengthSection(), - physicalRow - ); - final int rowLength; - - if (FrameColumnReaderUtils.isNullRow(cumulativeRowLength)) { - return null; - } else if (physicalRow == 0) { - rowLength = cumulativeRowLength; - } else { - rowLength = cumulativeRowLength - FrameColumnReaderUtils.getAdjustedCumulativeRowLength( - memory, - getStartOfCumulativeLengthSection(), - physicalRow - 1 - ); - } - - if (rowLength == 0) { - return asArray ? ObjectArrays.EMPTY_ARRAY : Collections.emptyList(); - } else if (rowLength == 1) { - final int index = cumulativeRowLength - 1; - final Object o = decode ? getString(index) : getStringUtf8(index); - return asArray ? new Object[]{o} : o; - } else { - final Object[] row = new Object[rowLength]; - - for (int i = 0; i < rowLength; i++) { - final int index = cumulativeRowLength - rowLength + i; - row[i] = decode ? getString(index) : getStringUtf8(index); - } - - return asArray ? row : Arrays.asList(row); - } - } else { - final Object o = decode ? getString(physicalRow) : getStringUtf8(physicalRow); - return asArray ? new Object[]{o} : o; - } - } - - /** - * Returns the value at the given physical row number as a list of ByteBuffers. Only valid when !asArray, i.e., - * when type is {@link ColumnType#STRING}. - * - * @param physicalRow physical row number - */ - private List getRowAsListUtf8(final int physicalRow) - { - if (asArray) { - throw DruidException.defensive("Unexpected call for array column"); - } - - final Object object = getRowAsObject(physicalRow, false); - - if (object == null) { - return Collections.singletonList(null); - } else if (object instanceof List) { - //noinspection unchecked - return (List) object; - } else { - return Collections.singletonList((ByteBuffer) object); - } - } - - /** - * Selector used by this column. It's versatile: it can run as string array (asArray = true) or regular string - * column (asArray = false). - */ - private DimensionSelector makeDimensionSelectorInternal(ReadableOffset offset, @Nullable ExtractionFn extractionFn) { if (multiValue) { class MultiValueSelector implements DimensionSelector @@ -671,41 +400,224 @@ public class StringFrameColumnReader implements FrameColumnReader return new SingleValueSelector(); } } - } - static class StringArrayFrameColumn implements BaseColumn - { - private final StringFrameColumn delegate; - - private StringArrayFrameColumn( - Frame frame, - boolean multiValue, - Memory memory, - long startOfStringLengthSection, - long startOfStringDataSection - ) + @Override + public SingleValueDimensionVectorSelector makeSingleValueDimensionVectorSelector(ReadableVectorOffset offset) { - this.delegate = new StringFrameColumn( - frame, - multiValue, - memory, - startOfStringLengthSection, - startOfStringDataSection, - true - ); + // Callers should use object selectors, because we have no dictionary. + throw new UnsupportedOperationException(); } @Override - @SuppressWarnings("rawtypes") - public ColumnValueSelector makeColumnValueSelector(ReadableOffset offset) + public MultiValueDimensionVectorSelector makeMultiValueDimensionVectorSelector(ReadableVectorOffset vectorOffset) { - return delegate.makeDimensionSelectorInternal(offset, null); + // Callers should use object selectors, because we have no dictionary. + throw new UnsupportedOperationException(); + } + + @Override + public VectorObjectSelector makeVectorObjectSelector(final ReadableVectorOffset offset) + { + class StringFrameVectorObjectSelector implements VectorObjectSelector + { + private final Object[] vector = new Object[offset.getMaxVectorSize()]; + private int id = ReadableVectorInspector.NULL_ID; + + @Override + public Object[] getObjectVector() + { + computeVectorIfNeeded(); + return vector; + } + + @Override + public int getMaxVectorSize() + { + return offset.getMaxVectorSize(); + } + + @Override + public int getCurrentVectorSize() + { + return offset.getCurrentVectorSize(); + } + + private void computeVectorIfNeeded() + { + if (id == offset.getId()) { + return; + } + + if (offset.isContiguous()) { + final int start = offset.getStartOffset(); + + for (int i = 0; i < offset.getCurrentVectorSize(); i++) { + final int physicalRow = frame.physicalRow(i + start); + vector[i] = getRowAsObject(physicalRow, true); + } + } else { + final int[] offsets = offset.getOffsets(); + + for (int i = 0; i < offset.getCurrentVectorSize(); i++) { + final int physicalRow = frame.physicalRow(offsets[i]); + vector[i] = getRowAsObject(physicalRow, true); + } + } + + id = offset.getId(); + } + } + + return new StringFrameVectorObjectSelector(); + } + + @Override + public int length() + { + return frame.numRows(); } @Override public void close() { - delegate.close(); + // Do nothing. + } + + @Override + public ColumnType getType() + { + return ColumnType.STRING; + } + + @Override + public int numRows() + { + return length(); + } + + @Override + protected Object getVal(int rowNum) + { + return getRowAsObject(frame.physicalRow(rowNum), true); + } + + @Override + protected Comparator getComparator() + { + return Comparator.nullsFirst(Comparator.comparing(o -> ((String) o))); + } + + /** + * Returns a ByteBuffer containing UTF-8 encoded string number {@code index}. The ByteBuffer is always newly + * created, so it is OK to change its position, limit, etc. However, it may point to shared memory, so it is + * not OK to write to its contents. + */ + @Nullable + private ByteBuffer getStringUtf8(final int index) + { + final long dataStart; + final long dataEnd = + startOfStringDataSection + + memory.getInt(startOfStringLengthSection + (long) Integer.BYTES * index); + + if (index == 0) { + dataStart = startOfStringDataSection; + } else { + dataStart = + startOfStringDataSection + + memory.getInt(startOfStringLengthSection + (long) Integer.BYTES * (index - 1)); + } + + final int dataLength = Ints.checkedCast(dataEnd - dataStart); + + if ((dataLength == 0 && NullHandling.replaceWithDefault()) || + (dataLength == 1 && memory.getByte(dataStart) == FrameWriterUtils.NULL_STRING_MARKER)) { + return null; + } + + return FrameReaderUtils.readByteBuffer(memory, dataStart, dataLength); + } + + @Nullable + private String getString(final int index) + { + final ByteBuffer stringUtf8 = getStringUtf8(index); + + if (stringUtf8 == null) { + return null; + } else { + return StringUtils.fromUtf8(stringUtf8); + } + } + + /** + * Returns the object at the given physical row number. + * + * @param physicalRow physical row number + * @param decode if true, return java.lang.String. If false, return UTF-8 ByteBuffer. + */ + @Nullable + private Object getRowAsObject(final int physicalRow, final boolean decode) + { + if (multiValue) { + final int cumulativeRowLength = FrameColumnReaderUtils.getCumulativeRowLength( + memory, + getStartOfCumulativeLengthSection(), + physicalRow + ); + final int rowLength; + + if (FrameColumnReaderUtils.isNullRow(cumulativeRowLength)) { + return null; + } else if (physicalRow == 0) { + rowLength = cumulativeRowLength; + } else { + rowLength = cumulativeRowLength - FrameColumnReaderUtils.getAdjustedCumulativeRowLength( + memory, + getStartOfCumulativeLengthSection(), + physicalRow - 1 + ); + } + + if (rowLength == 0) { + return Collections.emptyList(); + } else if (rowLength == 1) { + final int index = cumulativeRowLength - 1; + final Object o = decode ? getString(index) : getStringUtf8(index); + return o; + } else { + final Object[] row = new Object[rowLength]; + + for (int i = 0; i < rowLength; i++) { + final int index = cumulativeRowLength - rowLength + i; + row[i] = decode ? getString(index) : getStringUtf8(index); + } + + return Arrays.asList(row); + } + } else { + final Object o = decode ? getString(physicalRow) : getStringUtf8(physicalRow); + return o; + } + } + + /** + * Returns the value at the given physical row number as a list of ByteBuffers. + * + * @param physicalRow physical row number + */ + private List getRowAsListUtf8(final int physicalRow) + { + final Object object = getRowAsObject(physicalRow, false); + + if (object == null) { + return Collections.singletonList(null); + } else if (object instanceof List) { + //noinspection unchecked + return (List) object; + } else { + return Collections.singletonList((ByteBuffer) object); + } } } } diff --git a/processing/src/main/java/org/apache/druid/frame/write/columnar/StringFrameColumnWriter.java b/processing/src/main/java/org/apache/druid/frame/write/columnar/StringFrameColumnWriter.java index 8eee0fd0cef..75f64613388 100644 --- a/processing/src/main/java/org/apache/druid/frame/write/columnar/StringFrameColumnWriter.java +++ b/processing/src/main/java/org/apache/druid/frame/write/columnar/StringFrameColumnWriter.java @@ -44,6 +44,9 @@ public abstract class StringFrameColumnWriter imp public static final long DATA_OFFSET = 1 /* type code */ + 1 /* single or multi-value? */; + public static final byte MULTI_VALUE_BYTE = (byte) 0x01; + public static final long MULTI_VALUE_POSITION = 1; + private final T selector; private final byte typeCode; protected final ColumnCapabilities.Capable multiValue; @@ -228,7 +231,7 @@ public abstract class StringFrameColumnWriter imp long currentPosition = startPosition; memory.putByte(currentPosition, typeCode); - memory.putByte(currentPosition + 1, writeMultiValue ? (byte) 1 : (byte) 0); + memory.putByte(currentPosition + 1, writeMultiValue ? MULTI_VALUE_BYTE : (byte) 0); currentPosition += 2; if (writeMultiValue) { diff --git a/processing/src/test/java/org/apache/druid/query/operator/window/RowsAndColumnsHelper.java b/processing/src/test/java/org/apache/druid/query/operator/window/RowsAndColumnsHelper.java index 2636156b53c..cdc84620ab0 100644 --- a/processing/src/test/java/org/apache/druid/query/operator/window/RowsAndColumnsHelper.java +++ b/processing/src/test/java/org/apache/druid/query/operator/window/RowsAndColumnsHelper.java @@ -29,6 +29,7 @@ import org.apache.druid.query.rowsandcols.column.ColumnAccessor; import org.apache.druid.segment.column.ColumnType; import org.junit.Assert; +import java.util.ArrayList; import java.util.Collection; import java.util.LinkedHashMap; import java.util.Map; @@ -280,6 +281,17 @@ public class RowsAndColumnsHelper } else { Assert.assertEquals(msg, ((Long) expectedVal).longValue(), accessor.getLong(i)); } + } else if (expectedVal instanceof Object[]) { + Object actualVal = accessor.getObject(i); + if (expectedNulls[i]) { + Assert.assertNull(msg, accessor.getObject(i)); + } else { + if (actualVal instanceof ArrayList) { + Assert.assertArrayEquals(msg, (Object[]) expectedVals[i], ((ArrayList) actualVal).toArray()); + } else { + Assert.assertArrayEquals(msg, (Object[]) expectedVals[i], (Object[]) actualVal); + } + } } else { if (expectedNulls[i]) { Assert.assertNull(msg, accessor.getObject(i)); diff --git a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/TestVirtualColumnEvaluationRowsAndColumnsTest.java b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/EvaluateRowsAndColumnsTest.java similarity index 73% rename from processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/TestVirtualColumnEvaluationRowsAndColumnsTest.java rename to processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/EvaluateRowsAndColumnsTest.java index 99d5dfabc89..e2cee35a8e9 100644 --- a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/TestVirtualColumnEvaluationRowsAndColumnsTest.java +++ b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/EvaluateRowsAndColumnsTest.java @@ -38,32 +38,44 @@ import java.util.function.Function; import static org.junit.Assert.assertEquals; import static org.junit.Assume.assumeNotNull; -public class TestVirtualColumnEvaluationRowsAndColumnsTest extends SemanticTestBase +public class EvaluateRowsAndColumnsTest extends SemanticTestBase { - public TestVirtualColumnEvaluationRowsAndColumnsTest(String name, Function fn) + public EvaluateRowsAndColumnsTest(String name, Function fn) { super(name, fn); } @Test - public void testMaterializeVirtualColumns() + public void testMaterializeColumns() { Object[][] vals = new Object[][] { - {1L, "a", 123L, 0L}, - {2L, "a", 456L, 1L}, - {3L, "b", 789L, 2L}, - {4L, "b", 123L, 3L}, + {1L, "a", 123L, new Object[]{"xyz", "x"}, 0L}, + {2L, "a", 456L, new Object[]{"abc"}, 1L}, + {3L, "b", 789L, new Object[]{null}, 2L}, + {4L, null, 123L, null, 3L}, }; RowSignature siggy = RowSignature.builder() .add("__time", ColumnType.LONG) .add("dim", ColumnType.STRING) .add("val", ColumnType.LONG) + .add("array", ColumnType.STRING_ARRAY) .add("arrayIndex", ColumnType.LONG) .build(); final RowsAndColumns base = make(MapOfColumnsRowsAndColumns.fromRowObjects(vals, siggy)); + Object[] expectedArr = new Object[][] { + {"xyz", "x"}, + {"abc"}, + {null}, + null + }; + + new RowsAndColumnsHelper() + .expectColumn("array", expectedArr, ColumnType.STRING_ARRAY) + .validate(base); + assumeNotNull("skipping: CursorFactory not supported", base.as(CursorFactory.class)); LazilyDecoratedRowsAndColumns ras = new LazilyDecoratedRowsAndColumns( @@ -82,12 +94,18 @@ public class TestVirtualColumnEvaluationRowsAndColumnsTest extends SemanticTestB // do the materialziation ras.numRows(); - assertEquals(Lists.newArrayList("__time", "dim", "val", "arrayIndex", "expr"), ras.getColumnNames()); + assertEquals(Lists.newArrayList("__time", "dim", "val", "array", "arrayIndex", "expr"), ras.getColumnNames()); new RowsAndColumnsHelper() .expectColumn("expr", new long[] {123 * 2, 456L * 2, 789 * 2, 123 * 2}) .validate(ras); - } + new RowsAndColumnsHelper() + .expectColumn("dim", new String[] {"a", "a", "b", null}, ColumnType.STRING) + .validate(ras); + new RowsAndColumnsHelper() + .expectColumn("array", expectedArr, ColumnType.STRING_ARRAY) + .validate(ras); + } }