Add fieldReader for row based frames (#16707)

Add a new fieldReaders#makeRAC for RowBasedFrameRowsAndColumns.
This commit is contained in:
Adarsh Sanjeev 2024-08-13 14:04:41 +05:30 committed by GitHub
parent f67ff92d07
commit c6da2f30e8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 1210 additions and 90 deletions

View File

@ -21,24 +21,32 @@ package org.apache.druid.frame.field;
import com.google.common.base.Preconditions;
import org.apache.datasketches.memory.Memory;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.write.RowBasedFrameWriter;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
import org.apache.druid.query.rowsandcols.column.accessor.ObjectColumnAccessorBase;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.ObjectColumnSelector;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.serde.ComplexMetricSerde;
import org.apache.druid.segment.serde.ComplexMetrics;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Comparator;
/**
* Reads values written by {@link ComplexFieldWriter}.
*
* <p>
* Format:
*
* <p>
* - 1 byte: {@link ComplexFieldWriter#NULL_BYTE} or {@link ComplexFieldWriter#NOT_NULL_BYTE}
* - 4 bytes: length of serialized complex value, little-endian int
* - N bytes: serialized complex value
@ -121,7 +129,7 @@ public class ComplexFieldReader implements FieldReader
* Alternative interface to read the field from the memory without creating a selector and field pointer
*/
@Nullable
public static Object readFieldFromMemory(
public static <T> T readFieldFromMemory(
final ComplexMetricSerde serde,
final Memory memory,
final long position
@ -136,7 +144,8 @@ public class ComplexFieldReader implements FieldReader
final byte[] bytes = new byte[length];
memory.getByteArray(position + ComplexFieldWriter.HEADER_SIZE, bytes, 0, length);
return serde.fromBytes(bytes, 0, length);
//noinspection unchecked
return (T) serde.fromBytes(bytes, 0, length);
} else {
throw new ISE("Unexpected null byte [%s]", nullByte);
}
@ -166,8 +175,8 @@ public class ComplexFieldReader implements FieldReader
@Override
public T getObject()
{
//noinspection unchecked
return (T) readFieldFromMemory(serde, memory, fieldPointer.position());
final long fieldPosition = fieldPointer.position();
return readFieldFromMemory(serde, memory, fieldPosition);
}
@Override
@ -183,4 +192,80 @@ public class ComplexFieldReader implements FieldReader
// Do nothing.
}
}
@Override
public Column makeRACColumn(Frame frame, RowSignature signature, String columnName)
{
return new ComplexFieldReaderColumn(frame, signature.indexOf(columnName), signature.size());
}
private class ComplexFieldReaderColumn implements Column
{
private final Frame frame;
private final Memory dataRegion;
private final ColumnType type;
private final FieldPositionHelper coach;
public ComplexFieldReaderColumn(Frame frame, int columnIndex, int numFields)
{
this.frame = frame;
dataRegion = frame.region(RowBasedFrameWriter.ROW_DATA_REGION);
this.type = ColumnType.ofComplex(serde.getTypeName());
this.coach = new FieldPositionHelper(
frame,
frame.region(RowBasedFrameWriter.ROW_OFFSET_REGION),
dataRegion,
columnIndex,
numFields
);
}
@Nonnull
@Override
public ColumnAccessor toAccessor()
{
return new ObjectColumnAccessorBase()
{
@Override
public ColumnType getType()
{
return type;
}
@Override
public int numRows()
{
return frame.numRows();
}
@Override
public boolean isNull(int rowNum)
{
final long fieldPosition = coach.computeFieldPosition(rowNum);
return dataRegion.getByte(fieldPosition) == ComplexFieldWriter.NULL_BYTE;
}
@Override
protected Object getVal(int rowNum)
{
return readFieldFromMemory(serde, dataRegion, coach.computeFieldPosition(rowNum));
}
@Override
protected Comparator<Object> getComparator()
{
return serde.getTypeStrategy();
}
};
}
@Nullable
@Override
public <T> T as(Class<? extends T> clazz)
{
return null;
}
}
}

View File

@ -20,7 +20,11 @@
package org.apache.druid.frame.field;
import org.apache.datasketches.memory.Memory;
import org.apache.druid.error.NotYetImplemented;
import org.apache.druid.frame.Frame;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.column.RowSignature;
import javax.annotation.Nullable;
@ -60,4 +64,10 @@ public class DoubleArrayFieldReader extends NumericArrayFieldReader
}
};
}
@Override
public Column makeRACColumn(Frame frame, RowSignature signature, String columnName)
{
throw NotYetImplemented.ex(null, "Class cannot create an RAC column.");
}
}

View File

