Fix String Frame Readers to read String Arrays correctly (#16885)

While writing to a frame, String arrays are written by setting the multivalue byte.
But while reading, it was hardcoded to false.
This commit is contained in:
Sree Charan Manamala 2024-09-10 14:20:54 +05:30 committed by GitHub
parent 72fbaf2e56
commit c7c3307e61
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 668 additions and 339 deletions

View File

@ -480,9 +480,8 @@ public class StringFieldReader implements FieldReader
public boolean isNull(int rowNum)
{
final long fieldPosition = coach.computeFieldPosition(rowNum);
byte[] nullBytes = new byte[3];
dataRegion.getByteArray(fieldPosition, nullBytes, 0, 3);
return Arrays.equals(nullBytes, EXPECTED_BYTES_FOR_NULL);
return dataRegion.getByte(fieldPosition) == StringFieldWriter.NULL_ROW
&& dataRegion.getByte(fieldPosition + 1) == StringFieldWriter.ROW_TERMINATOR;
}
@Override

View File

@ -51,7 +51,7 @@ public class FrameColumnReaders
return new DoubleFrameColumnReader(columnNumber);
case STRING:
return new StringFrameColumnReader(columnNumber, false);
return new StringFrameColumnReader(columnNumber);
case COMPLEX:
return new ComplexFrameColumnReader(columnNumber);
@ -59,7 +59,7 @@ public class FrameColumnReaders
case ARRAY:
switch (columnType.getElementType().getType()) {
case STRING:
return new StringFrameColumnReader(columnNumber, true);
return new StringArrayFrameColumnReader(columnNumber);
case LONG:
return new LongArrayFrameColumnReader(columnNumber);
case FLOAT:

View File

@ -0,0 +1,385 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.frame.read.columnar;
import com.google.common.primitives.Ints;
import it.unimi.dsi.fastutil.objects.ObjectArrays;
import org.apache.datasketches.memory.Memory;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.read.FrameReaderUtils;
import org.apache.druid.frame.write.FrameWriterUtils;
import org.apache.druid.frame.write.columnar.FrameColumnWriters;
import org.apache.druid.frame.write.columnar.StringFrameColumnWriter;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.ColumnAccessorBasedColumn;
import org.apache.druid.query.rowsandcols.column.accessor.ObjectColumnAccessorBase;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.ObjectColumnSelector;
import org.apache.druid.segment.column.BaseColumn;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.data.ReadableOffset;
import org.apache.druid.segment.vector.ReadableVectorInspector;
import org.apache.druid.segment.vector.ReadableVectorOffset;
import org.apache.druid.segment.vector.VectorObjectSelector;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.Comparator;
/**
* Reader for {@link ColumnType#STRING_ARRAY}.
* This is similar to {@link StringFrameColumnReader} reading mvds in reading bytes from frame
*/
public class StringArrayFrameColumnReader implements FrameColumnReader
{
private final int columnNumber;
/**
* Create a new reader.
*
* @param columnNumber column number
*/
StringArrayFrameColumnReader(int columnNumber)
{
this.columnNumber = columnNumber;
}
@Override
public Column readRACColumn(Frame frame)
{
final Memory memory = frame.region(columnNumber);
validate(memory);
final long positionOfLengths = getStartOfStringLengthSection(frame.numRows());
final long positionOfPayloads = getStartOfStringDataSection(memory, frame.numRows());
StringArrayFrameColumn frameCol = new StringArrayFrameColumn(
frame,
memory,
positionOfLengths,
positionOfPayloads
);
return new ColumnAccessorBasedColumn(frameCol);
}
@Override
public ColumnPlus readColumn(final Frame frame)
{
final Memory memory = frame.region(columnNumber);
validate(memory);
final long startOfStringLengthSection = getStartOfStringLengthSection(frame.numRows());
final long startOfStringDataSection = getStartOfStringDataSection(memory, frame.numRows());
final BaseColumn baseColumn = new StringArrayFrameColumn(
frame,
memory,
startOfStringLengthSection,
startOfStringDataSection
);
return new ColumnPlus(
baseColumn,
new ColumnCapabilitiesImpl().setType(ColumnType.STRING_ARRAY)
.setHasMultipleValues(false)
.setDictionaryEncoded(false),
frame.numRows()
);
}
private void validate(final Memory region)
{
// Check if column is big enough for a header
if (region.getCapacity() < StringFrameColumnWriter.DATA_OFFSET) {
throw DruidException.defensive("Column[%s] is not big enough for a header", columnNumber);
}
final byte typeCode = region.getByte(0);
if (typeCode != FrameColumnWriters.TYPE_STRING_ARRAY) {
throw DruidException.defensive(
"Column[%s] does not have the correct type code; expected[%s], got[%s]",
columnNumber,
FrameColumnWriters.TYPE_STRING_ARRAY,
typeCode
);
}
}
private static long getStartOfCumulativeLengthSection()
{
return StringFrameColumnWriter.DATA_OFFSET;
}
private static long getStartOfStringLengthSection(final int numRows)
{
return StringFrameColumnWriter.DATA_OFFSET + (long) Integer.BYTES * numRows;
}
private long getStartOfStringDataSection(
final Memory memory,
final int numRows
)
{
if (numRows < 0) {
throw DruidException.defensive("Encountered -ve numRows [%s] while reading frame", numRows);
}
final int totalNumValues = FrameColumnReaderUtils.getAdjustedCumulativeRowLength(
memory,
getStartOfCumulativeLengthSection(),
numRows - 1
);
return getStartOfStringLengthSection(numRows) + (long) Integer.BYTES * totalNumValues;
}
private static class StringArrayFrameColumn extends ObjectColumnAccessorBase implements BaseColumn
{
private final Frame frame;
private final Memory memory;
private final long startOfStringLengthSection;
private final long startOfStringDataSection;
private StringArrayFrameColumn(
Frame frame,
Memory memory,
long startOfStringLengthSection,
long startOfStringDataSection
)
{
this.frame = frame;
this.memory = memory;
this.startOfStringLengthSection = startOfStringLengthSection;
this.startOfStringDataSection = startOfStringDataSection;
}
@Override
public ColumnValueSelector<?> makeColumnValueSelector(ReadableOffset offset)
{
return new ObjectColumnSelector<Object>()
{
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// Do nothing.
}
@Nullable
@Override
public Object getObject()
{
return getRowAsObject(frame.physicalRow(offset.getOffset()), true);
}
@Override
public Class<?> classOfObject()
{
return Object[].class;
}
};
}
@Override
public VectorObjectSelector makeVectorObjectSelector(final ReadableVectorOffset offset)
{
class StringArrayFrameVectorObjectSelector implements VectorObjectSelector
{
private final Object[] vector = new Object[offset.getMaxVectorSize()];
private int id = ReadableVectorInspector.NULL_ID;
@Override
public Object[] getObjectVector()
{
computeVectorIfNeeded();
return vector;
}
@Override
public int getMaxVectorSize()
{
return offset.getMaxVectorSize();
}
@Override
public int getCurrentVectorSize()
{
return offset.getCurrentVectorSize();
}
private void computeVectorIfNeeded()
{
if (id == offset.getId()) {
return;
}
if (offset.isContiguous()) {
final int start = offset.getStartOffset();
for (int i = 0; i < offset.getCurrentVectorSize(); i++) {
final int physicalRow = frame.physicalRow(i + start);
vector[i] = getRowAsObject(physicalRow, true);
}
} else {
final int[] offsets = offset.getOffsets();
for (int i = 0; i < offset.getCurrentVectorSize(); i++) {
final int physicalRow = frame.physicalRow(offsets[i]);
vector[i] = getRowAsObject(physicalRow, true);
}
}
id = offset.getId();
}
}
return new StringArrayFrameVectorObjectSelector();
}
@Override
public void close()
{
// Do nothing.
}
@Override
public ColumnType getType()
{
return ColumnType.STRING_ARRAY;
}
@Override
public int numRows()
{
return frame.numRows();
}
@Override
protected Object getVal(int rowNum)
{
return getRowAsObject(frame.physicalRow(rowNum), true);
}
@Override
protected Comparator<Object> getComparator()
{
return Comparator.nullsFirst(ColumnType.STRING_ARRAY.getStrategy());
}
/**
* Returns a ByteBuffer containing UTF-8 encoded string number {@code index}. The ByteBuffer is always newly
* created, so it is OK to change its position, limit, etc. However, it may point to shared memory, so it is
* not OK to write to its contents.
*/
@Nullable
private ByteBuffer getStringUtf8(final int index)
{
if (startOfStringLengthSection > Long.MAX_VALUE - (long) Integer.BYTES * index) {
throw DruidException.defensive("length index would overflow trying to read the frame memory!");
}
final int dataEndVariableIndex = memory.getInt(startOfStringLengthSection + (long) Integer.BYTES * index);
if (startOfStringDataSection > Long.MAX_VALUE - dataEndVariableIndex) {
throw DruidException.defensive("data end index would overflow trying to read the frame memory!");
}
final long dataStart;
final long dataEnd = startOfStringDataSection + dataEndVariableIndex;
if (index == 0) {
dataStart = startOfStringDataSection;
} else {
final int dataStartVariableIndex = memory.getInt(startOfStringLengthSection + (long) Integer.BYTES * (index
- 1));
if (startOfStringDataSection > Long.MAX_VALUE - dataStartVariableIndex) {
throw DruidException.defensive("data start index would overflow trying to read the frame memory!");
}
dataStart = startOfStringDataSection + dataStartVariableIndex;
}
final int dataLength = Ints.checkedCast(dataEnd - dataStart);
if ((dataLength == 0 && NullHandling.replaceWithDefault()) ||
(dataLength == 1 && memory.getByte(dataStart) == FrameWriterUtils.NULL_STRING_MARKER)) {
return null;
}
return FrameReaderUtils.readByteBuffer(memory, dataStart, dataLength);
}
@Nullable
private String getString(final int index)
{
final ByteBuffer stringUtf8 = getStringUtf8(index);
if (stringUtf8 == null) {
return null;
} else {
return StringUtils.fromUtf8(stringUtf8);
}
}
/**
* Returns the object at the given physical row number.
*
* @param physicalRow physical row number
* @param decode if true, return java.lang.String. If false, return UTF-8 ByteBuffer.
*/
@Nullable
private Object getRowAsObject(final int physicalRow, final boolean decode)
{
final int cumulativeRowLength = FrameColumnReaderUtils.getCumulativeRowLength(
memory,
getStartOfCumulativeLengthSection(),
physicalRow
);
final int rowLength;
if (FrameColumnReaderUtils.isNullRow(cumulativeRowLength)) {
return null;
} else if (physicalRow == 0) {
rowLength = cumulativeRowLength;
} else {
rowLength = cumulativeRowLength - FrameColumnReaderUtils.getAdjustedCumulativeRowLength(
memory,
getStartOfCumulativeLengthSection(),
physicalRow - 1
);
}
if (rowLength == 0) {
return ObjectArrays.EMPTY_ARRAY;
} else {
final Object[] row = new Object[rowLength];
for (int i = 0; i < rowLength; i++) {
final int index = cumulativeRowLength - rowLength + i;
row[i] = decode ? getString(index) : getStringUtf8(index);
}
return row;
}
}
}
}

View File

@ -19,18 +19,16 @@
package org.apache.druid.frame.read.columnar;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Ints;
import it.unimi.dsi.fastutil.objects.ObjectArrays;
import org.apache.datasketches.memory.Memory;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.read.FrameReaderUtils;
import org.apache.druid.frame.write.FrameWriterUtils;
import org.apache.druid.frame.write.columnar.FrameColumnWriters;
import org.apache.druid.frame.write.columnar.StringFrameColumnWriter;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.query.filter.DruidPredicateFactory;
@ -40,13 +38,11 @@ import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.ColumnAccessorBasedColumn;
import org.apache.druid.query.rowsandcols.column.accessor.ObjectColumnAccessorBase;
import org.apache.druid.segment.BaseSingleValueDimensionSelector;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionDictionarySelector;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.DimensionSelectorUtils;
import org.apache.druid.segment.IdLookup;
import org.apache.druid.segment.column.BaseColumn;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.DictionaryEncodedColumn;
@ -67,23 +63,20 @@ import java.util.Comparator;
import java.util.List;
/**
* Reader for {@link StringFrameColumnWriter}, types {@link ColumnType#STRING} and {@link ColumnType#STRING_ARRAY}.
* Reader for {@link StringFrameColumnWriter}, type {@link ColumnType#STRING}.
*/
public class StringFrameColumnReader implements FrameColumnReader
{
private final int columnNumber;
private final boolean asArray;
/**
* Create a new reader.
*
* @param columnNumber column number
* @param asArray true for {@link ColumnType#STRING_ARRAY}, false for {@link ColumnType#STRING}
*/
StringFrameColumnReader(int columnNumber, boolean asArray)
StringFrameColumnReader(int columnNumber)
{
this.columnNumber = columnNumber;
this.asArray = asArray;
}
@Override
@ -92,18 +85,20 @@ public class StringFrameColumnReader implements FrameColumnReader
final Memory memory = frame.region(columnNumber);
validate(memory);
if (isMultiValue(memory)) {
throw InvalidInput.exception("Encountered a multi value column. Window processing does not support MVDs. "
+ "Consider using UNNEST or MV_TO_ARRAY.");
}
final long positionOfLengths = getStartOfStringLengthSection(frame.numRows(), false);
final long positionOfPayloads = getStartOfStringDataSection(memory, frame.numRows(), false);
StringFrameColumn frameCol =
new StringFrameColumn(
frame,
false,
memory,
positionOfLengths,
positionOfPayloads,
asArray || isMultiValue(memory) // Read MVDs as String arrays
);
StringFrameColumn frameCol = new StringFrameColumn(
frame,
false,
memory,
positionOfLengths,
positionOfPayloads
);
return new ColumnAccessorBasedColumn(frameCol);
}
@ -118,35 +113,19 @@ public class StringFrameColumnReader implements FrameColumnReader
final long startOfStringLengthSection = getStartOfStringLengthSection(frame.numRows(), multiValue);
final long startOfStringDataSection = getStartOfStringDataSection(memory, frame.numRows(), multiValue);
final BaseColumn baseColumn;
if (asArray) {
baseColumn = new StringArrayFrameColumn(
frame,
multiValue,
memory,
startOfStringLengthSection,
startOfStringDataSection
);
} else {
baseColumn = new StringFrameColumn(
frame,
multiValue,
memory,
startOfStringLengthSection,
startOfStringDataSection,
false
);
}
final BaseColumn baseColumn = new StringFrameColumn(
frame,
multiValue,
memory,
startOfStringLengthSection,
startOfStringDataSection
);
return new ColumnPlus(
baseColumn,
new ColumnCapabilitiesImpl().setType(asArray ? ColumnType.STRING_ARRAY : ColumnType.STRING)
.setHasMultipleValues(!asArray && multiValue)
.setDictionaryEncoded(false)
.setHasBitmapIndexes(false)
.setHasSpatialIndexes(false)
.setHasNulls(ColumnCapabilities.Capable.UNKNOWN),
new ColumnCapabilitiesImpl().setType(ColumnType.STRING)
.setHasMultipleValues(multiValue)
.setDictionaryEncoded(false),
frame.numRows()
);
}
@ -159,12 +138,11 @@ public class StringFrameColumnReader implements FrameColumnReader
}
final byte typeCode = region.getByte(0);
final byte expectedTypeCode = asArray ? FrameColumnWriters.TYPE_STRING_ARRAY : FrameColumnWriters.TYPE_STRING;
if (typeCode != expectedTypeCode) {
if (typeCode != FrameColumnWriters.TYPE_STRING) {
throw DruidException.defensive(
"Column[%s] does not have the correct type code; expected[%s], got[%s]",
columnNumber,
expectedTypeCode,
FrameColumnWriters.TYPE_STRING,
typeCode
);
}
@ -172,7 +150,7 @@ public class StringFrameColumnReader implements FrameColumnReader
private static boolean isMultiValue(final Memory memory)
{
return memory.getByte(1) == 1;
return memory.getByte(StringFrameColumnWriter.MULTI_VALUE_POSITION) == StringFrameColumnWriter.MULTI_VALUE_BYTE;
}
private static long getStartOfCumulativeLengthSection()
@ -213,8 +191,7 @@ public class StringFrameColumnReader implements FrameColumnReader
return getStartOfStringLengthSection(numRows, multiValue) + (long) Integer.BYTES * totalNumValues;
}
@VisibleForTesting
static class StringFrameColumn extends ObjectColumnAccessorBase implements DictionaryEncodedColumn<String>
private static class StringFrameColumn extends ObjectColumnAccessorBase implements DictionaryEncodedColumn<String>
{
private final Frame frame;
private final Memory memory;
@ -226,18 +203,12 @@ public class StringFrameColumnReader implements FrameColumnReader
*/
private final boolean multiValue;
/**
* Whether the column is being read as {@link ColumnType#STRING_ARRAY} (true) or {@link ColumnType#STRING} (false).
*/
private final boolean asArray;
private StringFrameColumn(
Frame frame,
boolean multiValue,
Memory memory,
long startOfStringLengthSection,
long startOfStringDataSection,
final boolean asArray
long startOfStringDataSection
)
{
this.frame = frame;
@ -245,7 +216,6 @@ public class StringFrameColumnReader implements FrameColumnReader
this.memory = memory;
this.startOfStringLengthSection = startOfStringLengthSection;
this.startOfStringDataSection = startOfStringDataSection;
this.asArray = asArray;
}
@Override
@ -292,247 +262,6 @@ public class StringFrameColumnReader implements FrameColumnReader
@Override
public DimensionSelector makeDimensionSelector(ReadableOffset offset, @Nullable ExtractionFn extractionFn)
{
if (asArray) {
throw new ISE("Cannot call makeDimensionSelector on field of type [%s]", ColumnType.STRING_ARRAY);
}
return makeDimensionSelectorInternal(offset, extractionFn);
}
@Override
public SingleValueDimensionVectorSelector makeSingleValueDimensionVectorSelector(ReadableVectorOffset offset)
{
// Callers should use object selectors, because we have no dictionary.
throw new UnsupportedOperationException();
}
@Override
public MultiValueDimensionVectorSelector makeMultiValueDimensionVectorSelector(ReadableVectorOffset vectorOffset)
{
// Callers should use object selectors, because we have no dictionary.
throw new UnsupportedOperationException();
}
@Override
public VectorObjectSelector makeVectorObjectSelector(final ReadableVectorOffset offset)
{
class StringFrameVectorObjectSelector implements VectorObjectSelector
{
private final Object[] vector = new Object[offset.getMaxVectorSize()];
private int id = ReadableVectorInspector.NULL_ID;
@Override
public Object[] getObjectVector()
{
computeVectorIfNeeded();
return vector;
}
@Override
public int getMaxVectorSize()
{
return offset.getMaxVectorSize();
}
@Override
public int getCurrentVectorSize()
{
return offset.getCurrentVectorSize();
}
private void computeVectorIfNeeded()
{
if (id == offset.getId()) {
return;
}
if (offset.isContiguous()) {
final int start = offset.getStartOffset();
for (int i = 0; i < offset.getCurrentVectorSize(); i++) {
final int physicalRow = frame.physicalRow(i + start);
vector[i] = getRowAsObject(physicalRow, true);
}
} else {
final int[] offsets = offset.getOffsets();
for (int i = 0; i < offset.getCurrentVectorSize(); i++) {
final int physicalRow = frame.physicalRow(offsets[i]);
vector[i] = getRowAsObject(physicalRow, true);
}
}
id = offset.getId();
}
}
return new StringFrameVectorObjectSelector();
}
@Override
public int length()
{
return frame.numRows();
}
@Override
public void close()
{
// Do nothing.
}
@Override
public ColumnType getType()
{
return asArray ? ColumnType.STRING_ARRAY : ColumnType.STRING;
}
@Override
public int numRows()
{
return length();
}
@Override
protected Object getVal(int rowNum)
{
return getString(frame.physicalRow(rowNum));
}
@Override
protected Comparator<Object> getComparator()
{
return Comparator.nullsFirst(Comparator.comparing(o -> ((String) o)));
}
/**
* Returns a ByteBuffer containing UTF-8 encoded string number {@code index}. The ByteBuffer is always newly
* created, so it is OK to change its position, limit, etc. However, it may point to shared memory, so it is
* not OK to write to its contents.
*/
@Nullable
private ByteBuffer getStringUtf8(final int index)
{
final long dataStart;
final long dataEnd =
startOfStringDataSection +
memory.getInt(startOfStringLengthSection + (long) Integer.BYTES * index);
if (index == 0) {
dataStart = startOfStringDataSection;
} else {
dataStart =
startOfStringDataSection +
memory.getInt(startOfStringLengthSection + (long) Integer.BYTES * (index - 1));
}
final int dataLength = Ints.checkedCast(dataEnd - dataStart);
if ((dataLength == 0 && NullHandling.replaceWithDefault()) ||
(dataLength == 1 && memory.getByte(dataStart) == FrameWriterUtils.NULL_STRING_MARKER)) {
return null;
}
return FrameReaderUtils.readByteBuffer(memory, dataStart, dataLength);
}
@Nullable
private String getString(final int index)
{
final ByteBuffer stringUtf8 = getStringUtf8(index);
if (stringUtf8 == null) {
return null;
} else {
return StringUtils.fromUtf8(stringUtf8);
}
}
/**
* Returns the object at the given physical row number.
*
* When {@link #asArray}, the return value is always of type {@code Object[]}. Otherwise, the return value
* is either an empty list (if the row is empty), a single String (if the row has one value), or a List
* of Strings (if the row has more than one value).
*
* @param physicalRow physical row number
* @param decode if true, return java.lang.String. If false, return UTF-8 ByteBuffer.
*/
@Nullable
private Object getRowAsObject(final int physicalRow, final boolean decode)
{
if (multiValue) {
final int cumulativeRowLength = FrameColumnReaderUtils.getCumulativeRowLength(
memory,
getStartOfCumulativeLengthSection(),
physicalRow
);
final int rowLength;
if (FrameColumnReaderUtils.isNullRow(cumulativeRowLength)) {
return null;
} else if (physicalRow == 0) {
rowLength = cumulativeRowLength;
} else {
rowLength = cumulativeRowLength - FrameColumnReaderUtils.getAdjustedCumulativeRowLength(
memory,
getStartOfCumulativeLengthSection(),
physicalRow - 1
);
}
if (rowLength == 0) {
return asArray ? ObjectArrays.EMPTY_ARRAY : Collections.emptyList();
} else if (rowLength == 1) {
final int index = cumulativeRowLength - 1;
final Object o = decode ? getString(index) : getStringUtf8(index);
return asArray ? new Object[]{o} : o;
} else {
final Object[] row = new Object[rowLength];
for (int i = 0; i < rowLength; i++) {
final int index = cumulativeRowLength - rowLength + i;
row[i] = decode ? getString(index) : getStringUtf8(index);
}
return asArray ? row : Arrays.asList(row);
}
} else {
final Object o = decode ? getString(physicalRow) : getStringUtf8(physicalRow);
return asArray ? new Object[]{o} : o;
}
}
/**
* Returns the value at the given physical row number as a list of ByteBuffers. Only valid when !asArray, i.e.,
* when type is {@link ColumnType#STRING}.
*
* @param physicalRow physical row number
*/
private List<ByteBuffer> getRowAsListUtf8(final int physicalRow)
{
if (asArray) {
throw DruidException.defensive("Unexpected call for array column");
}
final Object object = getRowAsObject(physicalRow, false);
if (object == null) {
return Collections.singletonList(null);
} else if (object instanceof List) {
//noinspection unchecked
return (List<ByteBuffer>) object;
} else {
return Collections.singletonList((ByteBuffer) object);
}
}
/**
* Selector used by this column. It's versatile: it can run as string array (asArray = true) or regular string
* column (asArray = false).
*/
private DimensionSelector makeDimensionSelectorInternal(ReadableOffset offset, @Nullable ExtractionFn extractionFn)
{
if (multiValue) {
class MultiValueSelector implements DimensionSelector
@ -671,41 +400,224 @@ public class StringFrameColumnReader implements FrameColumnReader
return new SingleValueSelector();
}
}
}
static class StringArrayFrameColumn implements BaseColumn
{
private final StringFrameColumn delegate;
private StringArrayFrameColumn(
Frame frame,
boolean multiValue,
Memory memory,
long startOfStringLengthSection,
long startOfStringDataSection
)
@Override
public SingleValueDimensionVectorSelector makeSingleValueDimensionVectorSelector(ReadableVectorOffset offset)
{
this.delegate = new StringFrameColumn(
frame,
multiValue,
memory,
startOfStringLengthSection,
startOfStringDataSection,
true
);
// Callers should use object selectors, because we have no dictionary.
throw new UnsupportedOperationException();
}
@Override
@SuppressWarnings("rawtypes")
public ColumnValueSelector makeColumnValueSelector(ReadableOffset offset)
public MultiValueDimensionVectorSelector makeMultiValueDimensionVectorSelector(ReadableVectorOffset vectorOffset)
{
return delegate.makeDimensionSelectorInternal(offset, null);
// Callers should use object selectors, because we have no dictionary.
throw new UnsupportedOperationException();
}
@Override
public VectorObjectSelector makeVectorObjectSelector(final ReadableVectorOffset offset)
{
class StringFrameVectorObjectSelector implements VectorObjectSelector
{
private final Object[] vector = new Object[offset.getMaxVectorSize()];
private int id = ReadableVectorInspector.NULL_ID;
@Override
public Object[] getObjectVector()
{
computeVectorIfNeeded();
return vector;
}
@Override
public int getMaxVectorSize()
{
return offset.getMaxVectorSize();
}
@Override
public int getCurrentVectorSize()
{
return offset.getCurrentVectorSize();
}
private void computeVectorIfNeeded()
{
if (id == offset.getId()) {
return;
}
if (offset.isContiguous()) {
final int start = offset.getStartOffset();
for (int i = 0; i < offset.getCurrentVectorSize(); i++) {
final int physicalRow = frame.physicalRow(i + start);
vector[i] = getRowAsObject(physicalRow, true);
}
} else {
final int[] offsets = offset.getOffsets();
for (int i = 0; i < offset.getCurrentVectorSize(); i++) {
final int physicalRow = frame.physicalRow(offsets[i]);
vector[i] = getRowAsObject(physicalRow, true);
}
}
id = offset.getId();
}
}
return new StringFrameVectorObjectSelector();
}
@Override
public int length()
{
return frame.numRows();
}
@Override
public void close()
{
delegate.close();
// Do nothing.
}
@Override
public ColumnType getType()
{
return ColumnType.STRING;
}
@Override
public int numRows()
{
return length();
}
@Override
protected Object getVal(int rowNum)
{
return getRowAsObject(frame.physicalRow(rowNum), true);
}
@Override
protected Comparator<Object> getComparator()
{
return Comparator.nullsFirst(Comparator.comparing(o -> ((String) o)));
}
/**
* Returns a ByteBuffer containing UTF-8 encoded string number {@code index}. The ByteBuffer is always newly
* created, so it is OK to change its position, limit, etc. However, it may point to shared memory, so it is
* not OK to write to its contents.
*/
@Nullable
private ByteBuffer getStringUtf8(final int index)
{
final long dataStart;
final long dataEnd =
startOfStringDataSection +
memory.getInt(startOfStringLengthSection + (long) Integer.BYTES * index);
if (index == 0) {
dataStart = startOfStringDataSection;
} else {
dataStart =
startOfStringDataSection +
memory.getInt(startOfStringLengthSection + (long) Integer.BYTES * (index - 1));
}
final int dataLength = Ints.checkedCast(dataEnd - dataStart);
if ((dataLength == 0 && NullHandling.replaceWithDefault()) ||
(dataLength == 1 && memory.getByte(dataStart) == FrameWriterUtils.NULL_STRING_MARKER)) {
return null;
}
return FrameReaderUtils.readByteBuffer(memory, dataStart, dataLength);
}
@Nullable
private String getString(final int index)
{
final ByteBuffer stringUtf8 = getStringUtf8(index);
if (stringUtf8 == null) {
return null;
} else {
return StringUtils.fromUtf8(stringUtf8);
}
}
/**
* Returns the object at the given physical row number.
*
* @param physicalRow physical row number
* @param decode if true, return java.lang.String. If false, return UTF-8 ByteBuffer.
*/
@Nullable
private Object getRowAsObject(final int physicalRow, final boolean decode)
{
if (multiValue) {
final int cumulativeRowLength = FrameColumnReaderUtils.getCumulativeRowLength(
memory,
getStartOfCumulativeLengthSection(),
physicalRow
);
final int rowLength;
if (FrameColumnReaderUtils.isNullRow(cumulativeRowLength)) {
return null;
} else if (physicalRow == 0) {
rowLength = cumulativeRowLength;
} else {
rowLength = cumulativeRowLength - FrameColumnReaderUtils.getAdjustedCumulativeRowLength(
memory,
getStartOfCumulativeLengthSection(),
physicalRow - 1
);
}
if (rowLength == 0) {
return Collections.emptyList();
} else if (rowLength == 1) {
final int index = cumulativeRowLength - 1;
final Object o = decode ? getString(index) : getStringUtf8(index);
return o;
} else {
final Object[] row = new Object[rowLength];
for (int i = 0; i < rowLength; i++) {
final int index = cumulativeRowLength - rowLength + i;
row[i] = decode ? getString(index) : getStringUtf8(index);
}
return Arrays.asList(row);
}
} else {
final Object o = decode ? getString(physicalRow) : getStringUtf8(physicalRow);
return o;
}
}
/**
* Returns the value at the given physical row number as a list of ByteBuffers.
*
* @param physicalRow physical row number
*/
private List<ByteBuffer> getRowAsListUtf8(final int physicalRow)
{
final Object object = getRowAsObject(physicalRow, false);
if (object == null) {
return Collections.singletonList(null);
} else if (object instanceof List) {
//noinspection unchecked
return (List<ByteBuffer>) object;
} else {
return Collections.singletonList((ByteBuffer) object);
}
}
}
}

