Numeric array support for columnar frames (#15917)

Columnar frames used in subquery materialization and window functions now support numeric arrays.
This commit is contained in:
Laksh Singla 2024-02-21 11:32:33 +05:30 committed by GitHub
parent 2c0d1128f8
commit a1b2c7326e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 1120 additions and 62 deletions

View File

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.frame.read.columnar;
import com.google.common.math.LongMath;
import org.apache.datasketches.memory.Memory;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.write.columnar.FrameColumnWriters;
import org.apache.druid.segment.column.ColumnType;
/**
 * Reads {@code ARRAY<DOUBLE>} columns produced by
 * {@link org.apache.druid.frame.write.columnar.DoubleArrayFrameColumnWriter}.
 *
 * @see NumericArrayFrameColumnReader
 * @see org.apache.druid.frame.write.columnar.NumericArrayFrameColumnWriter for column's layout in memory
 */
public class DoubleArrayFrameColumnReader extends NumericArrayFrameColumnReader
{
  public DoubleArrayFrameColumnReader(int columnNumber)
  {
    super(FrameColumnWriters.TYPE_DOUBLE_ARRAY, ColumnType.DOUBLE_ARRAY, columnNumber);
  }

  @Override
  NumericArrayFrameColumn column(Frame frame, Memory memory, ColumnType columnType)
  {
    return new DoubleArrayFrameColumn(frame, memory, columnType);
  }

  /**
   * Column whose array elements are decoded as doubles.
   */
  private static class DoubleArrayFrameColumn extends NumericArrayFrameColumn
  {
    public DoubleArrayFrameColumn(Frame frame, Memory memory, ColumnType columnType)
    {
      super(frame, memory, columnType);
    }

    @Override
    Number getElement(Memory memory, long rowDataOffset, int cumulativeIndex)
    {
      // Elements are fixed width, so the element sits Double.BYTES * cumulativeIndex bytes past the data start.
      final long bytesFromDataStart = (long) cumulativeIndex * Double.BYTES;
      final long elementPosition = LongMath.checkedAdd(rowDataOffset, bytesFromDataStart);
      return memory.getDouble(elementPosition);
    }
  }
}

View File

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.frame.read.columnar;
import com.google.common.math.LongMath;
import org.apache.datasketches.memory.Memory;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.write.columnar.FrameColumnWriters;
import org.apache.druid.segment.column.ColumnType;
/**
 * Reads {@code ARRAY<FLOAT>} columns produced by
 * {@link org.apache.druid.frame.write.columnar.FloatArrayFrameColumnWriter}.
 *
 * @see NumericArrayFrameColumnReader
 * @see org.apache.druid.frame.write.columnar.NumericArrayFrameColumnWriter for column's layout in memory
 */
public class FloatArrayFrameColumnReader extends NumericArrayFrameColumnReader
{
  public FloatArrayFrameColumnReader(int columnNumber)
  {
    super(FrameColumnWriters.TYPE_FLOAT_ARRAY, ColumnType.FLOAT_ARRAY, columnNumber);
  }

  @Override
  NumericArrayFrameColumn column(Frame frame, Memory memory, ColumnType columnType)
  {
    return new FloatArrayFrameColumn(frame, memory, columnType);
  }

  /**
   * Column whose array elements are decoded as floats.
   */
  private static class FloatArrayFrameColumn extends NumericArrayFrameColumn
  {
    public FloatArrayFrameColumn(Frame frame, Memory memory, ColumnType columnType)
    {
      super(frame, memory, columnType);
    }

    @Override
    Number getElement(Memory memory, long rowDataOffset, int cumulativeIndex)
    {
      // Elements are fixed width, so the element sits Float.BYTES * cumulativeIndex bytes past the data start.
      final long bytesFromDataStart = (long) cumulativeIndex * Float.BYTES;
      final long elementPosition = LongMath.checkedAdd(rowDataOffset, bytesFromDataStart);
      return memory.getFloat(elementPosition);
    }
  }
}

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.frame.read.columnar;
import org.apache.datasketches.memory.Memory;
public class FrameColumnReaderUtils
{
  /**
   * Converts a value obtained from {@link #getCumulativeRowLength(Memory, long, int)} into the actual, non-negative
   * cumulative length. Non-negative inputs are returned unchanged; negative inputs (the encoding used for null rows)
   * are decoded as {@code -(value + 1)}.
   */
  public static int adjustCumulativeRowLength(final int cumulativeRowLength)
  {
    return cumulativeRowLength >= 0 ? cumulativeRowLength : -(cumulativeRowLength + 1);
  }

  /**
   * Reads the stored cumulative row length of {@code physicalRow}: the int located {@code physicalRow} ints past
   * {@code offset} in {@code memory}. The stored value is the cumulative row length when the row itself is not null,
   * or -(cumulative row length) - 1 when the row itself is null.
   *
   * Use {@link #isNullRow(int)} to check whether the returned value denotes a null row, and
   * {@link FrameColumnReaderUtils#adjustCumulativeRowLength(int)} to obtain the actual cumulative row length.
   */
  public static int getCumulativeRowLength(final Memory memory, final long offset, final int physicalRow)
  {
    final long position = offset + (long) Integer.BYTES * physicalRow;
    return memory.getInt(position);
  }

  /**
   * Shorthand for reading the stored cumulative row length and decoding it with
   * {@link #adjustCumulativeRowLength(int)} in one step.
   */
  public static int getAdjustedCumulativeRowLength(final Memory memory, final long offset, final int physicalRow)
  {
    final int storedLength = getCumulativeRowLength(memory, offset, physicalRow);
    return adjustCumulativeRowLength(storedLength);
  }

  /**
   * Tells whether a value returned by {@link FrameColumnReaderUtils#getCumulativeRowLength(Memory, long, int)} marks
   * a row that is null itself (i.e. a null array). Null rows are the ones stored as negative values.
   */
  public static boolean isNullRow(final int cumulativeRowLength)
  {
    return cumulativeRowLength < 0;
  }
}

View File

@ -20,7 +20,6 @@
package org.apache.druid.frame.read.columnar;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.ValueType;
/**
* Creates {@link FrameColumnReader} corresponding to a given column type and number.
@ -58,12 +57,18 @@ public class FrameColumnReaders
return new ComplexFrameColumnReader(columnNumber);
case ARRAY:
if (columnType.getElementType().getType() == ValueType.STRING) {
return new StringFrameColumnReader(columnNumber, true);
} else {
return new UnsupportedColumnTypeFrameColumnReader(columnName, columnType);
switch (columnType.getElementType().getType()) {
case STRING:
return new StringFrameColumnReader(columnNumber, true);
case LONG:
return new LongArrayFrameColumnReader(columnNumber);
case FLOAT:
return new FloatArrayFrameColumnReader(columnNumber);
case DOUBLE:
return new DoubleArrayFrameColumnReader(columnNumber);
default:
return new UnsupportedColumnTypeFrameColumnReader(columnName, columnType);
}
default:
return new UnsupportedColumnTypeFrameColumnReader(columnName, columnType);
}

View File

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.frame.read.columnar;
import com.google.common.math.LongMath;
import org.apache.datasketches.memory.Memory;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.write.columnar.FrameColumnWriters;
import org.apache.druid.segment.column.ColumnType;
/**
 * Reads {@code ARRAY<LONG>} columns produced by
 * {@link org.apache.druid.frame.write.columnar.LongArrayFrameColumnWriter}.
 *
 * @see NumericArrayFrameColumnReader
 * @see org.apache.druid.frame.write.columnar.NumericArrayFrameColumnWriter for column's layout in memory
 */
public class LongArrayFrameColumnReader extends NumericArrayFrameColumnReader
{
  public LongArrayFrameColumnReader(int columnNumber)
  {
    super(FrameColumnWriters.TYPE_LONG_ARRAY, ColumnType.LONG_ARRAY, columnNumber);
  }

  @Override
  NumericArrayFrameColumn column(Frame frame, Memory memory, ColumnType columnType)
  {
    return new LongArrayFrameColumn(frame, memory, columnType);
  }

  /**
   * Column whose array elements are decoded as longs.
   */
  private static class LongArrayFrameColumn extends NumericArrayFrameColumn
  {
    public LongArrayFrameColumn(Frame frame, Memory memory, ColumnType columnType)
    {
      super(frame, memory, columnType);
    }

    @Override
    Number getElement(Memory memory, long rowDataOffset, int cumulativeIndex)
    {
      // Elements are fixed width, so the element sits Long.BYTES * cumulativeIndex bytes past the data start.
      final long bytesFromDataStart = (long) cumulativeIndex * Long.BYTES;
      final long elementPosition = LongMath.checkedAdd(rowDataOffset, bytesFromDataStart);
      return memory.getLong(elementPosition);
    }
  }
}

View File

@ -0,0 +1,360 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.frame.read.columnar;
import com.google.common.math.LongMath;
import it.unimi.dsi.fastutil.objects.ObjectArrays;
import org.apache.datasketches.memory.Memory;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.write.columnar.NumericArrayFrameColumnWriter;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.ColumnAccessorBasedColumn;
import org.apache.druid.query.rowsandcols.column.accessor.ObjectColumnAccessorBase;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.ObjectColumnSelector;
import org.apache.druid.segment.column.BaseColumn;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.data.ReadableOffset;
import org.apache.druid.segment.vector.ReadableVectorInspector;
import org.apache.druid.segment.vector.ReadableVectorOffset;
import org.apache.druid.segment.vector.VectorObjectSelector;
import javax.annotation.Nullable;
import java.util.Comparator;
/**
 * Base class for readers of numeric array columns in columnar frames. Implementations of this class read columns
 * written by the corresponding implementations of {@link NumericArrayFrameColumnWriter}.
 *
 * @see NumericArrayFrameColumnWriter for the column format read
 */
public abstract class NumericArrayFrameColumnReader implements FrameColumnReader
{
  private final byte typeCode;
  private final ColumnType columnType;
  private final int columnNumber;

  /**
   * @param typeCode     type code expected in the first byte of the column's region, checked before reading
   * @param columnType   type reported by the columns created by this reader
   * @param columnNumber number of the frame region that holds the column
   */
  public NumericArrayFrameColumnReader(byte typeCode, ColumnType columnType, int columnNumber)
  {
    this.typeCode = typeCode;
    this.columnType = columnType;
    this.columnNumber = columnNumber;
  }

  @Override
  public Column readRACColumn(Frame frame)
  {
    final Memory memory = frame.region(columnNumber);
    validate(memory);
    return new ColumnAccessorBasedColumn(column(frame, memory, columnType));
  }

  @Override
  public ColumnPlus readColumn(Frame frame)
  {
    final Memory memory = frame.region(columnNumber);
    validate(memory);
    return new ColumnPlus(
        column(frame, memory, columnType),
        ColumnCapabilitiesImpl.createSimpleArrayColumnCapabilities(columnType),
        frame.numRows()
    );
  }

  /**
   * Creates the element-type specific column over the given (already validated) memory region of the frame.
   */
  abstract NumericArrayFrameColumn column(Frame frame, Memory memory, ColumnType columnType);

  /**
   * Validates that the written type code is the same as the provided type code. It's a defensive check that prevents
   * unexpected results by reading columns of different types
   *
   * @throws DruidException if the region is smaller than the header, or if the type code doesn't match
   */
  private void validate(final Memory region)
  {
    if (region.getCapacity() < NumericArrayFrameColumnWriter.DATA_OFFSET) {
      throw DruidException.defensive("Column[%s] is not big enough for a header", columnNumber);
    }

    final byte actualTypeCode = region.getByte(0);
    if (actualTypeCode != this.typeCode) {
      throw DruidException.defensive(
          "Column[%s] does not have the correct type code; expected[%s], got[%s]",
          columnNumber,
          this.typeCode,
          actualTypeCode
      );
    }
  }

  /**
   * Gets the start of the section where cumulative lengths of the array elements are stored (section 1)
   */
  private static long getStartOfCumulativeLengthSection()
  {
    return NumericArrayFrameColumnWriter.DATA_OFFSET;
  }

  /**
   * Gets the start of the section where information about element's nullity is stored (section 2). It follows the
   * {@code numRows} ints of the cumulative length section.
   */
  private static long getStartOfRowNullityData(final int numRows)
  {
    return getStartOfCumulativeLengthSection() + ((long) numRows * Integer.BYTES);
  }

  /**
   * Gets the start of the section where elements are stored (section 3). It follows the nullity section, which has
   * one byte per element; the total number of elements is the adjusted cumulative length of the last row.
   */
  private static long getStartOfRowData(final Memory memory, final int numRows)
  {
    if (numRows == 0) {
      // Without rows there are no elements and hence no nullity bytes. Returning early also avoids reading the
      // cumulative length of the non-existent row -1, which lies outside of the cumulative length section.
      return getStartOfRowNullityData(numRows);
    }

    final long nullityDataLength =
        (long) Byte.BYTES * FrameColumnReaderUtils.getAdjustedCumulativeRowLength(
            memory,
            getStartOfCumulativeLengthSection(),
            numRows - 1
        );
    return LongMath.checkedAdd(getStartOfRowNullityData(numRows), nullityDataLength);
  }

  /**
   * Column backed directly by the frame's memory. Rows are materialized as {@code Object[]} of boxed numbers, with
   * null rows returned as null and null elements returned as null entries. Subclasses only decide how a single
   * element is decoded, via {@link #getElement(Memory, long, int)}.
   */
  public abstract static class NumericArrayFrameColumn extends ObjectColumnAccessorBase implements BaseColumn
  {
    private final Frame frame;
    private final Memory memory;
    private final ColumnType columnType;

    /**
     * Cache start of rowNullityDataOffset, as it won't change
     */
    private final long rowNullityDataOffset;

    /**
     * Cache start of rowDataOffset, as it won't change
     */
    private final long rowDataOffset;

    public NumericArrayFrameColumn(Frame frame, Memory memory, ColumnType columnType)
    {
      this.frame = frame;
      this.memory = memory;
      this.columnType = columnType;
      this.rowNullityDataOffset = getStartOfRowNullityData(frame.numRows());
      this.rowDataOffset = getStartOfRowData(memory, frame.numRows());
    }

    @Override
    public ColumnType getType()
    {
      return columnType;
    }

    @Override
    public int numRows()
    {
      return frame.numRows();
    }

    @Override
    protected Object getVal(int rowNum)
    {
      return getNumericArray(physicalRow(rowNum));
    }

    @Override
    protected Comparator<Object> getComparator()
    {
      return columnType.getNullableStrategy();
    }

    @Override
    public ColumnValueSelector<?> makeColumnValueSelector(ReadableOffset offset)
    {
      // Caches the row's value before returning
      return new ObjectColumnSelector<Object>()
      {
        // Cached row number
        private int cachedLogicalRow = -1;

        // Cached value
        @Nullable
        private Object[] cachedValue = null;

        @Override
        public void inspectRuntimeShape(RuntimeShapeInspector inspector)
        {
        }

        @Nullable
        @Override
        public Object getObject()
        {
          compute();
          return cachedValue;
        }

        @Override
        public Class<?> classOfObject()
        {
          return Object[].class;
        }

        /**
         * Caches the row value and the logical row number into the class variables
         */
        private void compute()
        {
          int currentLogicalRow = offset.getOffset();
          if (cachedLogicalRow == currentLogicalRow) {
            return;
          }
          cachedValue = getNumericArray(physicalRow(currentLogicalRow));
          cachedLogicalRow = currentLogicalRow;
        }
      };
    }

    @Override
    public VectorObjectSelector makeVectorObjectSelector(ReadableVectorOffset offset)
    {
      return new VectorObjectSelector()
      {
        private final Object[] vector = new Object[offset.getMaxVectorSize()];
        private int id = ReadableVectorInspector.NULL_ID;

        @Override
        public Object[] getObjectVector()
        {
          computeVector();
          return vector;
        }

        @Override
        public int getMaxVectorSize()
        {
          return offset.getMaxVectorSize();
        }

        @Override
        public int getCurrentVectorSize()
        {
          return offset.getCurrentVectorSize();
        }

        /**
         * Fills the vector with the rows of the current offset, unless it was already filled for the same offset id
         */
        private void computeVector()
        {
          if (id == offset.getId()) {
            return;
          }

          if (offset.isContiguous()) {
            // Contiguous offsets can have a cache optimized implementation if 'frame.isPermuted() == false',
            // i.e. logicalRow == physicalRow. The implementation can separately fetch out the nullity data, and the
            // element data contiguously.
            final int start = offset.getStartOffset();
            for (int i = 0; i < offset.getCurrentVectorSize(); ++i) {
              vector[i] = getNumericArray(physicalRow(start + i));
            }
          } else {
            final int[] offsets = offset.getOffsets();
            for (int i = 0; i < offset.getCurrentVectorSize(); ++i) {
              vector[i] = getNumericArray(physicalRow(offsets[i]));
            }
          }

          // Record the id for both branches, so that repeated calls for the same vector reuse the computed values
          // regardless of whether the offset is contiguous or not.
          id = offset.getId();
        }
      };
    }

    @Override
    public void close()
    {
      // Do nothing
    }

    private int physicalRow(int logicalRow)
    {
      return frame.physicalRow(logicalRow);
    }

    /**
     * Given the physical row, it fetches the value from the memory
     *
     * @return null if the row itself is null, an empty array if the row has no elements, else an array with one
     * entry per element, where null elements are represented by null entries
     */
    @Nullable
    private Object[] getNumericArray(final int physicalRow)
    {
      final int cumulativeLength = FrameColumnReaderUtils.getCumulativeRowLength(
          memory,
          getStartOfCumulativeLengthSection(),
          physicalRow
      );

      final int rowLength;
      if (FrameColumnReaderUtils.isNullRow(cumulativeLength)) {
        return null;
      } else if (physicalRow == 0) {
        rowLength = cumulativeLength;
      } else {
        final int previousCumulativeLength = FrameColumnReaderUtils.adjustCumulativeRowLength(
            FrameColumnReaderUtils.getCumulativeRowLength(
                memory,
                getStartOfCumulativeLengthSection(),
                physicalRow - 1
            )
        );
        // cumulativeLength doesn't need to be adjusted, since it's non-negative or else it would have been a null
        // row, which we check for in the first if..else
        rowLength = cumulativeLength - previousCumulativeLength;
      }

      if (rowLength == 0) {
        return ObjectArrays.EMPTY_ARRAY;
      }

      final Object[] row = new Object[rowLength];
      for (int i = 0; i < rowLength; ++i) {
        // Index of the element counted from the first element of the first row
        final int cumulativeIndex = cumulativeLength - rowLength + i;
        row[i] = getElementNullity(cumulativeIndex) ? null : getElement(memory, rowDataOffset, cumulativeIndex);
      }
      return row;
    }

    /**
     * Returns true if element is null, else false
     */
    private boolean getElementNullity(final int cumulativeIndex)
    {
      byte b = memory.getByte(LongMath.checkedAdd(rowNullityDataOffset, (long) cumulativeIndex * Byte.BYTES));
      if (b == NumericArrayFrameColumnWriter.NULL_ELEMENT_MARKER) {
        return true;
      }
      assert b == NumericArrayFrameColumnWriter.NON_NULL_ELEMENT_MARKER;
      return false;
    }

    /**
     * Returns the value of the element of the array in the memory provided, given that the start of the element
     * data is {@code rowDataOffset} and the index of the element, counted across all the rows, is
     * {@code cumulativeIndex}
     */
    abstract Number getElement(Memory memory, long rowDataOffset, int cumulativeIndex);
  }
}

View File

@ -92,17 +92,18 @@ public class StringFrameColumnReader implements FrameColumnReader
final Memory memory = frame.region(columnNumber);
validate(memory);
if (isMultiValue(memory)) {
// When we implement handling of multi-value, we should actually make this look like an Array of String instead
// of perpetuating the multi-value idea. Thus, when we add support for Arrays to the RAC stuff, that's when
// we can start supporting multi-value.
throw new ISE("Multivalue not yet handled by RAC");
}
final long positionOfLengths = getStartOfStringLengthSection(frame.numRows(), false);
final long positionOfPayloads = getStartOfStringDataSection(memory, frame.numRows(), false);
StringFrameColumn frameCol =
new StringFrameColumn(frame, false, memory, positionOfLengths, positionOfPayloads, false);
new StringFrameColumn(
frame,
false,
memory,
positionOfLengths,
positionOfPayloads,
asArray || isMultiValue(memory) // Read MVDs as String arrays
);
return new ColumnAccessorBasedColumn(frameCol);
}
@ -174,40 +175,9 @@ public class StringFrameColumnReader implements FrameColumnReader
return memory.getByte(1) == 1;
}
/**
* Returns cumulative row length, if the row is not null itself, or -(cumulative row length) - 1 if the row is
* null itself.
*
* To check if the return value from this function indicate a null row, use {@link #isNullRow(int)}
*
* To get the actual cumulative row length, use {@link #adjustCumulativeRowLength(int)}.
*/
private static int getCumulativeRowLength(final Memory memory, final int physicalRow)
private static long getStartOfCumulativeLengthSection()
{
// Note: only valid to call this if multiValue = true.
return memory.getInt(StringFrameColumnWriter.DATA_OFFSET + (long) Integer.BYTES * physicalRow);
}
/**
* When given a return value from {@link #getCumulativeRowLength(Memory, int)}, returns whether the row is
* null itself (i.e. a null array).
*/
private static boolean isNullRow(final int cumulativeRowLength)
{
return cumulativeRowLength < 0;
}
/**
* Adjusts a negative cumulative row length from {@link #getCumulativeRowLength(Memory, int)} to be the actual
* positive length.
*/
private static int adjustCumulativeRowLength(final int cumulativeRowLength)
{
if (cumulativeRowLength < 0) {
return -(cumulativeRowLength + 1);
} else {
return cumulativeRowLength;
}
return StringFrameColumnWriter.DATA_OFFSET;
}
private static long getStartOfStringLengthSection(
@ -231,7 +201,11 @@ public class StringFrameColumnReader implements FrameColumnReader
final int totalNumValues;
if (multiValue) {
totalNumValues = adjustCumulativeRowLength(getCumulativeRowLength(memory, numRows - 1));
totalNumValues = FrameColumnReaderUtils.getAdjustedCumulativeRowLength(
memory,
getStartOfCumulativeLengthSection(),
numRows - 1
);
} else {
totalNumValues = numRows;
}
@ -489,15 +463,23 @@ public class StringFrameColumnReader implements FrameColumnReader
private Object getRowAsObject(final int physicalRow, final boolean decode)
{
if (multiValue) {
final int cumulativeRowLength = getCumulativeRowLength(memory, physicalRow);
final int cumulativeRowLength = FrameColumnReaderUtils.getCumulativeRowLength(
memory,
getStartOfCumulativeLengthSection(),
physicalRow
);
final int rowLength;
if (isNullRow(cumulativeRowLength)) {
if (FrameColumnReaderUtils.isNullRow(cumulativeRowLength)) {
return null;
} else if (physicalRow == 0) {
rowLength = cumulativeRowLength;
} else {
rowLength = cumulativeRowLength - adjustCumulativeRowLength(getCumulativeRowLength(memory, physicalRow - 1));
rowLength = cumulativeRowLength - FrameColumnReaderUtils.getAdjustedCumulativeRowLength(
memory,
getStartOfCumulativeLengthSection(),
physicalRow - 1
);
}
if (rowLength == 0) {

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.frame.write.columnar;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.frame.allocation.MemoryAllocator;
import org.apache.druid.segment.ColumnValueSelector;
/**
 * Columnar frame writer for {@link org.apache.druid.segment.column.ColumnType#DOUBLE_ARRAY} columns
 */
public class DoubleArrayFrameColumnWriter extends NumericArrayFrameColumnWriter
{
  public DoubleArrayFrameColumnWriter(
      ColumnValueSelector selector,
      MemoryAllocator allocator
  )
  {
    super(selector, allocator, FrameColumnWriters.TYPE_DOUBLE_ARRAY);
  }

  /**
   * Width of a single stored element, i.e. {@link Double#BYTES}
   */
  @Override
  int elementSizeBytes()
  {
    return Double.BYTES;
  }

  /**
   * Writes {@code 0d} at the given offset, as the stored value for a null element
   */
  @Override
  void putNull(WritableMemory memory, long offset)
  {
    memory.putDouble(offset, 0d);
  }

  /**
   * Writes the non-null element at the given offset as a double.
   */
  @Override
  void putArrayElement(WritableMemory memory, long offset, Number element)
  {
    // Use Number#doubleValue instead of casting: a cast of a Number to double only succeeds when the runtime class
    // is Double, and throws a ClassCastException for other Number subtypes (e.g. Float, Integer or Long elements).
    // For Double elements the written value is identical.
    memory.putDouble(offset, element.doubleValue());
  }
}

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.frame.write.columnar;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.frame.allocation.MemoryAllocator;
import org.apache.druid.segment.ColumnValueSelector;
/**
 * Columnar frame writer for {@link org.apache.druid.segment.column.ColumnType#FLOAT_ARRAY} columns
 */
public class FloatArrayFrameColumnWriter extends NumericArrayFrameColumnWriter
{
  public FloatArrayFrameColumnWriter(
      ColumnValueSelector selector,
      MemoryAllocator allocator
  )
  {
    super(selector, allocator, FrameColumnWriters.TYPE_FLOAT_ARRAY);
  }

  /**
   * Width of a single stored element, i.e. {@link Float#BYTES}
   */
  @Override
  int elementSizeBytes()
  {
    return Float.BYTES;
  }

  /**
   * Writes {@code 0f} at the given offset, as the stored value for a null element
   */
  @Override
  void putNull(WritableMemory memory, long offset)
  {
    memory.putFloat(offset, 0f);
  }

  /**
   * Writes the non-null element at the given offset as a float.
   */
  @Override
  void putArrayElement(WritableMemory memory, long offset, Number element)
  {
    // Use Number#floatValue instead of casting: a cast of a Number to float only succeeds when the runtime class
    // is Float, and throws a ClassCastException for other Number subtypes (e.g. Double elements). For Float
    // elements the written value is identical; other subtypes are converted to float.
    memory.putFloat(offset, element.floatValue());
  }
}

View File

@ -23,14 +23,34 @@ import org.apache.datasketches.memory.WritableMemory;
import java.io.Closeable;
/**
* Represent writers for the columnar frames.
*
* The class objects must be provided with information on what values to write, usually provided as a
* {@link org.apache.druid.segment.ColumnValueSelector}, and where to write to, usually temporary growable memory.
* {@link #addSelection()} will be called repeatedly, as the current value to write gets updated. For the final write,
* call {@link #writeTo}, which will copy the values we have added so far to the destination memory.
*/
public interface FrameColumnWriter extends Closeable
{
/**
* Adds the current value to the writer
*/
boolean addSelection();
/**
* Reverts the last added value. Undo cannot be called multiple times in succession
*/
void undo();
/**
* Size (in bytes) of the column data that will get written when {@link #writeTo} will be called
*/
long size();
/**
* Writes the value of the column to the provided memory at the given position
*/
long writeTo(WritableMemory memory, long position);
@Override

View File

@ -41,6 +41,9 @@ public class FrameColumnWriters
public static final byte TYPE_STRING = 4;
public static final byte TYPE_COMPLEX = 5;
public static final byte TYPE_STRING_ARRAY = 6;
// Type codes of the numeric array columns. The numeric array writers and readers pass these to their parent
// classes; NumericArrayFrameColumnReader compares the first byte of a column's region against the expected code.
public static final byte TYPE_LONG_ARRAY = 7;
public static final byte TYPE_FLOAT_ARRAY = 8;
public static final byte TYPE_DOUBLE_ARRAY = 9;
private FrameColumnWriters()
{
@ -76,6 +79,12 @@ public class FrameColumnWriters
switch (type.getElementType().getType()) {
case STRING:
return makeStringArrayWriter(columnSelectorFactory, allocator, column);
case LONG:
return makeLongArrayWriter(columnSelectorFactory, allocator, column);
case FLOAT:
return makeFloatArrayWriter(columnSelectorFactory, allocator, column);
case DOUBLE:
return makeDoubleArrayWriter(columnSelectorFactory, allocator, column);
default:
throw new UnsupportedColumnTypeException(column, type);
}
@ -144,6 +153,36 @@ public class FrameColumnWriters
return new StringArrayFrameColumnWriterImpl(selector, allocator);
}
/**
 * Builds the writer for a LONG array column, reading its values through the column value selector that the
 * factory creates for {@code columnName}.
 */
private static NumericArrayFrameColumnWriter makeLongArrayWriter(
    final ColumnSelectorFactory selectorFactory,
    final MemoryAllocator allocator,
    final String columnName
)
{
  return new LongArrayFrameColumnWriter(selectorFactory.makeColumnValueSelector(columnName), allocator);
}
/**
 * Builds the writer for a FLOAT array column, reading its values through the column value selector that the
 * factory creates for {@code columnName}.
 */
private static NumericArrayFrameColumnWriter makeFloatArrayWriter(
    final ColumnSelectorFactory selectorFactory,
    final MemoryAllocator allocator,
    final String columnName
)
{
  return new FloatArrayFrameColumnWriter(selectorFactory.makeColumnValueSelector(columnName), allocator);
}
/**
 * Builds the writer for a DOUBLE array column, reading its values through the column value selector that the
 * factory creates for {@code columnName}.
 */
private static NumericArrayFrameColumnWriter makeDoubleArrayWriter(
    final ColumnSelectorFactory selectorFactory,
    final MemoryAllocator allocator,
    final String columnName
)
{
  return new DoubleArrayFrameColumnWriter(selectorFactory.makeColumnValueSelector(columnName), allocator);
}
private static ComplexFrameColumnWriter makeComplexWriter(
final ColumnSelectorFactory selectorFactory,
final MemoryAllocator allocator,

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.frame.write.columnar;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.frame.allocation.MemoryAllocator;
import org.apache.druid.segment.ColumnValueSelector;
/**
 * Columnar frame writer for {@link org.apache.druid.segment.column.ColumnType#LONG_ARRAY} columns
 */
public class LongArrayFrameColumnWriter extends NumericArrayFrameColumnWriter
{
  public LongArrayFrameColumnWriter(
      ColumnValueSelector selector,
      MemoryAllocator allocator
  )
  {
    super(selector, allocator, FrameColumnWriters.TYPE_LONG_ARRAY);
  }

  /**
   * Width of a single stored element, i.e. {@link Long#BYTES}
   */
  @Override
  int elementSizeBytes()
  {
    return Long.BYTES;
  }

  /**
   * Writes {@code 0L} at the given offset, as the stored value for a null element
   */
  @Override
  void putNull(WritableMemory memory, long offset)
  {
    memory.putLong(offset, 0L);
  }

  /**
   * Writes the non-null element at the given offset as a long.
   */
  @Override
  void putArrayElement(WritableMemory memory, long offset, Number element)
  {
    // Use Number#longValue instead of casting: a cast of a Number to long only succeeds when the runtime class is
    // Long, and throws a ClassCastException for other Number subtypes (e.g. Integer elements). For Long elements
    // the written value is identical; other subtypes are converted to long.
    memory.putLong(offset, element.longValue());
  }
}

View File

@ -0,0 +1,223 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.frame.write.columnar;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.allocation.AppendableMemory;
import org.apache.druid.frame.allocation.MemoryAllocator;
import org.apache.druid.frame.allocation.MemoryRange;
import org.apache.druid.frame.write.FrameWriterUtils;
import org.apache.druid.segment.ColumnValueSelector;
import java.util.List;
/**
 * Parent class for the family of writers writing numeric arrays in columnar frames. Since the numeric primitives are
 * fixed width, we don't need to store the width of each element. The memory layout of a column written by this writer
 * is as follows:
 *
 * n : Total number of rows
 * k : Total number of elements in all the rows, cumulative
 *
 * | Section | Length of the section | Description                                                                          |
 * |---------|-----------------------|--------------------------------------------------------------------------------------|
 * | 0       | 1                     | typeCode                                                                             |
 * | 1       | n * Integer.BYTES     | n integers, where i-th integer represents the cumulative length of the arrays so far |
 * | 2       | k * Byte.BYTES        | k bytes, where i-th byte represents whether the i-th value from the start is null    |
 * | 3       | k * ELEMENT_SIZE      | k values, each representing the element, or null equivalent value (e.g 0 for double) |
 *
 * Note on cumulative lengths stored in section 1: Cumulative lengths are stored so that it's fast to offset into the
 * elements of the array. We also use a negative cumulative length to denote that the array itself is null (as opposed
 * to individual elements being null, which we store in section 2). A null array is written as
 * -(cumulative length) - 1.
 */
public abstract class NumericArrayFrameColumnWriter implements FrameColumnWriter
{
  /**
   * Equivalent to {@link AppendableMemory#DEFAULT_INITIAL_ALLOCATION_SIZE} / 3, since the memory would be further split
   * up into three regions
   */
  private static final int INITIAL_ALLOCATION_SIZE = 120;

  /**
   * Marker written in the nullity section for an element that is null
   */
  public static final byte NULL_ELEMENT_MARKER = 0x00;

  /**
   * Marker written in the nullity section for an element that is not null
   */
  public static final byte NON_NULL_ELEMENT_MARKER = 0x01;

  /**
   * A byte required at the beginning for type code
   */
  public static final long DATA_OFFSET = 1;

  final ColumnValueSelector selector;
  final byte typeCode;

  /**
   * Row lengths: one int per row with the number of values contained by that row and all previous rows.
   * When the corresponding row is null itself, the length is written as -(cumulative length) - 1. (Guaranteed to be
   * a negative number even if the cumulative length is zero.)
   */
  private final AppendableMemory cumulativeRowLengths;

  /**
   * One byte per element, denoting if the element of the row is null or not
   */
  private final AppendableMemory rowNullityData;

  /**
   * Row data: one fixed-width value per element.
   */
  private final AppendableMemory rowData;

  /**
   * Number of elements written so far, across all rows.
   */
  private int lastCumulativeRowLength = 0;

  /**
   * Number of elements in the most recently written row, or -1 if there is no row that can be undone.
   */
  private int lastRowLength = -1;

  public NumericArrayFrameColumnWriter(
      final ColumnValueSelector selector,
      final MemoryAllocator allocator,
      final byte typeCode
  )
  {
    this.selector = selector;
    this.typeCode = typeCode;
    this.cumulativeRowLengths = AppendableMemory.create(allocator, INITIAL_ALLOCATION_SIZE);
    this.rowNullityData = AppendableMemory.create(allocator, INITIAL_ALLOCATION_SIZE);
    this.rowData = AppendableMemory.create(allocator, INITIAL_ALLOCATION_SIZE);
  }

  /**
   * Returns the size of the elements of the array
   */
  abstract int elementSizeBytes();

  /**
   * Inserts default null value in the given memory location at the provided offset.
   */
  abstract void putNull(WritableMemory memory, long offset);

  /**
   * Inserts the element value in the given memory location at the provided offset.
   */
  abstract void putArrayElement(WritableMemory memory, long offset, Number element);

  /**
   * Appends the array currently exposed by the selector as a new row.
   *
   * @return true if the row was written; false if the row does not fit, either because a memory reservation failed or
   * because the resulting sizes cannot be represented as an int. Nothing is written when false is returned.
   */
  @Override
  public boolean addSelection()
  {
    final List<? extends Number> numericArray = FrameWriterUtils.getNumericArrayFromObject(selector.getObject());
    final int rowLength = numericArray == null ? 0 : numericArray.size();

    // Sizes are computed as longs: rowLength * elementSizeBytes() can overflow an int for very large arrays, which
    // would otherwise reserve too little memory and then write past it.
    final long newCumulativeRowLength = (long) lastCumulativeRowLength + rowLength;
    final long nullityBytes = (long) rowLength * Byte.BYTES;
    final long dataBytes = (long) rowLength * elementSizeBytes();

    if (newCumulativeRowLength > Integer.MAX_VALUE || dataBytes > Integer.MAX_VALUE) {
      return false;
    }

    // Begin memory allocations before writing
    if (!cumulativeRowLengths.reserveAdditional(Integer.BYTES)) {
      return false;
    }

    if (!rowNullityData.reserveAdditional((int) nullityBytes)) {
      return false;
    }

    if (!rowData.reserveAdditional((int) dataBytes)) {
      return false;
    }
    // Memory allocations completed

    // A null array is distinguished from an empty one by storing -(cumulative length) - 1, which is always negative
    final int storedLength = numericArray == null
                             ? -((int) newCumulativeRowLength) - 1
                             : (int) newCumulativeRowLength;
    final MemoryRange<WritableMemory> rowLengthsCursor = cumulativeRowLengths.cursor();
    rowLengthsCursor.memory().putInt(rowLengthsCursor.start(), storedLength);
    cumulativeRowLengths.advanceCursor(Integer.BYTES);

    lastRowLength = rowLength;
    lastCumulativeRowLength = (int) newCumulativeRowLength;

    // The element cursors are only touched when there is something to write
    if (rowLength > 0) {
      final MemoryRange<WritableMemory> rowNullityDataCursor = rowNullityData.cursor();
      final MemoryRange<WritableMemory> rowDataCursor = rowData.cursor();
      final int elementSize = elementSizeBytes();

      for (int i = 0; i < rowLength; ++i) {
        final Number element = numericArray.get(i);
        final long nullityOffset = rowNullityDataCursor.start() + (long) Byte.BYTES * i;
        final long dataOffset = rowDataCursor.start() + (long) elementSize * i;

        if (element == null) {
          rowNullityDataCursor.memory().putByte(nullityOffset, NULL_ELEMENT_MARKER);
          putNull(rowDataCursor.memory(), dataOffset);
        } else {
          rowNullityDataCursor.memory().putByte(nullityOffset, NON_NULL_ELEMENT_MARKER);
          putArrayElement(rowDataCursor.memory(), dataOffset, element);
        }
      }

      rowNullityData.advanceCursor((int) nullityBytes);
      rowData.advanceCursor((int) dataBytes);
    }

    return true;
  }

  /**
   * Removes the most recently written row. Throws a defensive {@link DruidException} if there is no row to undo,
   * which includes calling this method twice in a row.
   */
  @Override
  public void undo()
  {
    if (lastRowLength == -1) {
      throw DruidException.defensive("Nothing written to undo()");
    }

    cumulativeRowLengths.rewindCursor(Integer.BYTES);
    // These products fit in an int, since the row being undone was accepted by addSelection()
    rowNullityData.rewindCursor(lastRowLength * Byte.BYTES);
    rowData.rewindCursor(lastRowLength * elementSizeBytes());

    lastCumulativeRowLength -= lastRowLength;
    // Multiple undo calls cannot be chained together
    lastRowLength = -1;
  }

  /**
   * Returns the number of bytes that {@link #writeTo} would write: the type code followed by the three sections.
   */
  @Override
  public long size()
  {
    return DATA_OFFSET + cumulativeRowLengths.size() + rowNullityData.size() + rowData.size();
  }

  /**
   * Writes the type code, then the cumulative row lengths, the element nullity bytes and the element values, starting
   * at the given position.
   *
   * @return number of bytes written
   */
  @Override
  public long writeTo(final WritableMemory memory, final long startPosition)
  {
    long currentPosition = startPosition;

    memory.putByte(currentPosition, typeCode);
    currentPosition += DATA_OFFSET;

    currentPosition += cumulativeRowLengths.writeTo(memory, currentPosition);
    currentPosition += rowNullityData.writeTo(memory, currentPosition);
    currentPosition += rowData.writeTo(memory, currentPosition);

    return currentPosition - startPosition;
  }

  /**
   * Closes the three underlying memory regions.
   */
  @Override
  public void close()
  {
    cumulativeRowLengths.close();
    rowNullityData.close();
    rowData.close();
  }
}

View File

@ -244,27 +244,18 @@ public class FrameWriterTest extends InitializedNullHandlingTest
/**
 * Round-trips the ARRAY<LONG> dataset through the parameterized input and output frame types.
 */
@Test
public void test_arrayLong()
{
  // Columnar frames can read and write ARRAY<LONG>, so no frame type combination is skipped
  testWithDataset(FrameWriterTestData.TEST_ARRAYS_LONG);
}
/**
 * Round-trips the ARRAY<FLOAT> dataset through the parameterized input and output frame types.
 */
@Test
public void test_arrayFloat()
{
  // Columnar frames can read and write ARRAY<FLOAT>, so no frame type combination is skipped
  testWithDataset(FrameWriterTestData.TEST_ARRAYS_FLOAT);
}
/**
 * Round-trips the ARRAY<DOUBLE> dataset through the parameterized input and output frame types.
 */
@Test
public void test_arrayDouble()
{
  // Columnar frames can read and write ARRAY<DOUBLE>, so no frame type combination is skipped
  testWithDataset(FrameWriterTestData.TEST_ARRAYS_DOUBLE);
}

View File

@ -67,7 +67,16 @@ public class FrameWritersTest extends InitializedNullHandlingTest
final FrameWriterFactory factory = FrameWriters.makeFrameWriterFactory(
FrameType.COLUMNAR,
new ArenaMemoryAllocatorFactory(ALLOCATOR_CAPACITY),
RowSignature.builder().add("x", ColumnType.LONG).build(),
RowSignature.builder()
.add("a", ColumnType.LONG)
.add("b", ColumnType.FLOAT)
.add("c", ColumnType.DOUBLE)
.add("d", ColumnType.STRING)
.add("e", ColumnType.LONG_ARRAY)
.add("f", ColumnType.FLOAT_ARRAY)
.add("g", ColumnType.DOUBLE_ARRAY)
.add("h", ColumnType.STRING_ARRAY)
.build(),
Collections.emptyList()
);
@ -81,7 +90,7 @@ public class FrameWritersTest extends InitializedNullHandlingTest
final FrameWriterFactory factory = FrameWriters.makeFrameWriterFactory(
FrameType.COLUMNAR,
new ArenaMemoryAllocatorFactory(ALLOCATOR_CAPACITY),
RowSignature.builder().add("x", ColumnType.LONG_ARRAY).build(),
RowSignature.builder().add("x", ColumnType.ofArray(ColumnType.LONG_ARRAY)).build(),
Collections.emptyList()
);
@ -91,7 +100,7 @@ public class FrameWritersTest extends InitializedNullHandlingTest
);
Assert.assertEquals("x", e.getColumnName());
Assert.assertEquals(ColumnType.LONG_ARRAY, e.getColumnType());
Assert.assertEquals(ColumnType.ofArray(ColumnType.LONG_ARRAY), e.getColumnType());
}
@Test