@ -21,11 +21,20 @@ package org.apache.druid.frame.field;
import org.apache.datasketches.memory.Memory;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.write.RowBasedFrameWriter;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DoubleColumnSelector;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
/**
* Reads the values produced by {@link DoubleFieldWriter}
*
@ -99,4 +108,135 @@ public class DoubleFieldReader extends NumericFieldReader
return super._isNull();
}
}
@Override
public Column makeRACColumn(Frame frame, RowSignature signature, String columnName)
{
return new DoubleFieldReaderColumn(frame, signature.indexOf(columnName), signature.size());
}
private class DoubleFieldReaderColumn implements Column
{
private final Frame frame;
private final Memory dataRegion;
private final FieldPositionHelper coach;
public DoubleFieldReaderColumn(Frame frame, int columnIndex, int numFields)
{
this.frame = frame;
dataRegion = frame.region(RowBasedFrameWriter.ROW_DATA_REGION);
this.coach = new FieldPositionHelper(
frame,
frame.region(RowBasedFrameWriter.ROW_OFFSET_REGION),
dataRegion,
columnIndex,
numFields
);
}
@Nonnull
@Override
public ColumnAccessor toAccessor()
{
return new ColumnAccessor()
{
@Override
public ColumnType getType()
{
return ColumnType.DOUBLE;
}
@Override
public int numRows()
{
return frame.numRows();
}
@Override
public boolean isNull(int rowNum)
{
final long fieldPosition = coach.computeFieldPosition(rowNum);
return dataRegion.getByte(fieldPosition) == getNullIndicatorByte();
}
@Nullable
@Override
public Object getObject(int rowNum)
{
final long fieldPosition = coach.computeFieldPosition(rowNum);
if (dataRegion.getByte(fieldPosition) == getNullIndicatorByte()) {
return null;
} else {
return getDoubleAtPosition(fieldPosition);
}
}
@Override
public double getDouble(int rowNum)
{
final long fieldPosition = coach.computeFieldPosition(rowNum);
if (dataRegion.getByte(fieldPosition) == getNullIndicatorByte()) {
return 0L;
} else {
return getDoubleAtPosition(fieldPosition);
}
}
@Override
public float getFloat(int rowNum)
{
return (float) getDouble(rowNum);
}
@Override
public long getLong(int rowNum)
{
return (long) getDouble(rowNum);
}
@Override
public int getInt(int rowNum)
{
return (int) getDouble(rowNum);
}
@Override
public int compareRows(int lhsRowNum, int rhsRowNum)
{
long lhsPosition = coach.computeFieldPosition(lhsRowNum);
long rhsPosition = coach.computeFieldPosition(rhsRowNum);
final byte nullIndicatorByte = getNullIndicatorByte();
if (dataRegion.getByte(lhsPosition) == nullIndicatorByte) {
if (dataRegion.getByte(rhsPosition) == nullIndicatorByte) {
return 0;
} else {
return -1;
}
} else {
if (dataRegion.getByte(rhsPosition) == nullIndicatorByte) {
return 1;
} else {
return Double.compare(getDoubleAtPosition(lhsPosition), getDoubleAtPosition(rhsPosition));
}
}
}
private double getDoubleAtPosition(long lhsPosition)
{
return TransformUtils.detransformToDouble(dataRegion.getLong(lhsPosition + Byte.BYTES));
}
};
}
@Nullable
@Override
public <T> T as(Class<? extends T> clazz)
{
return null;
}
}
}

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.frame.field;
import org.apache.datasketches.memory.Memory;
import org.apache.druid.frame.Frame;
/**
* Helps compute the field position for a frame from the different regions in the frame.
*/
public class FieldPositionHelper
{
private final Frame frame;
private final Memory offsetRegion;
private final Memory dataRegion;
private final int columnIndex;
private final long fieldsBytesSize;
public FieldPositionHelper(
Frame frame,
Memory offsetRegion,
Memory dataRegion,
int columnIndex,
int numFields
)
{
this.frame = frame;
this.offsetRegion = offsetRegion;
this.dataRegion = dataRegion;
this.columnIndex = columnIndex;
this.fieldsBytesSize = this.columnIndex == 0
? ((long) numFields) * Integer.BYTES
: ((long) (this.columnIndex - 1)) * Integer.BYTES;
}
public long computeFieldPosition(int rowNum)
{
rowNum = frame.physicalRow(rowNum);
final long rowPosition = rowNum == 0 ? 0 : offsetRegion.getLong(((long) rowNum - 1) * Long.BYTES);
final long fieldPosition;
if (columnIndex == 0) {
fieldPosition = rowPosition + fieldsBytesSize;
} else {
fieldPosition = rowPosition + dataRegion.getInt(rowPosition + fieldsBytesSize);
}
return fieldPosition;
}
}

View File

@ -20,24 +20,32 @@
package org.apache.druid.frame.field;
import org.apache.datasketches.memory.Memory;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.key.RowKey;
import org.apache.druid.frame.key.RowKeyReader;
import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.column.RowSignature;
import javax.annotation.Nullable;
/**
* Embeds the logic to read a specific field from row-based frames or from {@link RowKey}.
*
* <p>
* Most callers should use {@link org.apache.druid.frame.read.FrameReader} or
* {@link RowKeyReader} rather than using this interface directly.
*
* <p>
* Stateless and immutable.
*/
public interface FieldReader
{
/**
* Create a {@link Column} which provides accses to the rows in the frame, via the {@link Column#toAccessor()}.
*/
Column makeRACColumn(Frame frame, RowSignature signature, String columnName);
/**
* Create a {@link ColumnValueSelector} backed by some memory and a moveable pointer.
*/

View File

@ -20,7 +20,11 @@
package org.apache.druid.frame.field;
import org.apache.datasketches.memory.Memory;
import org.apache.druid.error.NotYetImplemented;
import org.apache.druid.frame.Frame;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.column.RowSignature;
import javax.annotation.Nullable;
@ -60,4 +64,10 @@ public class FloatArrayFieldReader extends NumericArrayFieldReader
}
};
}
@Override
public Column makeRACColumn(Frame frame, RowSignature signature, String columnName)
{
throw NotYetImplemented.ex(null, "Class cannot create an RAC column.");
}
}

View File

