From a1b2c7326ede4da12bb71ce10f43ba431a22001d Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Wed, 21 Feb 2024 11:32:33 +0530 Subject: [PATCH] Numeric array support for columnar frames (#15917) Columnar frames used in subquery materialization and window functions now support numeric arrays. --- .../DoubleArrayFrameColumnReader.java | 64 ++++ .../columnar/FloatArrayFrameColumnReader.java | 64 ++++ .../read/columnar/FrameColumnReaderUtils.java | 66 ++++ .../read/columnar/FrameColumnReaders.java | 17 +- .../columnar/LongArrayFrameColumnReader.java | 64 ++++ .../NumericArrayFrameColumnReader.java | 360 ++++++++++++++++++ .../columnar/StringFrameColumnReader.java | 70 ++-- .../DoubleArrayFrameColumnWriter.java | 57 +++ .../columnar/FloatArrayFrameColumnWriter.java | 57 +++ .../write/columnar/FrameColumnWriter.java | 20 + .../write/columnar/FrameColumnWriters.java | 39 ++ .../columnar/LongArrayFrameColumnWriter.java | 57 +++ .../NumericArrayFrameColumnWriter.java | 223 +++++++++++ .../druid/frame/write/FrameWriterTest.java | 9 - .../druid/frame/write/FrameWritersTest.java | 15 +- 15 files changed, 1120 insertions(+), 62 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/frame/read/columnar/DoubleArrayFrameColumnReader.java create mode 100644 processing/src/main/java/org/apache/druid/frame/read/columnar/FloatArrayFrameColumnReader.java create mode 100644 processing/src/main/java/org/apache/druid/frame/read/columnar/FrameColumnReaderUtils.java create mode 100644 processing/src/main/java/org/apache/druid/frame/read/columnar/LongArrayFrameColumnReader.java create mode 100644 processing/src/main/java/org/apache/druid/frame/read/columnar/NumericArrayFrameColumnReader.java create mode 100644 processing/src/main/java/org/apache/druid/frame/write/columnar/DoubleArrayFrameColumnWriter.java create mode 100644 processing/src/main/java/org/apache/druid/frame/write/columnar/FloatArrayFrameColumnWriter.java create mode 100644 processing/src/main/java/org/apache/druid/frame/write/columnar/LongArrayFrameColumnWriter.java create mode 100644 processing/src/main/java/org/apache/druid/frame/write/columnar/NumericArrayFrameColumnWriter.java diff --git a/processing/src/main/java/org/apache/druid/frame/read/columnar/DoubleArrayFrameColumnReader.java b/processing/src/main/java/org/apache/druid/frame/read/columnar/DoubleArrayFrameColumnReader.java new file mode 100644 index 00000000000..909f2c5b364 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/frame/read/columnar/DoubleArrayFrameColumnReader.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.read.columnar; + +import com.google.common.math.LongMath; +import org.apache.datasketches.memory.Memory; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.write.columnar.FrameColumnWriters; +import org.apache.druid.segment.column.ColumnType; + +/** + * Reader for columns written by {@link org.apache.druid.frame.write.columnar.DoubleArrayFrameColumnWriter} + * + * @see NumericArrayFrameColumnReader + * @see org.apache.druid.frame.write.columnar.NumericArrayFrameColumnWriter for column's layout in memory + */ +public class DoubleArrayFrameColumnReader extends NumericArrayFrameColumnReader +{ + public DoubleArrayFrameColumnReader(int columnNumber) + { + super(FrameColumnWriters.TYPE_DOUBLE_ARRAY, ColumnType.DOUBLE_ARRAY, columnNumber); + } + + @Override + NumericArrayFrameColumn column(Frame frame, Memory memory, ColumnType columnType) + { + return new DoubleArrayFrameColumn(frame, memory, columnType); + } + + private static class DoubleArrayFrameColumn extends NumericArrayFrameColumn + { + public DoubleArrayFrameColumn( + Frame frame, + Memory memory, + ColumnType columnType + ) + { + super(frame, memory, columnType); + } + + @Override + Number getElement(Memory memory, long rowDataOffset, int cumulativeIndex) + { + return memory.getDouble(LongMath.checkedAdd(rowDataOffset, (long) cumulativeIndex * Double.BYTES)); + } + } +} diff --git a/processing/src/main/java/org/apache/druid/frame/read/columnar/FloatArrayFrameColumnReader.java b/processing/src/main/java/org/apache/druid/frame/read/columnar/FloatArrayFrameColumnReader.java new file mode 100644 index 00000000000..ea1ffbdd060 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/frame/read/columnar/FloatArrayFrameColumnReader.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.read.columnar; + +import com.google.common.math.LongMath; +import org.apache.datasketches.memory.Memory; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.write.columnar.FrameColumnWriters; +import org.apache.druid.segment.column.ColumnType; + +/** + * Reader for columns written by {@link org.apache.druid.frame.write.columnar.FloatArrayFrameColumnWriter} + * + * @see NumericArrayFrameColumnReader + * @see org.apache.druid.frame.write.columnar.NumericArrayFrameColumnWriter for column's layout in memory + */ +public class FloatArrayFrameColumnReader extends NumericArrayFrameColumnReader +{ + public FloatArrayFrameColumnReader(int columnNumber) + { + super(FrameColumnWriters.TYPE_FLOAT_ARRAY, ColumnType.FLOAT_ARRAY, columnNumber); + } + + @Override + NumericArrayFrameColumn column(Frame frame, Memory memory, ColumnType columnType) + { + return new FloatArrayFrameColumn(frame, memory, columnType); + } + + private static class FloatArrayFrameColumn extends NumericArrayFrameColumn + { + public FloatArrayFrameColumn( + Frame frame, + Memory memory, + ColumnType columnType + ) + { + super(frame, memory, columnType); + } + + @Override + Number getElement(Memory memory, long rowDataOffset, int cumulativeIndex) + { + return memory.getFloat(LongMath.checkedAdd(rowDataOffset, (long) cumulativeIndex * Float.BYTES)); + } + } +} diff --git a/processing/src/main/java/org/apache/druid/frame/read/columnar/FrameColumnReaderUtils.java b/processing/src/main/java/org/apache/druid/frame/read/columnar/FrameColumnReaderUtils.java new file mode 100644 index 00000000000..c0dfdc6a76f --- /dev/null +++ b/processing/src/main/java/org/apache/druid/frame/read/columnar/FrameColumnReaderUtils.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.read.columnar; + +import org.apache.datasketches.memory.Memory; + +public class FrameColumnReaderUtils +{ + /** + * Adjusts a negative cumulative row length from {@link #getCumulativeRowLength(Memory, long, int)} to be the actual + * positive length. + */ + public static int adjustCumulativeRowLength(final int cumulativeRowLength) + { + if (cumulativeRowLength < 0) { + return -(cumulativeRowLength + 1); + } else { + return cumulativeRowLength; + } + } + + /** + * Returns cumulative row length, if the row is not null itself, or -(cumulative row length) - 1 if the row is + * null itself. + * + * To check if the return value from this function indicate a null row, use {@link #isNullRow(int)} + * + * To get the actual cumulative row length, use {@link FrameColumnReaderUtils#adjustCumulativeRowLength(int)}. + */ + public static int getCumulativeRowLength(final Memory memory, final long offset, final int physicalRow) + { + // Note: only valid to call this if multiValue = true. + return memory.getInt(offset + (long) Integer.BYTES * physicalRow); + } + + public static int getAdjustedCumulativeRowLength(final Memory memory, final long offset, final int physicalRow) + { + return adjustCumulativeRowLength(getCumulativeRowLength(memory, offset, physicalRow)); + } + + /** + * When given a return value from {@link FrameColumnReaderUtils#getCumulativeRowLength(Memory, long, int)}, returns whether the row is + * null itself (i.e. a null array). + */ + public static boolean isNullRow(final int cumulativeRowLength) + { + return cumulativeRowLength < 0; + } +} diff --git a/processing/src/main/java/org/apache/druid/frame/read/columnar/FrameColumnReaders.java b/processing/src/main/java/org/apache/druid/frame/read/columnar/FrameColumnReaders.java index 98218819ce1..9b4dc85cb1e 100644 --- a/processing/src/main/java/org/apache/druid/frame/read/columnar/FrameColumnReaders.java +++ b/processing/src/main/java/org/apache/druid/frame/read/columnar/FrameColumnReaders.java @@ -20,7 +20,6 @@ package org.apache.druid.frame.read.columnar; import org.apache.druid.segment.column.ColumnType; -import org.apache.druid.segment.column.ValueType; /** * Creates {@link FrameColumnReader} corresponding to a given column type and number. @@ -58,12 +57,18 @@ public class FrameColumnReaders return new ComplexFrameColumnReader(columnNumber); case ARRAY: - if (columnType.getElementType().getType() == ValueType.STRING) { - return new StringFrameColumnReader(columnNumber, true); - } else { - return new UnsupportedColumnTypeFrameColumnReader(columnName, columnType); + switch (columnType.getElementType().getType()) { + case STRING: + return new StringFrameColumnReader(columnNumber, true); + case LONG: + return new LongArrayFrameColumnReader(columnNumber); + case FLOAT: + return new FloatArrayFrameColumnReader(columnNumber); + case DOUBLE: + return new DoubleArrayFrameColumnReader(columnNumber); + default: + return new UnsupportedColumnTypeFrameColumnReader(columnName, columnType); } - default: return new UnsupportedColumnTypeFrameColumnReader(columnName, columnType); } diff --git a/processing/src/main/java/org/apache/druid/frame/read/columnar/LongArrayFrameColumnReader.java b/processing/src/main/java/org/apache/druid/frame/read/columnar/LongArrayFrameColumnReader.java new file mode 100644 index 00000000000..898e1f1cebb --- /dev/null +++ b/processing/src/main/java/org/apache/druid/frame/read/columnar/LongArrayFrameColumnReader.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.read.columnar; + +import com.google.common.math.LongMath; +import org.apache.datasketches.memory.Memory; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.write.columnar.FrameColumnWriters; +import org.apache.druid.segment.column.ColumnType; + +/** + * Reader for columns written by {@link org.apache.druid.frame.write.columnar.LongArrayFrameColumnWriter} + * + * @see NumericArrayFrameColumnReader + * @see org.apache.druid.frame.write.columnar.NumericArrayFrameColumnWriter for column's layout in memory + */ +public class LongArrayFrameColumnReader extends NumericArrayFrameColumnReader +{ + public LongArrayFrameColumnReader(int columnNumber) + { + super(FrameColumnWriters.TYPE_LONG_ARRAY, ColumnType.LONG_ARRAY, columnNumber); + } + + @Override + NumericArrayFrameColumn column(Frame frame, Memory memory, ColumnType columnType) + { + return new LongArrayFrameColumn(frame, memory, columnType); + } + + private static class LongArrayFrameColumn extends NumericArrayFrameColumn + { + public LongArrayFrameColumn( + Frame frame, + Memory memory, + ColumnType columnType + ) + { + super(frame, memory, columnType); + } + + @Override + Number getElement(Memory memory, long rowDataOffset, int cumulativeIndex) + { + return memory.getLong(LongMath.checkedAdd(rowDataOffset, (long) cumulativeIndex * Long.BYTES)); + } + } +} diff --git a/processing/src/main/java/org/apache/druid/frame/read/columnar/NumericArrayFrameColumnReader.java b/processing/src/main/java/org/apache/druid/frame/read/columnar/NumericArrayFrameColumnReader.java new file mode 100644 index 00000000000..986baaa099d --- /dev/null +++ b/processing/src/main/java/org/apache/druid/frame/read/columnar/NumericArrayFrameColumnReader.java @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.read.columnar; + +import com.google.common.math.LongMath; +import it.unimi.dsi.fastutil.objects.ObjectArrays; +import org.apache.datasketches.memory.Memory; +import org.apache.druid.error.DruidException; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.write.columnar.NumericArrayFrameColumnWriter; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.query.rowsandcols.column.Column; +import org.apache.druid.query.rowsandcols.column.ColumnAccessorBasedColumn; +import org.apache.druid.query.rowsandcols.column.accessor.ObjectColumnAccessorBase; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.ObjectColumnSelector; +import org.apache.druid.segment.column.BaseColumn; +import org.apache.druid.segment.column.ColumnCapabilitiesImpl; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.data.ReadableOffset; +import org.apache.druid.segment.vector.ReadableVectorInspector; +import org.apache.druid.segment.vector.ReadableVectorOffset; +import org.apache.druid.segment.vector.VectorObjectSelector; + +import javax.annotation.Nullable; +import java.util.Comparator; + +/** + * Implementations of this class reads columns written by the corresponding implementations of {@link NumericArrayFrameColumnWriter}. + * + * @see NumericArrayFrameColumnWriter for the column format read + */ +public abstract class NumericArrayFrameColumnReader implements FrameColumnReader +{ + private final byte typeCode; + private final ColumnType columnType; + private final int columnNumber; + + public NumericArrayFrameColumnReader(byte typeCode, ColumnType columnType, int columnNumber) + { + this.typeCode = typeCode; + this.columnType = columnType; + this.columnNumber = columnNumber; + } + + @Override + public Column readRACColumn(Frame frame) + { + final Memory memory = frame.region(columnNumber); + validate(memory); + return new ColumnAccessorBasedColumn(column(frame, memory, columnType)); + } + + @Override + public ColumnPlus readColumn(Frame frame) + { + final Memory memory = frame.region(columnNumber); + validate(memory); + return new ColumnPlus( + column(frame, memory, columnType), + ColumnCapabilitiesImpl.createSimpleArrayColumnCapabilities(columnType), + frame.numRows() + ); + } + + abstract NumericArrayFrameColumn column(Frame frame, Memory memory, ColumnType columnType); + + /** + * Validates that the written type code is the same as the provided type code. It's a defensive check that prevents + * unexpected results by reading columns of different types + */ + private void validate(final Memory region) + { + if (region.getCapacity() < NumericArrayFrameColumnWriter.DATA_OFFSET) { + throw DruidException.defensive("Column[%s] is not big enough for a header", columnNumber); + } + final byte actualTypeCode = region.getByte(0); + if (actualTypeCode != this.typeCode) { + throw DruidException.defensive( + "Column[%s] does not have the correct type code; expected[%s], got[%s]", + columnNumber, + this.typeCode, + actualTypeCode + ); + } + } + + /** + * Gets the start of the section where cumulative lengths of the array elements are stored (section 1) + */ + private static long getStartOfCumulativeLengthSection() + { + return NumericArrayFrameColumnWriter.DATA_OFFSET; + } + + /** + * Gets the start of the section where information about element's nullity is stored (section 2) + */ + private static long getStartOfRowNullityData(final int numRows) + { + return getStartOfCumulativeLengthSection() + ((long) numRows * Integer.BYTES); + } + + /** + * Gets the start of the section where elements are stored (section 3) + */ + private static long getStartOfRowData(final Memory memory, final int numRows) + { + long nullityDataOffset = + (long) Byte.BYTES * FrameColumnReaderUtils.getAdjustedCumulativeRowLength( + memory, + getStartOfCumulativeLengthSection(), + numRows - 1 + ); + return LongMath.checkedAdd(getStartOfRowNullityData(numRows), nullityDataOffset); + } + + public abstract static class NumericArrayFrameColumn extends ObjectColumnAccessorBase implements BaseColumn + { + private final Frame frame; + private final Memory memory; + private final ColumnType columnType; + + /** + * Cache start of rowNullityDataOffset, as it won't change + */ + private final long rowNullityDataOffset; + + /** + * Cache start of rowDataOffset, as it won't change + */ + private final long rowDataOffset; + + + public NumericArrayFrameColumn(Frame frame, Memory memory, ColumnType columnType) + { + this.frame = frame; + this.memory = memory; + this.columnType = columnType; + + this.rowNullityDataOffset = getStartOfRowNullityData(frame.numRows()); + this.rowDataOffset = getStartOfRowData(memory, frame.numRows()); + } + + @Override + public ColumnType getType() + { + return columnType; + } + + @Override + public int numRows() + { + return frame.numRows(); + } + + @Override + protected Object getVal(int rowNum) + { + return getNumericArray(physicalRow(rowNum)); + } + + @Override + protected Comparator getComparator() + { + return columnType.getNullableStrategy(); + } + + @Override + public ColumnValueSelector makeColumnValueSelector(ReadableOffset offset) + { + // Cache's the row's value before returning + return new ObjectColumnSelector() + { + + // Cached row number + private int cachedLogicalRow = -1; + + // Cached value + @Nullable + private Object[] cachedValue = null; + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + } + + @Nullable + @Override + public Object getObject() + { + compute(); + return cachedValue; + } + + @Override + public Class classOfObject() + { + return Object[].class; + } + + /** + * Cache's the row value and the logical row number into the class variables + */ + private void compute() + { + int currentLogicalRow = offset.getOffset(); + if (cachedLogicalRow == currentLogicalRow) { + return; + } + cachedValue = getNumericArray(physicalRow(currentLogicalRow)); + cachedLogicalRow = currentLogicalRow; + } + }; + } + + @Override + public VectorObjectSelector makeVectorObjectSelector(ReadableVectorOffset offset) + { + return new VectorObjectSelector() + { + private final Object[] vector = new Object[offset.getMaxVectorSize()]; + private int id = ReadableVectorInspector.NULL_ID; + + @Override + public Object[] getObjectVector() + { + computeVector(); + return vector; + } + + @Override + public int getMaxVectorSize() + { + return offset.getMaxVectorSize(); + } + + @Override + public int getCurrentVectorSize() + { + return offset.getCurrentVectorSize(); + } + + private void computeVector() + { + if (id == offset.getId()) { + return; + } + + if (offset.isContiguous()) { + // Contiguous offsets can have a cache optimized implementation if 'frame.isPermuted() == false', + // i.e. logicalRow == physicalRow. The implementation can separately fetch out the nullity data, and the + // element data continguously. + final int start = offset.getStartOffset(); + for (int i = 0; i < offset.getCurrentVectorSize(); ++i) { + vector[i] = getNumericArray(physicalRow(start + i)); + } + } else { + final int[] offsets = offset.getOffsets(); + for (int i = 0; i < offset.getCurrentVectorSize(); ++i) { + vector[i] = getNumericArray(physicalRow(offsets[i])); + } + + id = offset.getId(); + } + } + }; + } + + @Override + public void close() + { + // Do nothing + } + + private int physicalRow(int logicalRow) + { + return frame.physicalRow(logicalRow); + } + + /** + * Given the physical row, it fetches the value from the memory + */ + @Nullable + private Object[] getNumericArray(final int physicalRow) + { + final int cumulativeLength = FrameColumnReaderUtils.getCumulativeRowLength( + memory, + getStartOfCumulativeLengthSection(), + physicalRow + ); + + final int rowLength; + if (FrameColumnReaderUtils.isNullRow(cumulativeLength)) { + return null; + } else if (physicalRow == 0) { + rowLength = cumulativeLength; + } else { + final int previousCumulativeLength = FrameColumnReaderUtils.adjustCumulativeRowLength( + FrameColumnReaderUtils.getCumulativeRowLength( + memory, + getStartOfCumulativeLengthSection(), + physicalRow - 1 + ) + ); + // cumulativeLength doesn't need to be adjusted, since its greater than 0 or else it would have been a null row, + // which we check for in the first if..else + rowLength = cumulativeLength - previousCumulativeLength; + } + + if (rowLength == 0) { + return ObjectArrays.EMPTY_ARRAY; + } + + final Object[] row = new Object[rowLength]; + for (int i = 0; i < rowLength; ++i) { + final int cumulativeIndex = cumulativeLength - rowLength + i; + row[i] = getElementNullity(cumulativeIndex) ? null : getElement(memory, rowDataOffset, cumulativeIndex); + } + + return row; + } + + /** + * Returns true if element is null, else false + */ + private boolean getElementNullity(final int cumulativeIndex) + { + byte b = memory.getByte(LongMath.checkedAdd(rowNullityDataOffset, (long) cumulativeIndex * Byte.BYTES)); + if (b == NumericArrayFrameColumnWriter.NULL_ELEMENT_MARKER) { + return true; + } + assert b == NumericArrayFrameColumnWriter.NON_NULL_ELEMENT_MARKER; + return false; + } + + /** + * Returns the value of the element of the array in the memory provided, given that the start of the array is + * {@code rowDataOffset} and the index of the element in the array is {@code cumulativeIndex} + */ + abstract Number getElement(Memory memory, long rowDataOffset, int cumulativeIndex); + } +} diff --git a/processing/src/main/java/org/apache/druid/frame/read/columnar/StringFrameColumnReader.java b/processing/src/main/java/org/apache/druid/frame/read/columnar/StringFrameColumnReader.java index 119dd48a3f1..d9fb9d83a9f 100644 --- a/processing/src/main/java/org/apache/druid/frame/read/columnar/StringFrameColumnReader.java +++ b/processing/src/main/java/org/apache/druid/frame/read/columnar/StringFrameColumnReader.java @@ -92,17 +92,18 @@ public class StringFrameColumnReader implements FrameColumnReader final Memory memory = frame.region(columnNumber); validate(memory); - if (isMultiValue(memory)) { - // When we implement handling of multi-value, we should actually make this look like an Array of String instead - // of perpetuating the multi-value idea. Thus, when we add support for Arrays to the RAC stuff, that's when - // we can start supporting multi-value. - throw new ISE("Multivalue not yet handled by RAC"); - } final long positionOfLengths = getStartOfStringLengthSection(frame.numRows(), false); final long positionOfPayloads = getStartOfStringDataSection(memory, frame.numRows(), false); StringFrameColumn frameCol = - new StringFrameColumn(frame, false, memory, positionOfLengths, positionOfPayloads, false); + new StringFrameColumn( + frame, + false, + memory, + positionOfLengths, + positionOfPayloads, + asArray || isMultiValue(memory) // Read MVDs as String arrays + ); return new ColumnAccessorBasedColumn(frameCol); } @@ -174,40 +175,9 @@ public class StringFrameColumnReader implements FrameColumnReader return memory.getByte(1) == 1; } - /** - * Returns cumulative row length, if the row is not null itself, or -(cumulative row length) - 1 if the row is - * null itself. - * - * To check if the return value from this function indicate a null row, use {@link #isNullRow(int)} - * - * To get the actual cumulative row length, use {@link #adjustCumulativeRowLength(int)}. - */ - private static int getCumulativeRowLength(final Memory memory, final int physicalRow) + private static long getStartOfCumulativeLengthSection() { - // Note: only valid to call this if multiValue = true. - return memory.getInt(StringFrameColumnWriter.DATA_OFFSET + (long) Integer.BYTES * physicalRow); - } - - /** - * When given a return value from {@link #getCumulativeRowLength(Memory, int)}, returns whether the row is - * null itself (i.e. a null array). - */ - private static boolean isNullRow(final int cumulativeRowLength) - { - return cumulativeRowLength < 0; - } - - /** - * Adjusts a negative cumulative row length from {@link #getCumulativeRowLength(Memory, int)} to be the actual - * positive length. - */ - private static int adjustCumulativeRowLength(final int cumulativeRowLength) - { - if (cumulativeRowLength < 0) { - return -(cumulativeRowLength + 1); - } else { - return cumulativeRowLength; - } + return StringFrameColumnWriter.DATA_OFFSET; } private static long getStartOfStringLengthSection( @@ -231,7 +201,11 @@ public class StringFrameColumnReader implements FrameColumnReader final int totalNumValues; if (multiValue) { - totalNumValues = adjustCumulativeRowLength(getCumulativeRowLength(memory, numRows - 1)); + totalNumValues = FrameColumnReaderUtils.getAdjustedCumulativeRowLength( + memory, + getStartOfCumulativeLengthSection(), + numRows - 1 + ); } else { totalNumValues = numRows; } @@ -489,15 +463,23 @@ public class StringFrameColumnReader implements FrameColumnReader private Object getRowAsObject(final int physicalRow, final boolean decode) { if (multiValue) { - final int cumulativeRowLength = getCumulativeRowLength(memory, physicalRow); + final int cumulativeRowLength = FrameColumnReaderUtils.getCumulativeRowLength( + memory, + getStartOfCumulativeLengthSection(), + physicalRow + ); final int rowLength; - if (isNullRow(cumulativeRowLength)) { + if (FrameColumnReaderUtils.isNullRow(cumulativeRowLength)) { return null; } else if (physicalRow == 0) { rowLength = cumulativeRowLength; } else { - rowLength = cumulativeRowLength - adjustCumulativeRowLength(getCumulativeRowLength(memory, physicalRow - 1)); + rowLength = cumulativeRowLength - FrameColumnReaderUtils.getAdjustedCumulativeRowLength( + memory, + getStartOfCumulativeLengthSection(), + physicalRow - 1 + ); } if (rowLength == 0) { diff --git a/processing/src/main/java/org/apache/druid/frame/write/columnar/DoubleArrayFrameColumnWriter.java b/processing/src/main/java/org/apache/druid/frame/write/columnar/DoubleArrayFrameColumnWriter.java new file mode 100644 index 00000000000..80c1b5ebfdf --- /dev/null +++ b/processing/src/main/java/org/apache/druid/frame/write/columnar/DoubleArrayFrameColumnWriter.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.write.columnar; + +import org.apache.datasketches.memory.WritableMemory; +import org.apache.druid.frame.allocation.MemoryAllocator; +import org.apache.druid.segment.ColumnValueSelector; + +/** + * Columnar frame writer for {@link org.apache.druid.segment.column.ColumnType#DOUBLE_ARRAY} columns + */ +public class DoubleArrayFrameColumnWriter extends NumericArrayFrameColumnWriter +{ + public DoubleArrayFrameColumnWriter( + ColumnValueSelector selector, + MemoryAllocator allocator + ) + { + super(selector, allocator, FrameColumnWriters.TYPE_DOUBLE_ARRAY); + } + + @Override + int elementSizeBytes() + { + return Double.BYTES; + } + + @Override + void putNull(WritableMemory memory, long offset) + { + memory.putDouble(offset, 0d); + } + + @Override + void putArrayElement(WritableMemory memory, long offset, Number element) + { + // The element is of type Double, and non-null, therefore it can be cast safely + memory.putDouble(offset, (double) element); + } +} diff --git a/processing/src/main/java/org/apache/druid/frame/write/columnar/FloatArrayFrameColumnWriter.java b/processing/src/main/java/org/apache/druid/frame/write/columnar/FloatArrayFrameColumnWriter.java new file mode 100644 index 00000000000..47c492c28bb --- /dev/null +++ b/processing/src/main/java/org/apache/druid/frame/write/columnar/FloatArrayFrameColumnWriter.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.write.columnar; + +import org.apache.datasketches.memory.WritableMemory; +import org.apache.druid.frame.allocation.MemoryAllocator; +import org.apache.druid.segment.ColumnValueSelector; + +/** + * Columnar frame writer for {@link org.apache.druid.segment.column.ColumnType#FLOAT_ARRAY} columns + */ +public class FloatArrayFrameColumnWriter extends NumericArrayFrameColumnWriter +{ + public FloatArrayFrameColumnWriter( + ColumnValueSelector selector, + MemoryAllocator allocator + ) + { + super(selector, allocator, FrameColumnWriters.TYPE_FLOAT_ARRAY); + } + + @Override + int elementSizeBytes() + { + return Float.BYTES; + } + + @Override + void putNull(WritableMemory memory, long offset) + { + memory.putFloat(offset, 0f); + } + + @Override + void putArrayElement(WritableMemory memory, long offset, Number element) + { + // The element is of type Float, and non-null, therefore it can be cast safely + memory.putFloat(offset, (float) element); + } +} diff --git a/processing/src/main/java/org/apache/druid/frame/write/columnar/FrameColumnWriter.java b/processing/src/main/java/org/apache/druid/frame/write/columnar/FrameColumnWriter.java index bb954d6c366..0a91a58bd83 100644 --- a/processing/src/main/java/org/apache/druid/frame/write/columnar/FrameColumnWriter.java +++ b/processing/src/main/java/org/apache/druid/frame/write/columnar/FrameColumnWriter.java @@ -23,14 +23,34 @@ import org.apache.datasketches.memory.WritableMemory; import java.io.Closeable; +/** + * Represent writers for the columnar frames. + * + * The class objects must be provided with information on what values to write, usually provided as a + * {@link org.apache.druid.segment.ColumnValueSelector} and where to write to, usually temporary growable memory + * {@link #addSelection()} will be called repeatedly, as the current value to write gets updated. For the final write, + * call {@link #writeTo}, which will copy the values we have added so far to the destination memory. + */ public interface FrameColumnWriter extends Closeable { + /** + * Adds the current value to the writer + */ boolean addSelection(); + /** + * Reverts the last added value. Undo calls cannot be called in successsion + */ void undo(); + /** + * Size (in bytes) of the column data that will get written when {@link #writeTo} will be called + */ long size(); + /** + * Writes the value of the column to the provided memory at the given position + */ long writeTo(WritableMemory memory, long position); @Override diff --git a/processing/src/main/java/org/apache/druid/frame/write/columnar/FrameColumnWriters.java b/processing/src/main/java/org/apache/druid/frame/write/columnar/FrameColumnWriters.java index 8c5dbe75853..93f0c12bae6 100644 --- a/processing/src/main/java/org/apache/druid/frame/write/columnar/FrameColumnWriters.java +++ b/processing/src/main/java/org/apache/druid/frame/write/columnar/FrameColumnWriters.java @@ -41,6 +41,9 @@ public class FrameColumnWriters public static final byte TYPE_STRING = 4; public static final byte TYPE_COMPLEX = 5; public static final byte TYPE_STRING_ARRAY = 6; + public static final byte TYPE_LONG_ARRAY = 7; + public static final byte TYPE_FLOAT_ARRAY = 8; + public static final byte TYPE_DOUBLE_ARRAY = 9; private FrameColumnWriters() { @@ -76,6 +79,12 @@ public class FrameColumnWriters switch (type.getElementType().getType()) { case STRING: return makeStringArrayWriter(columnSelectorFactory, allocator, column); + case LONG: + return makeLongArrayWriter(columnSelectorFactory, allocator, column); + case FLOAT: + return makeFloatArrayWriter(columnSelectorFactory, allocator, column); + case DOUBLE: + return makeDoubleArrayWriter(columnSelectorFactory, allocator, column); default: throw new UnsupportedColumnTypeException(column, type); } @@ -144,6 +153,36 @@ public class FrameColumnWriters return new StringArrayFrameColumnWriterImpl(selector, allocator); } + private static NumericArrayFrameColumnWriter makeLongArrayWriter( + final ColumnSelectorFactory selectorFactory, + final MemoryAllocator allocator, + final String columnName + ) + { + final ColumnValueSelector selector = selectorFactory.makeColumnValueSelector(columnName); + return new LongArrayFrameColumnWriter(selector, allocator); + } + + private static NumericArrayFrameColumnWriter makeFloatArrayWriter( + final ColumnSelectorFactory selectorFactory, + final MemoryAllocator allocator, + final String columnName + ) + { + final ColumnValueSelector selector = selectorFactory.makeColumnValueSelector(columnName); + return new FloatArrayFrameColumnWriter(selector, allocator); + } + + private static NumericArrayFrameColumnWriter makeDoubleArrayWriter( + final ColumnSelectorFactory selectorFactory, + final MemoryAllocator allocator, + final String columnName + ) + { + final ColumnValueSelector selector = selectorFactory.makeColumnValueSelector(columnName); + return new DoubleArrayFrameColumnWriter(selector, allocator); + } + private static ComplexFrameColumnWriter makeComplexWriter( final ColumnSelectorFactory selectorFactory, final MemoryAllocator allocator, diff --git a/processing/src/main/java/org/apache/druid/frame/write/columnar/LongArrayFrameColumnWriter.java b/processing/src/main/java/org/apache/druid/frame/write/columnar/LongArrayFrameColumnWriter.java new file mode 100644 index 00000000000..cc26fd3a36a --- /dev/null +++ b/processing/src/main/java/org/apache/druid/frame/write/columnar/LongArrayFrameColumnWriter.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.write.columnar; + +import org.apache.datasketches.memory.WritableMemory; +import org.apache.druid.frame.allocation.MemoryAllocator; +import org.apache.druid.segment.ColumnValueSelector; + +/** + * Columnar frame writer for {@link org.apache.druid.segment.column.ColumnType#LONG_ARRAY} columns + */ +public class LongArrayFrameColumnWriter extends NumericArrayFrameColumnWriter +{ + public LongArrayFrameColumnWriter( + ColumnValueSelector selector, + MemoryAllocator allocator + ) + { + super(selector, allocator, FrameColumnWriters.TYPE_LONG_ARRAY); + } + + @Override + int elementSizeBytes() + { + return Long.BYTES; + } + + @Override + void putNull(WritableMemory memory, long offset) + { + memory.putLong(offset, 0L); + } + + @Override + void putArrayElement(WritableMemory memory, long offset, Number element) + { + // The element is of type Long, and non-null, therefore it can be casted safely + memory.putLong(offset, (long) element); + } +} diff --git a/processing/src/main/java/org/apache/druid/frame/write/columnar/NumericArrayFrameColumnWriter.java b/processing/src/main/java/org/apache/druid/frame/write/columnar/NumericArrayFrameColumnWriter.java new file mode 100644 index 00000000000..619bf53b8d3 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/frame/write/columnar/NumericArrayFrameColumnWriter.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.write.columnar; + +import org.apache.datasketches.memory.WritableMemory; +import org.apache.druid.error.DruidException; +import org.apache.druid.frame.allocation.AppendableMemory; +import org.apache.druid.frame.allocation.MemoryAllocator; +import org.apache.druid.frame.allocation.MemoryRange; +import org.apache.druid.frame.write.FrameWriterUtils; +import org.apache.druid.segment.ColumnValueSelector; + +import java.util.List; + +/** + * Parent class for the family of writers writing numeric arrays in columnar frames. Since the numeric primitives are + * fixed width, we don't need to store the width of each element. The memory layout of a column written by this writer + * is as follows: + * + * n : Total number of rows + * k : Total number of elements in all the rows, cumulative + * + * | Section | Length of the section | Denotion | + * |---------|-----------------------|--------------------------------------------------------------------------------------| + * | 0 | 1 | typeCode | + * | 1 | n * Integer.BYTES | n integers, where i-th integer represents the cumulative length of the array | + * | 2 | k * Byte.BYTES | k bytes, where i-th byte represent whether the i-th value from the start is null | + * | 3 | k * ELEMENT_SIZE | k values, each representing the element, or null equivalent value (e.g 0 for double) | + * + * Note on cumulative lengths stored in section 1: Cumulative lengths are stored so that its fast to offset into the + * elements of the array. We also use negative cumulative length to denote that the array itself is null (as opposed to + * individual elements being null, which we store in section 2) + */ +public abstract class NumericArrayFrameColumnWriter implements FrameColumnWriter +{ + /** + * Equivalent to {@link AppendableMemory#DEFAULT_INITIAL_ALLOCATION_SIZE} / 3, since the memory would be further split + * up into three regions + */ + private static final int INITIAL_ALLOCATION_SIZE = 120; + + public static final byte NULL_ELEMENT_MARKER = 0x00; + public static final byte NON_NULL_ELEMENT_MARKER = 0x01; + + /** + * A byte required at the beginning for type code + */ + public static final long DATA_OFFSET = 1; + + final ColumnValueSelector selector; + final byte typeCode; + + /** + * Row lengths: one int per row with the number of values contained by that row and all previous rows. + * Only written for multi-value and array columns. When the corresponding row is null itself, the length is + * written as -(actual length) - 1. (Guaranteed to be a negative number even if "actual length" is zero.) + */ + private final AppendableMemory cumulativeRowLengths; + + /** + * Denotes if the element of the row is null or not + */ + private final AppendableMemory rowNullityData; + + /** + * Row data. + */ + private final AppendableMemory rowData; + + private int lastCumulativeRowLength = 0; + private int lastRowLength = -1; + + + public NumericArrayFrameColumnWriter( + final ColumnValueSelector selector, + final MemoryAllocator allocator, + final byte typeCode + ) + { + this.selector = selector; + this.typeCode = typeCode; + this.cumulativeRowLengths = AppendableMemory.create(allocator, INITIAL_ALLOCATION_SIZE); + this.rowNullityData = AppendableMemory.create(allocator, INITIAL_ALLOCATION_SIZE); + this.rowData = AppendableMemory.create(allocator, INITIAL_ALLOCATION_SIZE); + } + + /** + * Returns the size of the elements of the array + */ + abstract int elementSizeBytes(); + + /** + * Inserts default null value in the given memory location at the provided offset. + */ + abstract void putNull(WritableMemory memory, long offset); + + /** + * Inserts the element value in the given memory location at the provided offset. + */ + abstract void putArrayElement(WritableMemory memory, long offset, Number element); + + @Override + public boolean addSelection() + { + List numericArray = FrameWriterUtils.getNumericArrayFromObject(selector.getObject()); + int rowLength = numericArray == null ? 0 : numericArray.size(); + + // Begin memory allocations before writing + if ((long) lastCumulativeRowLength + rowLength > Integer.MAX_VALUE) { + return false; + } + + if (!cumulativeRowLengths.reserveAdditional(Integer.BYTES)) { + return false; + } + + if (!rowNullityData.reserveAdditional(rowLength * Byte.BYTES)) { + return false; + } + + if (!rowData.reserveAdditional(rowLength * elementSizeBytes())) { + return false; + } + // Memory allocations completed + + final MemoryRange rowLengthsCursor = cumulativeRowLengths.cursor(); + + if (numericArray == null) { + rowLengthsCursor.memory().putInt(rowLengthsCursor.start(), -(lastCumulativeRowLength + rowLength) - 1); + } else { + rowLengthsCursor.memory().putInt(rowLengthsCursor.start(), lastCumulativeRowLength + rowLength); + } + cumulativeRowLengths.advanceCursor(Integer.BYTES); + lastRowLength = rowLength; + lastCumulativeRowLength += rowLength; + + final MemoryRange rowNullityDataCursor = rowLength > 0 ? rowNullityData.cursor() : null; + final MemoryRange rowDataCursor = rowLength > 0 ? rowData.cursor() : null; + + for (int i = 0; i < rowLength; ++i) { + final Number element = numericArray.get(i); + final long memoryOffset = rowDataCursor.start() + ((long) elementSizeBytes() * i); + if (element == null) { + rowNullityDataCursor.memory() + .putByte(rowNullityDataCursor.start() + (long) Byte.BYTES * i, NULL_ELEMENT_MARKER); + putNull(rowDataCursor.memory(), memoryOffset); + } else { + rowNullityDataCursor.memory() + .putByte(rowNullityDataCursor.start() + (long) Byte.BYTES * i, NON_NULL_ELEMENT_MARKER); + putArrayElement(rowDataCursor.memory(), memoryOffset, element); + } + } + + if (rowLength > 0) { + rowNullityData.advanceCursor(Byte.BYTES * rowLength); + rowData.advanceCursor(elementSizeBytes() * rowLength); + } + + return true; + } + + @Override + public void undo() + { + if (lastRowLength == -1) { + throw DruidException.defensive("Nothing written to undo()"); + } + + cumulativeRowLengths.rewindCursor(Integer.BYTES); + rowNullityData.rewindCursor(lastRowLength * Byte.BYTES); + rowData.rewindCursor(lastRowLength * elementSizeBytes()); + + lastCumulativeRowLength -= lastRowLength; + // Multiple undo calls cannot be chained together + lastRowLength = -1; + } + + @Override + public long size() + { + return DATA_OFFSET + cumulativeRowLengths.size() + rowNullityData.size() + rowData.size(); + } + + @Override + public long writeTo(final WritableMemory memory, final long startPosition) + { + long currentPosition = startPosition; + + memory.putByte(currentPosition, typeCode); + ++currentPosition; + + currentPosition += cumulativeRowLengths.writeTo(memory, currentPosition); + currentPosition += rowNullityData.writeTo(memory, currentPosition); + currentPosition += rowData.writeTo(memory, currentPosition); + + return currentPosition - startPosition; + } + + @Override + public void close() + { + cumulativeRowLengths.close(); + rowNullityData.close(); + rowData.close(); + } +} diff --git a/processing/src/test/java/org/apache/druid/frame/write/FrameWriterTest.java b/processing/src/test/java/org/apache/druid/frame/write/FrameWriterTest.java index 16a1b667556..61b01ad0646 100644 --- a/processing/src/test/java/org/apache/druid/frame/write/FrameWriterTest.java +++ b/processing/src/test/java/org/apache/druid/frame/write/FrameWriterTest.java @@ -244,27 +244,18 @@ public class FrameWriterTest extends InitializedNullHandlingTest @Test public void test_arrayLong() { - // ARRAY can't be read or written for columnar frames, therefore skip the check if it encounters those - // parameters - Assume.assumeFalse(inputFrameType == FrameType.COLUMNAR || outputFrameType == FrameType.COLUMNAR); testWithDataset(FrameWriterTestData.TEST_ARRAYS_LONG); } @Test public void test_arrayFloat() { - // ARRAY can't be read or written for columnar frames, therefore skip the check if it encounters those - // parameters - Assume.assumeFalse(inputFrameType == FrameType.COLUMNAR || outputFrameType == FrameType.COLUMNAR); testWithDataset(FrameWriterTestData.TEST_ARRAYS_FLOAT); } @Test public void test_arrayDouble() { - // ARRAY can't be read or written for columnar frames, therefore skip the check if it encounters those - // parameters - Assume.assumeFalse(inputFrameType == FrameType.COLUMNAR || outputFrameType == FrameType.COLUMNAR); testWithDataset(FrameWriterTestData.TEST_ARRAYS_DOUBLE); } diff --git a/processing/src/test/java/org/apache/druid/frame/write/FrameWritersTest.java b/processing/src/test/java/org/apache/druid/frame/write/FrameWritersTest.java index 9d359eed05e..71c67deadb0 100644 --- a/processing/src/test/java/org/apache/druid/frame/write/FrameWritersTest.java +++ b/processing/src/test/java/org/apache/druid/frame/write/FrameWritersTest.java @@ -67,7 +67,16 @@ public class FrameWritersTest extends InitializedNullHandlingTest final FrameWriterFactory factory = FrameWriters.makeFrameWriterFactory( FrameType.COLUMNAR, new ArenaMemoryAllocatorFactory(ALLOCATOR_CAPACITY), - RowSignature.builder().add("x", ColumnType.LONG).build(), + RowSignature.builder() + .add("a", ColumnType.LONG) + .add("b", ColumnType.FLOAT) + .add("c", ColumnType.DOUBLE) + .add("d", ColumnType.STRING) + .add("e", ColumnType.LONG_ARRAY) + .add("f", ColumnType.FLOAT_ARRAY) + .add("g", ColumnType.DOUBLE_ARRAY) + .add("h", ColumnType.STRING_ARRAY) + .build(), Collections.emptyList() ); @@ -81,7 +90,7 @@ public class FrameWritersTest extends InitializedNullHandlingTest final FrameWriterFactory factory = FrameWriters.makeFrameWriterFactory( FrameType.COLUMNAR, new ArenaMemoryAllocatorFactory(ALLOCATOR_CAPACITY), - RowSignature.builder().add("x", ColumnType.LONG_ARRAY).build(), + RowSignature.builder().add("x", ColumnType.ofArray(ColumnType.LONG_ARRAY)).build(), Collections.emptyList() ); @@ -91,7 +100,7 @@ public class FrameWritersTest extends InitializedNullHandlingTest ); Assert.assertEquals("x", e.getColumnName()); - Assert.assertEquals(ColumnType.LONG_ARRAY, e.getColumnType()); + Assert.assertEquals(ColumnType.ofArray(ColumnType.LONG_ARRAY), e.getColumnType()); } @Test