View File

@ -44,6 +44,9 @@ public abstract class StringFrameColumnWriter<T extends ColumnValueSelector> imp
public static final long DATA_OFFSET = 1 /* type code */ + 1 /* single or multi-value? */;
public static final byte MULTI_VALUE_BYTE = (byte) 0x01;
public static final long MULTI_VALUE_POSITION = 1;
private final T selector;
private final byte typeCode;
protected final ColumnCapabilities.Capable multiValue;
@ -228,7 +231,7 @@ public abstract class StringFrameColumnWriter<T extends ColumnValueSelector> imp
long currentPosition = startPosition;
memory.putByte(currentPosition, typeCode);
memory.putByte(currentPosition + 1, writeMultiValue ? (byte) 1 : (byte) 0);
memory.putByte(currentPosition + 1, writeMultiValue ? MULTI_VALUE_BYTE : (byte) 0);
currentPosition += 2;
if (writeMultiValue) {

View File

@ -29,6 +29,7 @@ import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
import org.apache.druid.segment.column.ColumnType;
import org.junit.Assert;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
@ -280,6 +281,17 @@ public class RowsAndColumnsHelper
} else {
Assert.assertEquals(msg, ((Long) expectedVal).longValue(), accessor.getLong(i));
}
} else if (expectedVal instanceof Object[]) {
Object actualVal = accessor.getObject(i);
if (expectedNulls[i]) {
Assert.assertNull(msg, accessor.getObject(i));
} else {
if (actualVal instanceof ArrayList) {
Assert.assertArrayEquals(msg, (Object[]) expectedVals[i], ((ArrayList<?>) actualVal).toArray());
} else {
Assert.assertArrayEquals(msg, (Object[]) expectedVals[i], (Object[]) actualVal);
}
}
} else {
if (expectedNulls[i]) {
Assert.assertNull(msg, accessor.getObject(i));

View File

@ -38,32 +38,44 @@ import java.util.function.Function;
import static org.junit.Assert.assertEquals;
import static org.junit.Assume.assumeNotNull;
public class TestVirtualColumnEvaluationRowsAndColumnsTest extends SemanticTestBase
public class EvaluateRowsAndColumnsTest extends SemanticTestBase
{
public TestVirtualColumnEvaluationRowsAndColumnsTest(String name, Function<MapOfColumnsRowsAndColumns, RowsAndColumns> fn)
public EvaluateRowsAndColumnsTest(String name, Function<MapOfColumnsRowsAndColumns, RowsAndColumns> fn)
{
super(name, fn);
}
@Test
public void testMaterializeVirtualColumns()
public void testMaterializeColumns()
{
Object[][] vals = new Object[][] {
{1L, "a", 123L, 0L},
{2L, "a", 456L, 1L},
{3L, "b", 789L, 2L},
{4L, "b", 123L, 3L},
{1L, "a", 123L, new Object[]{"xyz", "x"}, 0L},
{2L, "a", 456L, new Object[]{"abc"}, 1L},
{3L, "b", 789L, new Object[]{null}, 2L},
{4L, null, 123L, null, 3L},
};
RowSignature siggy = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("dim", ColumnType.STRING)
.add("val", ColumnType.LONG)
.add("array", ColumnType.STRING_ARRAY)
.add("arrayIndex", ColumnType.LONG)
.build();
final RowsAndColumns base = make(MapOfColumnsRowsAndColumns.fromRowObjects(vals, siggy));
Object[] expectedArr = new Object[][] {
{"xyz", "x"},
{"abc"},
{null},
null
};
new RowsAndColumnsHelper()
.expectColumn("array", expectedArr, ColumnType.STRING_ARRAY)
.validate(base);
assumeNotNull("skipping: CursorFactory not supported", base.as(CursorFactory.class));
LazilyDecoratedRowsAndColumns ras = new LazilyDecoratedRowsAndColumns(
@ -82,12 +94,18 @@ public class TestVirtualColumnEvaluationRowsAndColumnsTest extends SemanticTestB
// do the materialziation
ras.numRows();
assertEquals(Lists.newArrayList("__time", "dim", "val", "arrayIndex", "expr"), ras.getColumnNames());
assertEquals(Lists.newArrayList("__time", "dim", "val", "array", "arrayIndex", "expr"), ras.getColumnNames());
new RowsAndColumnsHelper()
.expectColumn("expr", new long[] {123 * 2, 456L * 2, 789 * 2, 123 * 2})
.validate(ras);
}
new RowsAndColumnsHelper()
.expectColumn("dim", new String[] {"a", "a", "b", null}, ColumnType.STRING)
.validate(ras);
new RowsAndColumnsHelper()
.expectColumn("array", expectedArr, ColumnType.STRING_ARRAY)
.validate(ras);
}
}