@ -21,11 +21,20 @@ package org.apache.druid.frame.field;
import org.apache.datasketches.memory.Memory;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.write.RowBasedFrameWriter;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.FloatColumnSelector;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
/**
* Reads values written by {@link FloatFieldWriter}.
*
@ -99,4 +108,135 @@ public class FloatFieldReader extends NumericFieldReader
return super._isNull();
}
}
@Override
public Column makeRACColumn(Frame frame, RowSignature signature, String columnName)
{
return new FloatFieldReaderColumn(frame, signature.indexOf(columnName), signature.size());
}
private class FloatFieldReaderColumn implements Column
{
private final Frame frame;
private final Memory dataRegion;
private final FieldPositionHelper coach;
public FloatFieldReaderColumn(Frame frame, int columnIndex, int numFields)
{
this.frame = frame;
dataRegion = frame.region(RowBasedFrameWriter.ROW_DATA_REGION);
this.coach = new FieldPositionHelper(
frame,
frame.region(RowBasedFrameWriter.ROW_OFFSET_REGION),
dataRegion,
columnIndex,
numFields
);
}
@Nonnull
@Override
public ColumnAccessor toAccessor()
{
return new ColumnAccessor()
{
@Override
public ColumnType getType()
{
return ColumnType.FLOAT;
}
@Override
public int numRows()
{
return frame.numRows();
}
@Override
public boolean isNull(int rowNum)
{
final long fieldPosition = coach.computeFieldPosition(rowNum);
return dataRegion.getByte(fieldPosition) == getNullIndicatorByte();
}
@Nullable
@Override
public Object getObject(int rowNum)
{
final long fieldPosition = coach.computeFieldPosition(rowNum);
if (dataRegion.getByte(fieldPosition) == getNullIndicatorByte()) {
return null;
} else {
return getFloatAtPosition(fieldPosition);
}
}
@Override
public double getDouble(int rowNum)
{
return getFloat(rowNum);
}
@Override
public float getFloat(int rowNum)
{
final long fieldPosition = coach.computeFieldPosition(rowNum);
if (dataRegion.getByte(fieldPosition) == getNullIndicatorByte()) {
return 0L;
} else {
return getFloatAtPosition(fieldPosition);
}
}
@Override
public long getLong(int rowNum)
{
return (long) getFloat(rowNum);
}
@Override
public int getInt(int rowNum)
{
return (int) getFloat(rowNum);
}
@Override
public int compareRows(int lhsRowNum, int rhsRowNum)
{
long lhsPosition = coach.computeFieldPosition(lhsRowNum);
long rhsPosition = coach.computeFieldPosition(rhsRowNum);
final byte nullIndicatorByte = getNullIndicatorByte();
if (dataRegion.getByte(lhsPosition) == nullIndicatorByte) {
if (dataRegion.getByte(rhsPosition) == nullIndicatorByte) {
return 0;
} else {
return -1;
}
} else {
if (dataRegion.getByte(rhsPosition) == nullIndicatorByte) {
return 1;
} else {
return Float.compare(getFloatAtPosition(lhsPosition), getFloatAtPosition(rhsPosition));
}
}
}
private float getFloatAtPosition(long rhsPosition)
{
return TransformUtils.detransformToFloat(dataRegion.getInt(rhsPosition + Byte.BYTES));
}
};
}
@Nullable
@Override
public <T> T as(Class<? extends T> clazz)
{
return null;
}
}
}

View File

@ -20,7 +20,11 @@
package org.apache.druid.frame.field;
import org.apache.datasketches.memory.Memory;
import org.apache.druid.error.NotYetImplemented;
import org.apache.druid.frame.Frame;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.column.RowSignature;
import javax.annotation.Nullable;
@ -60,4 +64,10 @@ public class LongArrayFieldReader extends NumericArrayFieldReader
}
};
}
@Override
public Column makeRACColumn(Frame frame, RowSignature signature, String columnName)
{
throw NotYetImplemented.ex(null, "Class cannot create an RAC column.");
}
}

View File

@ -21,11 +21,20 @@ package org.apache.druid.frame.field;
import org.apache.datasketches.memory.Memory;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.write.RowBasedFrameWriter;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.LongColumnSelector;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
/**
* Reads values written by {@link LongFieldWriter}.
*
@ -68,7 +77,6 @@ public class LongFieldReader extends NumericFieldReader
private static class LongFieldSelector extends NumericFieldReader.Selector implements LongColumnSelector
{
final Memory dataRegion;
final ReadableFieldPointer fieldPointer;
@ -99,4 +107,135 @@ public class LongFieldReader extends NumericFieldReader
return super._isNull();
}
}
@Override
public Column makeRACColumn(Frame frame, RowSignature signature, String columnName)
{
return new LongFieldReaderColumn(frame, signature.indexOf(columnName), signature.size());
}
private class LongFieldReaderColumn implements Column
{
private final Frame frame;
private final Memory dataRegion;
private final FieldPositionHelper coach;
public LongFieldReaderColumn(Frame frame, int columnIndex, int numFields)
{
this.frame = frame;
this.dataRegion = frame.region(RowBasedFrameWriter.ROW_DATA_REGION);
this.coach = new FieldPositionHelper(
frame,
frame.region(RowBasedFrameWriter.ROW_OFFSET_REGION),
dataRegion,
columnIndex,
numFields
);
}
@Nonnull
@Override
public ColumnAccessor toAccessor()
{
return new ColumnAccessor()
{
@Override
public ColumnType getType()
{
return ColumnType.LONG;
}
@Override
public int numRows()
{
return frame.numRows();
}
@Override
public boolean isNull(int rowNum)
{
final long fieldPosition = coach.computeFieldPosition(rowNum);
return dataRegion.getByte(fieldPosition) == getNullIndicatorByte();
}
@Nullable
@Override
public Object getObject(int rowNum)
{
final long fieldPosition = coach.computeFieldPosition(rowNum);
if (dataRegion.getByte(fieldPosition) == getNullIndicatorByte()) {
return null;
} else {
return getLongAtPosition(fieldPosition);
}
}
@Override
public double getDouble(int rowNum)
{
return getLong(rowNum);
}
@Override
public float getFloat(int rowNum)
{
return getLong(rowNum);
}
@Override
public long getLong(int rowNum)
{
final long fieldPosition = coach.computeFieldPosition(rowNum);
if (dataRegion.getByte(fieldPosition) == getNullIndicatorByte()) {
return 0L;
} else {
return getLongAtPosition(fieldPosition);
}
}
@Override
public int getInt(int rowNum)
{
return (int) getLong(rowNum);
}
@Override
public int compareRows(int lhsRowNum, int rhsRowNum)
{
long lhsPosition = coach.computeFieldPosition(lhsRowNum);
long rhsPosition = coach.computeFieldPosition(rhsRowNum);
final byte nullIndicatorByte = getNullIndicatorByte();
if (dataRegion.getByte(lhsPosition) == nullIndicatorByte) {
if (dataRegion.getByte(rhsPosition) == nullIndicatorByte) {
return 0;
} else {
return -1;
}
} else {
if (dataRegion.getByte(rhsPosition) == nullIndicatorByte) {
return 1;
} else {
return Long.compare(getLongAtPosition(lhsPosition), getLongAtPosition(rhsPosition));
}
}
}
private long getLongAtPosition(long rhsPosition)
{
return TransformUtils.detransformToLong(dataRegion.getLong(rhsPosition + Byte.BYTES));
}
};
}
@Nullable
@Override
public <T> T as(Class<? extends T> clazz)
{
return null;
}
}
}

View File

@ -29,7 +29,7 @@ import javax.annotation.Nullable;
/**
* Reader class for the fields written by {@link NumericArrayFieldWriter}. See the Javadoc for the writer for more
* information on the format
*
* <p>
* The numeric array fields are byte comparable
*/
public abstract class NumericArrayFieldReader implements FieldReader

View File

@ -50,6 +50,11 @@ public abstract class NumericFieldReader implements FieldReader
}
}
public byte getNullIndicatorByte()
{
return nullIndicatorByte;
}
@Override
public ColumnValueSelector<?> makeColumnValueSelector(Memory memory, ReadableFieldPointer fieldPointer)
{
@ -94,7 +99,7 @@ public abstract class NumericFieldReader implements FieldReader
/**
* Helper class which allows the inheritors to fetch the nullity of the field located at fieldPointer's position in
* the dataRegion.
*
* <p>
* The implementations of the column value selectors returned by the {@link #getColumnValueSelector} can inherit this
* class and call {@link #_isNull()} in their {@link ColumnValueSelector#isNull()} to offload the responsibility of
* detecting null elements to this Selector, instead of reworking the null handling

View File

@ -23,51 +23,64 @@ import com.google.common.primitives.Ints;
import it.unimi.dsi.fastutil.objects.ObjectArrays;
import org.apache.datasketches.memory.Memory;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.NotYetImplemented;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.read.FrameReaderUtils;
import org.apache.druid.frame.write.RowBasedFrameWriter;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.query.filter.DruidPredicateFactory;
import org.apache.druid.query.filter.ValueMatcher;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
import org.apache.druid.query.rowsandcols.column.accessor.ObjectColumnAccessorBase;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.DimensionSelectorUtils;
import org.apache.druid.segment.IdLookup;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.IndexedInts;
import org.apache.druid.segment.data.RangeIndexedInts;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
/**
* Reads fields written by {@link StringFieldWriter} or {@link StringArrayFieldWriter}.
*
* <p>
* Strings are written in UTF8 and terminated by {@link StringFieldWriter#VALUE_TERMINATOR}. Note that this byte
* appears in valid UTF8 encodings if and only if the string contains a NUL (char 0). Therefore, this field writer
* cannot write out strings containing NUL characters.
*
* <p>
* All rows are terminated by {@link StringFieldWriter#ROW_TERMINATOR}.
*
* <p>
* Empty rows are represented in one byte: solely that {@link StringFieldWriter#ROW_TERMINATOR}. Rows that are null
* themselves (i.e., a null array) are represented as a {@link StringFieldWriter#NULL_ROW} followed by a
* {@link StringFieldWriter#ROW_TERMINATOR}. This encoding for null arrays is decoded by older readers as an
* empty array; null arrays are a feature that did not exist in earlier versions of the code.
*
* <p>
* Null strings are stored as {@link StringFieldWriter#NULL_BYTE}. All other strings are prepended by
* {@link StringFieldWriter#NOT_NULL_BYTE} byte to differentiate them from nulls.
*
* <p>
* This encoding allows the encoded data to be compared as bytes in a way that matches the behavior of
* {@link org.apache.druid.segment.StringDimensionHandler#DIMENSION_SELECTOR_COMPARATOR}, except null and
* empty list are not considered equal.
*/
public class StringFieldReader implements FieldReader
{
public static final byte[] EXPECTED_BYTES_FOR_NULL = {
StringFieldWriter.NULL_BYTE, StringFieldWriter.VALUE_TERMINATOR, StringFieldWriter.ROW_TERMINATOR
};
private final boolean asArray;
public StringFieldReader()
@ -123,6 +136,16 @@ public class StringFieldReader implements FieldReader
}
}
@Override
public Column makeRACColumn(Frame frame, RowSignature signature, String columnName)
{
if (asArray) {
return new StringArrayFieldReaderColumn(frame, signature.indexOf(columnName), signature.size());
} else {
return new StringFieldReaderColumn(frame, signature.indexOf(columnName), signature.size());
}
}
/**
* Selector that reads a value from a location pointed to by {@link ReadableFieldPointer}.
*/
@ -297,70 +320,296 @@ public class StringFieldReader implements FieldReader
{
currentUtf8StringsIsNull = false;
currentUtf8Strings.clear();
long position = fieldPosition;
long limit = memory.getCapacity();
boolean rowTerminatorSeen = false;
while (position < limit && !rowTerminatorSeen) {
final byte kind = memory.getByte(position);
position++;
switch (kind) {
case StringFieldWriter.VALUE_TERMINATOR: // Or NULL_ROW (same byte value)
if (position == fieldPosition + 1) {
// It was NULL_ROW.
currentUtf8StringsIsNull = true;
}
// Skip; next byte will be a null/not-null byte or a row terminator.
break;
case StringFieldWriter.ROW_TERMINATOR:
// Skip; this is the end of the row, so we'll fall through to the return statement.
rowTerminatorSeen = true;
break;
case StringFieldWriter.NULL_BYTE:
currentUtf8Strings.add(null);
break;
case StringFieldWriter.NOT_NULL_BYTE:
for (long i = position; ; i++) {
if (i >= limit) {
throw new ISE("Value overrun");
}
final byte b = memory.getByte(i);
if (b == StringFieldWriter.VALUE_TERMINATOR) {
final int len = Ints.checkedCast(i - position);
if (len == 0 && NullHandling.replaceWithDefault()) {
// Empty strings and nulls are the same in this mode.
currentUtf8Strings.add(null);
} else {
final ByteBuffer buf = FrameReaderUtils.readByteBuffer(memory, position, len);
currentUtf8Strings.add(buf);
}
position += len;
break;
}
}
break;
default:
throw new ISE("Invalid value start byte [%s]", kind);
}
}
if (!rowTerminatorSeen) {
throw new ISE("Unexpected end of field");
}
currentUtf8StringsIsNull = addStringsToList(memory, fieldPosition, currentUtf8Strings);
}
}
private static class StringFieldReaderColumn implements Column
{
private final Frame frame;
private final Memory dataRegion;
private final FieldPositionHelper coach;
public StringFieldReaderColumn(Frame frame, int columnIndex, int numFields)
{
this.frame = frame;
this.dataRegion = frame.region(RowBasedFrameWriter.ROW_DATA_REGION);
this.coach = new FieldPositionHelper(
frame,
frame.region(RowBasedFrameWriter.ROW_OFFSET_REGION),
dataRegion,
columnIndex,
numFields
);
}
@Nonnull
@Override
public ColumnAccessor toAccessor()
{
return new ObjectColumnAccessorBase()
{
@Override
public ColumnType getType()
{
return ColumnType.STRING;
}
@Override
public int numRows()
{
return frame.numRows();
}
@Override
public boolean isNull(int rowNum)
{
final long fieldPosition = coach.computeFieldPosition(rowNum);
byte[] nullBytes = new byte[3];
dataRegion.getByteArray(fieldPosition, nullBytes, 0, 3);
return Arrays.equals(nullBytes, EXPECTED_BYTES_FOR_NULL);
}
@Override
public int compareRows(int lhsRowNum, int rhsRowNum)
{
ByteBuffer lhs = getUTF8BytesAtPosition(coach.computeFieldPosition(lhsRowNum));
ByteBuffer rhs = getUTF8BytesAtPosition(coach.computeFieldPosition(rhsRowNum));
if (lhs == null) {
if (rhs == null) {
return 0;
} else {
return -1;
}
} else {
if (rhs == null) {
return 1;
} else {
return lhs.compareTo(rhs);
}
}
}
@Override
protected Object getVal(int rowNum)
{
return getStringAtPosition(coach.computeFieldPosition(rowNum));
}
@Override
protected Comparator<Object> getComparator()
{
// we implement compareRows and thus don't need to actually implement this method
throw new UnsupportedOperationException();
}
@Nullable
private String getStringAtPosition(long fieldPosition)
{
return StringUtils.fromUtf8Nullable(getUTF8BytesAtPosition(fieldPosition));
}
@Nullable
private ByteBuffer getUTF8BytesAtPosition(long fieldPosition)
{
ArrayList<ByteBuffer> buffers = new ArrayList<>();
final boolean isNull = addStringsToList(dataRegion, fieldPosition, buffers);
if (isNull) {
return null;
} else {
if (buffers.size() > 1) {
throw DruidException.defensive(
"Can only work with single-valued strings, should use a COMPLEX or ARRAY typed Column instead"
);
}
return buffers.get(0);
}
}
};
}
@Nullable
@Override
public <T> T as(Class<? extends T> clazz)
{
return null;
}
}
private static class StringArrayFieldReaderColumn implements Column
{
private final Frame frame;
private final Memory dataRegion;
private final FieldPositionHelper coach;
public StringArrayFieldReaderColumn(Frame frame, int columnIndex, int numFields)
{
this.frame = frame;
this.dataRegion = frame.region(RowBasedFrameWriter.ROW_DATA_REGION);
this.coach = new FieldPositionHelper(
frame,
frame.region(RowBasedFrameWriter.ROW_OFFSET_REGION),
this.dataRegion,
columnIndex,
numFields
);
}
@Nonnull
@Override
public ColumnAccessor toAccessor()
{
return new ObjectColumnAccessorBase()
{
@Override
public ColumnType getType()
{
return ColumnType.STRING_ARRAY;
}
@Override
public int numRows()
{
return frame.numRows();
}
@Override
public boolean isNull(int rowNum)
{
final long fieldPosition = coach.computeFieldPosition(rowNum);
byte[] nullBytes = new byte[3];
dataRegion.getByteArray(fieldPosition, nullBytes, 0, 3);
return Arrays.equals(nullBytes, EXPECTED_BYTES_FOR_NULL);
}
@Override
public int compareRows(int lhsRowNum, int rhsRowNum)
{
throw NotYetImplemented.ex(
null,
"Should implement this by comparing the actual bytes in the frame, they should be comparable"
);
}
@Override
protected Object getVal(int rowNum)
{
return getStringsAtPosition(coach.computeFieldPosition(rowNum));
}
@Override
protected Comparator<Object> getComparator()
{
// we implement compareRows and thus don't need to actually implement this method
throw new UnsupportedOperationException();
}
@Nullable
private List<String> getStringsAtPosition(long fieldPosition)
{
final List<ByteBuffer> bufs = getUTF8BytesAtPosition(fieldPosition);
if (bufs == null) {
return null;
}
final ArrayList<String> retVal = new ArrayList<>(bufs.size());
for (ByteBuffer buf : bufs) {
retVal.add(StringUtils.fromUtf8Nullable(buf));
}
return retVal;
}
@Nullable
private List<ByteBuffer> getUTF8BytesAtPosition(long fieldPosition)
{
ArrayList<ByteBuffer> buffers = new ArrayList<>();
final boolean isNull = addStringsToList(dataRegion, fieldPosition, buffers);
if (isNull) {
return null;
} else {
return buffers;
}
}
};
}
@Nullable
@Override
public <T> T as(Class<? extends T> clazz)
{
return null;
}
}
private static boolean addStringsToList(Memory memory, long fieldPosition, List<ByteBuffer> list)
{
long position = fieldPosition;
long limit = memory.getCapacity();
boolean rowTerminatorSeen = false;
boolean isEffectivelyNull = false;
while (position < limit && !rowTerminatorSeen) {
final byte kind = memory.getByte(position);
position++;
switch (kind) {
case StringFieldWriter.VALUE_TERMINATOR: // Or NULL_ROW (same byte value)
if (position == fieldPosition + 1) {
// It was NULL_ROW.
isEffectivelyNull = true;
}
// Skip; next byte will be a null/not-null byte or a row terminator.
break;
case StringFieldWriter.ROW_TERMINATOR:
// Skip; this is the end of the row, so we'll fall through to the return statement.
rowTerminatorSeen = true;
break;
case StringFieldWriter.NULL_BYTE:
list.add(null);
break;
case StringFieldWriter.NOT_NULL_BYTE:
for (long i = position; ; i++) {
if (i >= limit) {
throw new ISE("Value overrun");
}
final byte b = memory.getByte(i);
if (b == StringFieldWriter.VALUE_TERMINATOR) {
final int len = Ints.checkedCast(i - position);
if (len == 0 && NullHandling.replaceWithDefault()) {
// Empty strings and nulls are the same in this mode.
list.add(null);
} else {
final ByteBuffer buf = FrameReaderUtils.readByteBuffer(memory, position, len);
list.add(buf);
}
position += len;
break;
}
}
break;
default:
throw new ISE("Invalid value start byte [%s]", kind);
}
}
if (!rowTerminatorSeen) {
throw new ISE("Unexpected end of field");
}
return isEffectivelyNull;
}
}

View File

@ -19,12 +19,13 @@
package org.apache.druid.query.rowsandcols.concrete;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.field.FieldReader;
import org.apache.druid.frame.field.FieldReaders;
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.read.columnar.FrameColumnReaders;
import org.apache.druid.frame.segment.FrameStorageAdapter;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
@ -65,6 +66,7 @@ public class RowBasedFrameRowsAndColumns implements RowsAndColumns, AutoCloseabl
@Override
public Column findColumn(String name)
{
// Use contains so that we can negative cache.
if (!colCache.containsKey(name)) {
final int columnIndex = signature.indexOf(name);
if (columnIndex < 0) {
@ -72,9 +74,16 @@ public class RowBasedFrameRowsAndColumns implements RowsAndColumns, AutoCloseabl
} else {
final ColumnType columnType = signature
.getColumnType(columnIndex)
.orElseThrow(() -> new ISE("just got the id, why is columnType not there?"));
.orElseThrow(
() -> DruidException.defensive(
"just got the id [%s][%s], why is columnType not there?",
columnIndex,
name
)
);
colCache.put(name, FrameColumnReaders.create(name, columnIndex, columnType).readRACColumn(frame));
final FieldReader reader = FieldReaders.create(name, columnType);
colCache.put(name, reader.makeRACColumn(frame, signature, name));
}
}
return colCache.get(name);

View File

@ -21,14 +21,20 @@ package org.apache.druid.frame.field;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.frame.key.KeyOrder;
import org.apache.druid.frame.write.FrameWriterTestData;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.extraction.SubstringDimExtractionFn;
import org.apache.druid.query.filter.DruidObjectPredicate;
import org.apache.druid.query.filter.StringPredicateDruidPredicateFactory;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumnsTest;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionDictionarySelector;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.data.IndexedInts;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.After;
@ -42,6 +48,9 @@ import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.mockito.quality.Strictness;
import java.util.List;
import java.util.Objects;
public class DoubleFieldReaderTest extends InitializedNullHandlingTest
{
private static final long MEMORY_POSITION = 1;
@ -143,6 +152,27 @@ public class DoubleFieldReaderTest extends InitializedNullHandlingTest
}
}
@Test
public void testCompareRows()
{
final List<Double> rows = FrameWriterTestData.TEST_DOUBLES.getData(KeyOrder.ASCENDING);
final ColumnAccessor accessor =
RowBasedFrameRowsAndColumnsTest.MAKER.apply(
MapOfColumnsRowsAndColumns.builder()
.add("dim1", rows.toArray(), ColumnType.DOUBLE)
.build()
).findColumn("dim1").toAccessor();
for (int i = 1; i < rows.size(); i++) {
if (Objects.equals(accessor.getObject(i - 1), accessor.getObject(i))) {
Assert.assertEquals(0, accessor.compareRows(i - 1, i));
} else {
Assert.assertTrue(accessor.compareRows(i - 1, i) < 0);
}
}
}
@Test
public void test_makeDimensionSelector_aValue()
{

View File

@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.frame.field;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.testutil.FrameTestUtil;
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexStorageAdapter;
import org.apache.druid.segment.SimpleAscendingOffset;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.column.BaseColumn;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
public class FieldReaderRACTest extends InitializedNullHandlingTest
{
final DruidExceptionMatcher noArraysMatcher = DruidExceptionMatcher
.defensive()
.expectMessageIs("Can only work with single-valued strings, should use a COMPLEX or ARRAY typed Column instead");
@Test
public void testDataSet() throws IOException
{
final QueryableIndex index = TestIndex.getMMappedTestIndex();
final QueryableIndexStorageAdapter storageAdapter = new QueryableIndexStorageAdapter(index);
final Frame frame = FrameTestUtil.adapterToFrame(storageAdapter, FrameType.ROW_BASED);
final RowSignature siggy = storageAdapter.getRowSignature();
final RowBasedFrameRowsAndColumns rowBasedRAC = new RowBasedFrameRowsAndColumns(frame, siggy);
for (String columnName : siggy.getColumnNames()) {
final ColumnHolder colHolder = index.getColumnHolder(columnName);
final boolean multiValue = colHolder.getCapabilities().hasMultipleValues().isTrue();
try (BaseColumn col = colHolder.getColumn()) {
final ColumnAccessor racCol = rowBasedRAC.findColumn(columnName).toAccessor();
final SimpleAscendingOffset offset = new SimpleAscendingOffset(racCol.numRows());
final ColumnValueSelector<?> selector = col.makeColumnValueSelector(offset);
while (offset.withinBounds()) {
if (multiValue) {
noArraysMatcher.assertThrowsAndMatches(() -> racCol.getObject(offset.getOffset()));
} else {
final Object racObj = racCol.getObject(offset.getOffset());
Assert.assertEquals(racCol.isNull(offset.getOffset()), racCol.getObject(offset.getOffset()) == null);
Assert.assertEquals(selector.getObject(), racObj);
}
offset.increment();
}
}
}
}
}

View File

@ -21,14 +21,20 @@ package org.apache.druid.frame.field;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.frame.key.KeyOrder;
import org.apache.druid.frame.write.FrameWriterTestData;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.extraction.SubstringDimExtractionFn;
import org.apache.druid.query.filter.DruidObjectPredicate;
import org.apache.druid.query.filter.StringPredicateDruidPredicateFactory;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumnsTest;
import org.apache.druid.segment.BaseFloatColumnValueSelector;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionDictionarySelector;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.data.IndexedInts;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.After;
@ -42,6 +48,9 @@ import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.mockito.quality.Strictness;
import java.util.List;
import java.util.Objects;
public class FloatFieldReaderTest extends InitializedNullHandlingTest
{
private static final long MEMORY_POSITION = 1;
@ -75,6 +84,27 @@ public class FloatFieldReaderTest extends InitializedNullHandlingTest
Assert.assertEquals(NullHandling.sqlCompatible(), FloatFieldReader.forPrimitive().isNull(memory, MEMORY_POSITION));
}
@Test
public void testCompareRows()
{
final List<Float> rows = FrameWriterTestData.TEST_FLOATS.getData(KeyOrder.ASCENDING);
final ColumnAccessor accessor =
RowBasedFrameRowsAndColumnsTest.MAKER.apply(
MapOfColumnsRowsAndColumns.builder()
.add("dim1", rows.toArray(), ColumnType.FLOAT)
.build()
).findColumn("dim1").toAccessor();
for (int i = 1; i < rows.size(); i++) {
if (Objects.equals(accessor.getObject(i - 1), accessor.getObject(i))) {
Assert.assertEquals(0, accessor.compareRows(i - 1, i));
} else {
Assert.assertTrue(accessor.compareRows(i - 1, i) < 0);
}
}
}
@Test
public void test_isNull_aValue()
{

View File

@ -21,14 +21,20 @@ package org.apache.druid.frame.field;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.frame.key.KeyOrder;
import org.apache.druid.frame.write.FrameWriterTestData;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.extraction.SubstringDimExtractionFn;
import org.apache.druid.query.filter.DruidObjectPredicate;
import org.apache.druid.query.filter.StringPredicateDruidPredicateFactory;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumnsTest;
import org.apache.druid.segment.BaseLongColumnValueSelector;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionDictionarySelector;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.data.IndexedInts;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.After;
@ -42,6 +48,9 @@ import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.mockito.quality.Strictness;
import java.util.List;
import java.util.Objects;
public class LongFieldReaderTest extends InitializedNullHandlingTest
{
private static final long MEMORY_POSITION = 1;
@ -202,6 +211,28 @@ public class LongFieldReaderTest extends InitializedNullHandlingTest
Assert.assertFalse(readSelector.makeValueMatcher(StringPredicateDruidPredicateFactory.equalTo("2")).matches(false));
}
@Test
public void testCompareRows()
{
final List<Long> rows = FrameWriterTestData.TEST_LONGS.getData(KeyOrder.ASCENDING);
final ColumnAccessor accessor =
RowBasedFrameRowsAndColumnsTest.MAKER.apply(
MapOfColumnsRowsAndColumns.builder()
.add("dim1", rows.toArray(), ColumnType.LONG)
.build()
).findColumn("dim1").toAccessor();
for (int i = 1; i < rows.size(); i++) {
if (Objects.equals(accessor.getObject(i - 1), accessor.getObject(i))) {
Assert.assertEquals(0, accessor.compareRows(i - 1, i));
} else {
Assert.assertTrue(accessor.compareRows(i - 1, i) < 0);
}
}
}
private void writeToMemory(final Long value)
{
Mockito.when(writeSelector.isNull()).thenReturn(value == null);

View File

@ -24,6 +24,8 @@ import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns;
import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumnsTest;
import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns;
import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumnsTest;
import org.junit.Assert;
import org.junit.Test;
@ -67,7 +69,8 @@ public abstract class RowsAndColumnsTestBase
new Object[]{ConcatRowsAndColumns.class, ConcatRowsAndColumnsTest.MAKER},
new Object[]{RearrangedRowsAndColumns.class, RearrangedRowsAndColumnsTest.MAKER},
new Object[]{ColumnBasedFrameRowsAndColumns.class, ColumnBasedFrameRowsAndColumnsTest.MAKER},
new Object[]{StorageAdapterRowsAndColumns.class, StorageAdapterRowsAndColumnsTest.MAKER}
new Object[]{StorageAdapterRowsAndColumns.class, StorageAdapterRowsAndColumnsTest.MAKER},
new Object[]{RowBasedFrameRowsAndColumns.class, RowBasedFrameRowsAndColumnsTest.MAKER}
);
}

View File

@ -33,17 +33,13 @@ public class ColumnBasedFrameRowsAndColumnsTest extends RowsAndColumnsTestBase
super(ColumnBasedFrameRowsAndColumns.class);
}
public static Function<MapOfColumnsRowsAndColumns, ColumnBasedFrameRowsAndColumns> MAKER = input -> {
return buildFrame(input);
};
public static Function<MapOfColumnsRowsAndColumns, ColumnBasedFrameRowsAndColumns> MAKER = ColumnBasedFrameRowsAndColumnsTest::buildFrame;
public static ColumnBasedFrameRowsAndColumns buildFrame(MapOfColumnsRowsAndColumns input)
{
LazilyDecoratedRowsAndColumns rac = new LazilyDecoratedRowsAndColumns(input, null, null, null, OffsetLimit.limit(Integer.MAX_VALUE), null, null);
rac.numRows(); // materialize
return (ColumnBasedFrameRowsAndColumns) rac.getBase();
}
}

View File

@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.rowsandcols.concrete;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.allocation.ArenaMemoryAllocatorFactory;
import org.apache.druid.frame.write.FrameWriter;
import org.apache.druid.frame.write.FrameWriters;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumnsTestBase;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.column.RowSignature;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
public class RowBasedFrameRowsAndColumnsTest extends RowsAndColumnsTestBase
{
public RowBasedFrameRowsAndColumnsTest()
{
super(RowBasedFrameRowsAndColumns.class);
}
public static Function<MapOfColumnsRowsAndColumns, RowBasedFrameRowsAndColumns> MAKER = RowBasedFrameRowsAndColumnsTest::buildFrame;
private static RowBasedFrameRowsAndColumns buildFrame(MapOfColumnsRowsAndColumns rac)
{
final AtomicInteger rowId = new AtomicInteger(0);
final int numRows = rac.numRows();
final ColumnSelectorFactoryMaker csfm = ColumnSelectorFactoryMaker.fromRAC(rac);
final ColumnSelectorFactory selectorFactory = csfm.make(rowId);
final RowSignature.Builder sigBob = RowSignature.builder();
final ArenaMemoryAllocatorFactory memFactory = new ArenaMemoryAllocatorFactory(200 << 20);
for (String column : rac.getColumnNames()) {
final Column racColumn = rac.findColumn(column);
if (racColumn == null) {
continue;
}
sigBob.add(column, racColumn.toAccessor().getType());
}
final RowSignature signature = sigBob.build();
final FrameWriter frameWriter = FrameWriters.makeRowBasedFrameWriterFactory(
memFactory,
signature,
Collections.emptyList(),
false
).newFrameWriter(selectorFactory);
rowId.set(0);
for (; rowId.get() < numRows; rowId.incrementAndGet()) {
frameWriter.addSelection();
}
return new RowBasedFrameRowsAndColumns(Frame.wrap(frameWriter.toByteArray()), signature);
}
}