Frames support for string arrays that are null. (#14653)

* Frames support for string arrays that are null.

The row format represents null arrays as 0x0001, which older readers
would interpret as an empty array. This provides compatibility with
older readers, which is useful during updates.

The column format represents null arrays by writing -(actual length) - 1
instead of the length, and using FrameColumnWriters.TYPE_STRING_ARRAY for
the type code for string arrays generally. Older readers will report this
as an unrecognized type code. Column format is only used by the operator
query, which is currently experimental, so the impact isn't too severe.

* Remove unused import.

* Return Object[] instead of List from frame array selectors.

Update MSQSelectTest and MSQInsertTest to reflect the fact that null
arrays are possible.

Add a bunch of javadocs to object selectors describing expected behavior,
including the requirement that array selectors return Object[].

* update test case.

* Update test cases.
This commit is contained in:
Gian Merlino 2023-07-28 10:23:39 -07:00 committed by GitHub
parent 22290fd632
commit 46ecc6b900
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 993 additions and 409 deletions

View File

@ -1,56 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq;
import com.google.common.collect.Iterables;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexStorageAdapter;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import java.util.Optional;
public class TestArrayStorageAdapter extends QueryableIndexStorageAdapter
{
public TestArrayStorageAdapter(QueryableIndex index)
{
super(index);
}
@Override
public RowSignature getRowSignature()
{
final RowSignature.Builder builder = RowSignature.builder();
builder.addTimeColumn();
for (final String column : Iterables.concat(getAvailableDimensions(), getAvailableMetrics())) {
Optional<ColumnCapabilities> columnCapabilities = Optional.ofNullable(getColumnCapabilities(column));
ColumnType columnType = columnCapabilities.isPresent() ? columnCapabilities.get().toColumnType() : null;
//change MV columns to Array<String>
if (columnCapabilities.isPresent() && columnCapabilities.get().hasMultipleValues().isMaybeTrue()) {
columnType = ColumnType.STRING_ARRAY;
}
builder.add(column, columnType);
}
return builder.build();
}
}

View File

@ -647,12 +647,12 @@ public class MSQInsertTest extends MSQTestBase
.setExpectedResultRows(
NullHandling.replaceWithDefault() ?
ImmutableList.of(
new Object[]{0L, new Object[]{null}},
new Object[]{0L, null},
new Object[]{0L, new Object[]{"a", "b"}},
new Object[]{0L, new Object[]{"b", "c"}},
new Object[]{0L, new Object[]{"d"}}
) : ImmutableList.of(
new Object[]{0L, new Object[]{null}},
new Object[]{0L, null},
new Object[]{0L, new Object[]{"a", "b"}},
new Object[]{0L, new Object[]{""}},
new Object[]{0L, new Object[]{"b", "c"}},

View File

@ -1500,9 +1500,9 @@ public class MSQSelectTest extends MSQTestBase
new Object[]{"[\"a\",\"b\"]", ImmutableList.of("a", "b")},
new Object[]{"[\"b\",\"c\"]", ImmutableList.of("b", "c")},
new Object[]{"d", ImmutableList.of("d")},
new Object[]{"", Collections.singletonList(useDefault ? null : "")},
new Object[]{NullHandling.defaultStringValue(), Collections.singletonList(null)},
new Object[]{NullHandling.defaultStringValue(), Collections.singletonList(null)}
new Object[]{"", useDefault ? null : Collections.singletonList("")},
new Object[]{NullHandling.defaultStringValue(), null},
new Object[]{NullHandling.defaultStringValue(), null}
)).verifyResults();
}
@ -1709,7 +1709,7 @@ public class MSQSelectTest extends MSQTestBase
.build();
ArrayList<Object[]> expected = new ArrayList<>();
expected.add(new Object[]{Collections.singletonList(null), !useDefault ? 2L : 3L});
expected.add(new Object[]{null, !useDefault ? 2L : 3L});
if (!useDefault) {
expected.add(new Object[]{Collections.singletonList(""), 1L});
}
@ -2136,7 +2136,7 @@ public class MSQSelectTest extends MSQTestBase
private List<Object[]> expectedMultiValueFooRowsGroupByList()
{
ArrayList<Object[]> expected = new ArrayList<>();
expected.add(new Object[]{Collections.singletonList(null), !useDefault ? 2L : 3L});
expected.add(new Object[]{null, !useDefault ? 2L : 3L});
if (!useDefault) {
expected.add(new Object[]{Collections.singletonList(""), 1L});
}

View File

@ -32,7 +32,6 @@ import org.apache.druid.segment.serde.ComplexMetricSerde;
import org.apache.druid.segment.serde.ComplexMetrics;
import javax.annotation.Nullable;
import java.util.List;
/**
* Helper used to write field values to row-based frames or {@link RowKey}.
@ -128,8 +127,8 @@ public class FieldWriters
final String columnName
)
{
//noinspection unchecked
final ColumnValueSelector<List<String>> selector = selectorFactory.makeColumnValueSelector(columnName);
//noinspection rawtypes
final ColumnValueSelector selector = selectorFactory.makeColumnValueSelector(columnName);
return new StringArrayFieldWriter(selector);
}

View File

@ -21,10 +21,9 @@ package org.apache.druid.frame.field;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.frame.write.FrameWriterUtils;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.ColumnValueSelector;
import java.util.List;
/**
* Like {@link StringFieldWriter}, but reads arrays from a {@link ColumnValueSelector} instead of reading from
* a {@link org.apache.druid.segment.DimensionSelector}.
@ -33,9 +32,9 @@ import java.util.List;
*/
public class StringArrayFieldWriter implements FieldWriter
{
private final ColumnValueSelector<List<String>> selector;
private final BaseObjectColumnValueSelector<?> selector;
public StringArrayFieldWriter(final ColumnValueSelector<List<String>> selector)
public StringArrayFieldWriter(final BaseObjectColumnValueSelector<?> selector)
{
this.selector = selector;
}

View File

@ -21,6 +21,7 @@ package org.apache.druid.frame.field;
import com.google.common.base.Predicate;
import com.google.common.primitives.Ints;
import it.unimi.dsi.fastutil.objects.ObjectArrays;
import org.apache.datasketches.memory.Memory;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.frame.read.FrameReaderUtils;
@ -34,13 +35,14 @@ import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.DimensionSelectorUtils;
import org.apache.druid.segment.IdLookup;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.IndexedInts;
import org.apache.druid.segment.data.RangeIndexedInts;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Arrays;
import java.util.List;
/**
@ -50,9 +52,14 @@ import java.util.List;
* appears in valid UTF8 encodings if and only if the string contains a NUL (char 0). Therefore, this field writer
* cannot write out strings containing NUL characters.
*
* Rows are terminated by {@link StringFieldWriter#ROW_TERMINATOR}.
* All rows are terminated by {@link StringFieldWriter#ROW_TERMINATOR}.
*
* Nulls are stored as {@link StringFieldWriter#NULL_BYTE}. All other strings are prepended by
* Empty rows are represented in one byte: solely that {@link StringFieldWriter#ROW_TERMINATOR}. Rows that are null
* themselves (i.e., a null array) are represented as a {@link StringFieldWriter#NULL_ROW} followed by a
* {@link StringFieldWriter#ROW_TERMINATOR}. This encoding for null arrays is decoded by older readers as an
* empty array; null arrays are a feature that did not exist in earlier versions of the code.
*
* Null strings are stored as {@link StringFieldWriter#NULL_BYTE}. All other strings are prepended by
* {@link StringFieldWriter#NOT_NULL_BYTE} byte to differentiate them from nulls.
*
* This encoding allows the encoded data to be compared as bytes in a way that matches the behavior of
@ -63,6 +70,13 @@ public class StringFieldReader implements FieldReader
{
private final boolean asArray;
/**
* Create a string reader.
*
* @param asArray if false, selectors from {@link #makeColumnValueSelector} behave like {@link ValueType#STRING}
* selectors (potentially multi-value ones). If true, selectors from {@link #makeColumnValueSelector}
* behave like string array selectors.
*/
StringFieldReader(final boolean asArray)
{
this.asArray = asArray;
@ -91,13 +105,17 @@ public class StringFieldReader implements FieldReader
@Override
public boolean isNull(Memory memory, long position)
{
final byte nullByte = memory.getByte(position);
assert nullByte == StringFieldWriter.NULL_BYTE || nullByte == StringFieldWriter.NOT_NULL_BYTE;
final byte firstByte = memory.getByte(position);
// When NullHandling.replaceWithDefault(), empty strings are considered nulls as well.
return (NullHandling.replaceWithDefault() || nullByte == StringFieldWriter.NULL_BYTE)
&& memory.getByte(position + 1) == StringFieldWriter.VALUE_TERMINATOR
&& memory.getByte(position + 2) == StringFieldWriter.ROW_TERMINATOR;
if (firstByte == StringFieldWriter.NULL_ROW) {
return true;
} else if (!asArray) {
return (NullHandling.replaceWithDefault() || firstByte == StringFieldWriter.NULL_BYTE)
&& memory.getByte(position + 1) == StringFieldWriter.VALUE_TERMINATOR
&& memory.getByte(position + 2) == StringFieldWriter.ROW_TERMINATOR;
} else {
return false;
}
}
@Override
@ -119,8 +137,19 @@ public class StringFieldReader implements FieldReader
private long currentFieldPosition = -1;
private final RangeIndexedInts indexedInts = new RangeIndexedInts();
/**
* Current UTF-8 buffers, updated by {@link #computeCurrentUtf8Strings()}. Readers must only use this if
* {@link #currentUtf8StringsIsNull} is false.
*/
private final List<ByteBuffer> currentUtf8Strings = new ArrayList<>();
/**
* If true, {@link #currentUtf8Strings} must be ignored by readers, and null must be used instead. This is done
* instead of nulling out {@link #currentUtf8Strings} to save on garbage.
*/
private boolean currentUtf8StringsIsNull;
private Selector(
final Memory memory,
final ReadableFieldPointer fieldPointer,
@ -139,25 +168,32 @@ public class StringFieldReader implements FieldReader
public Object getObject()
{
final List<ByteBuffer> currentStrings = computeCurrentUtf8Strings();
if (currentStrings == null) {
return null;
}
final int size = currentStrings.size();
if (size == 0) {
return asArray ? Collections.emptyList() : null;
return asArray ? ObjectArrays.EMPTY_ARRAY : null;
} else if (size == 1) {
return asArray ? Collections.singletonList(lookupName(0)) : lookupName(0);
return asArray ? new Object[]{lookupName(0)} : lookupName(0);
} else {
final List<String> strings = new ArrayList<>(size);
final Object[] strings = new Object[size];
for (int i = 0; i < size; i++) {
strings.add(lookupName(i));
strings[i] = lookupName(i);
}
return strings;
return asArray ? strings : Arrays.asList(strings);
}
}
@Override
public IndexedInts getRow()
{
indexedInts.setSize(computeCurrentUtf8Strings().size());
final List<ByteBuffer> strings = computeCurrentUtf8Strings();
final int size = strings == null ? 0 : strings.size();
indexedInts.setSize(size);
return indexedInts;
}
@ -165,9 +201,15 @@ public class StringFieldReader implements FieldReader
@Override
public String lookupName(int id)
{
final ByteBuffer byteBuffer = computeCurrentUtf8Strings().get(id);
final String s = byteBuffer != null ? StringUtils.fromUtf8(byteBuffer.duplicate()) : null;
return extractionFn == null ? s : extractionFn.apply(s);
final List<ByteBuffer> strings = computeCurrentUtf8Strings();
if (strings == null) {
return null;
} else {
final ByteBuffer byteBuffer = strings.get(id);
final String s = byteBuffer != null ? StringUtils.fromUtf8(byteBuffer.duplicate()) : null;
return extractionFn == null ? s : extractionFn.apply(s);
}
}
@Override
@ -184,7 +226,8 @@ public class StringFieldReader implements FieldReader
throw new ISE("Cannot use lookupNameUtf8 on this selector");
}
return computeCurrentUtf8Strings().get(id);
final List<ByteBuffer> strings = computeCurrentUtf8Strings();
return strings == null ? null : strings.get(id);
}
@Override
@ -233,6 +276,7 @@ public class StringFieldReader implements FieldReader
/**
* Update {@link #currentUtf8Strings} if needed, then return it.
*/
@Nullable
private List<ByteBuffer> computeCurrentUtf8Strings()
{
final long fieldPosition = fieldPointer.position();
@ -242,11 +286,17 @@ public class StringFieldReader implements FieldReader
}
this.currentFieldPosition = fieldPosition;
return currentUtf8Strings;
if (currentUtf8StringsIsNull) {
return null;
} else {
return currentUtf8Strings;
}
}
private void updateCurrentUtf8Strings(final long fieldPosition)
{
currentUtf8StringsIsNull = false;
currentUtf8Strings.clear();
long position = fieldPosition;
@ -259,7 +309,12 @@ public class StringFieldReader implements FieldReader
position++;
switch (kind) {
case StringFieldWriter.VALUE_TERMINATOR:
case StringFieldWriter.VALUE_TERMINATOR: // Or NULL_ROW (same byte value)
if (position == fieldPosition + 1) {
// It was NULL_ROW.
currentUtf8StringsIsNull = true;
}
// Skip; next byte will be a null/not-null byte or a row terminator.
break;

View File

@ -23,6 +23,7 @@ import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.frame.write.FrameWriterUtils;
import org.apache.druid.segment.DimensionSelector;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.List;
@ -36,12 +37,22 @@ public class StringFieldWriter implements FieldWriter
public static final byte VALUE_TERMINATOR = (byte) 0x00;
public static final byte ROW_TERMINATOR = (byte) 0x01;
// Different from the values in NullHandling, since we want to be able to sort as bytes, and we want
// nulls to come before non-nulls.
/**
* Null rows are represented by {@code NULL_ROW}. Same byte value as {@link #VALUE_TERMINATOR}, but not ambiguous:
* {@code NULL_ROW} can only occur as the first byte in a row, and {@link #VALUE_TERMINATOR} can never occur as
* the first byte in a row.
*/
public static final byte NULL_ROW = 0x00;
/**
* Different from the values in {@link org.apache.druid.common.config.NullHandling}, since we want to be able to
* sort as bytes, and we want nulls to come before non-nulls.
*/
public static final byte NULL_BYTE = 0x02;
public static final byte NOT_NULL_BYTE = 0x03;
private static final int ROW_OVERHEAD_BYTES = 3; // Null byte + value terminator + row terminator
private static final int NONNULL_ROW_MINIMUM_SIZE = 3; // NULL_BYTE + VALUE_TERMINATOR + ROW_TERMINATOR
private static final byte NULL_ROW_SIZE = 2; // NULL_ROW + ROW_TERMINATOR
private final DimensionSelector selector;
@ -67,21 +78,36 @@ public class StringFieldWriter implements FieldWriter
* Writes a collection of UTF-8 buffers in string-field format. Helper for {@link #writeTo}.
* All buffers must be nonnull. Null strings must be represented as {@link FrameWriterUtils#NULL_STRING_MARKER_ARRAY}.
*
* @param memory destination memory
* @param position position in memory to write to
* @param maxSize maximum number of bytes to write to memory
* @param byteBuffers utf8 string array to write to memory
*
* @return number of bytes written, or -1 if "maxSize" was not enough memory
*/
static long writeUtf8ByteBuffers(
final WritableMemory memory,
final long position,
final long maxSize,
final List<ByteBuffer> byteBuffers
@Nullable final List<ByteBuffer> byteBuffers
)
{
if (byteBuffers == null) {
if (maxSize < NULL_ROW_SIZE) {
return -1;
}
memory.putByte(position, NULL_ROW);
memory.putByte(position + 1, ROW_TERMINATOR);
return NULL_ROW_SIZE;
}
long written = 0;
for (final ByteBuffer utf8Datum : byteBuffers) {
final int len = utf8Datum.remaining();
if (written + ROW_OVERHEAD_BYTES > maxSize) {
if (written + NONNULL_ROW_MINIMUM_SIZE > maxSize) {
return -1;
}
@ -91,7 +117,7 @@ public class StringFieldWriter implements FieldWriter
written++;
} else {
// Not null.
if (written + len + ROW_OVERHEAD_BYTES > maxSize) {
if (written + len + NONNULL_ROW_MINIMUM_SIZE > maxSize) {
return -1;
}

View File

@ -125,7 +125,7 @@ public class RowKeyReader
rowReader.fieldReader(fieldNumber)
.makeColumnValueSelector(keyMemory, fieldPointer);
return selector.getObject() instanceof List;
return selector.getObject() instanceof List || selector.getObject() instanceof Object[];
}
}
}

View File

@ -22,8 +22,10 @@ package org.apache.druid.frame.read.columnar;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.primitives.Ints;
import it.unimi.dsi.fastutil.objects.ObjectArrays;
import org.apache.datasketches.memory.Memory;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.read.FrameReaderUtils;
import org.apache.druid.frame.write.FrameWriterUtils;
@ -43,7 +45,6 @@ import org.apache.druid.segment.DimensionDictionarySelector;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.DimensionSelectorUtils;
import org.apache.druid.segment.IdLookup;
import org.apache.druid.segment.ObjectColumnSelector;
import org.apache.druid.segment.column.BaseColumn;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
@ -60,16 +61,25 @@ import org.apache.druid.segment.vector.VectorObjectSelector;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
/**
* Reader for {@link StringFrameColumnWriter}, types {@link ColumnType#STRING} and {@link ColumnType#STRING_ARRAY}.
*/
public class StringFrameColumnReader implements FrameColumnReader
{
private final int columnNumber;
private final boolean asArray;
/**
* Create a new reader.
*
* @param columnNumber column number
* @param asArray true for {@link ColumnType#STRING_ARRAY}, false for {@link ColumnType#STRING}
*/
StringFrameColumnReader(int columnNumber, boolean asArray)
{
this.columnNumber = columnNumber;
@ -91,10 +101,10 @@ public class StringFrameColumnReader implements FrameColumnReader
final long positionOfLengths = getStartOfStringLengthSection(frame.numRows(), false);
final long positionOfPayloads = getStartOfStringDataSection(memory, frame.numRows(), false);
StringFrameColumn frameCol = new StringFrameColumn(frame, false, memory, positionOfLengths, positionOfPayloads);
StringFrameColumn frameCol =
new StringFrameColumn(frame, false, memory, positionOfLengths, positionOfPayloads, false);
return new ColumnAccessorBasedColumn(frameCol);
}
@Override
@ -123,7 +133,8 @@ public class StringFrameColumnReader implements FrameColumnReader
multiValue,
memory,
startOfStringLengthSection,
startOfStringDataSection
startOfStringDataSection,
false
);
}
@ -143,12 +154,18 @@ public class StringFrameColumnReader implements FrameColumnReader
{
// Check if column is big enough for a header
if (region.getCapacity() < StringFrameColumnWriter.DATA_OFFSET) {
throw new ISE("Column is not big enough for a header");
throw DruidException.defensive("Column[%s] is not big enough for a header", columnNumber);
}
final byte typeCode = region.getByte(0);
if (typeCode != FrameColumnWriters.TYPE_STRING) {
throw new ISE("Column does not have the correct type code");
final byte expectedTypeCode = asArray ? FrameColumnWriters.TYPE_STRING_ARRAY : FrameColumnWriters.TYPE_STRING;
if (typeCode != expectedTypeCode) {
throw DruidException.defensive(
"Column[%s] does not have the correct type code; expected[%s], got[%s]",
columnNumber,
expectedTypeCode,
typeCode
);
}
}
@ -157,15 +174,42 @@ public class StringFrameColumnReader implements FrameColumnReader
return memory.getByte(1) == 1;
}
private static int getCumulativeRowLength(
final Memory memory,
final int physicalRow
)
/**
* Returns cumulative row length, if the row is not null itself, or -(cumulative row length) - 1 if the row is
* null itself.
*
* To check whether the return value from this function indicates a null row, use {@link #isNullRow(int)}.
*
* To get the actual cumulative row length, use {@link #adjustCumulativeRowLength(int)}.
*/
private static int getCumulativeRowLength(final Memory memory, final int physicalRow)
{
// Note: only valid to call this if multiValue = true.
return memory.getInt(StringFrameColumnWriter.DATA_OFFSET + (long) Integer.BYTES * physicalRow);
}
/**
* When given a return value from {@link #getCumulativeRowLength(Memory, int)}, returns whether the row is
* null itself (i.e. a null array).
*
* Null rows are stored as {@code -(cumulative row length) - 1}, which is always negative, so the sign of the
* stored value is sufficient to tell null rows apart from non-null ones.
*
* @param cumulativeRowLength raw value read by {@link #getCumulativeRowLength(Memory, int)}, not yet adjusted
*
* @return true if the raw value is negative, i.e. it encodes a null row
*/
private static boolean isNullRow(final int cumulativeRowLength)
{
return cumulativeRowLength < 0;
}
/**
 * Decodes a raw value from {@link #getCumulativeRowLength(Memory, int)} into the actual, non-negative cumulative
 * row length. Non-negative inputs are already actual lengths and are returned unchanged; negative inputs encode
 * a null row as {@code -(length) - 1} and are mapped back to {@code length}.
 *
 * @param cumulativeRowLength raw, possibly negative, cumulative row length
 *
 * @return the actual cumulative row length
 */
private static int adjustCumulativeRowLength(final int cumulativeRowLength)
{
  return cumulativeRowLength >= 0 ? cumulativeRowLength : -(cumulativeRowLength + 1);
}
private static long getStartOfStringLengthSection(
final int numRows,
final boolean multiValue
@ -187,7 +231,7 @@ public class StringFrameColumnReader implements FrameColumnReader
final int totalNumValues;
if (multiValue) {
totalNumValues = getCumulativeRowLength(memory, numRows - 1);
totalNumValues = adjustCumulativeRowLength(getCumulativeRowLength(memory, numRows - 1));
} else {
totalNumValues = numRows;
}
@ -199,17 +243,27 @@ public class StringFrameColumnReader implements FrameColumnReader
static class StringFrameColumn extends ObjectColumnAccessorBase implements DictionaryEncodedColumn<String>
{
private final Frame frame;
private final boolean multiValue;
private final Memory memory;
private final long startOfStringLengthSection;
private final long startOfStringDataSection;
/**
* Whether the column is stored in multi-value format.
*/
private final boolean multiValue;
/**
* Whether the column is being read as {@link ColumnType#STRING_ARRAY} (true) or {@link ColumnType#STRING} (false).
*/
private final boolean asArray;
private StringFrameColumn(
Frame frame,
boolean multiValue,
Memory memory,
long startOfStringLengthSection,
long startOfStringDataSection
long startOfStringDataSection,
final boolean asArray
)
{
this.frame = frame;
@ -217,6 +271,7 @@ public class StringFrameColumnReader implements FrameColumnReader
this.memory = memory;
this.startOfStringLengthSection = startOfStringLengthSection;
this.startOfStringDataSection = startOfStringDataSection;
this.asArray = asArray;
}
@Override
@ -264,142 +319,11 @@ public class StringFrameColumnReader implements FrameColumnReader
@Override
public DimensionSelector makeDimensionSelector(ReadableOffset offset, @Nullable ExtractionFn extractionFn)
{
if (multiValue) {
class MultiValueSelector implements DimensionSelector
{
private int currentRow = -1;
private List<ByteBuffer> currentValues = null;
private final RangeIndexedInts indexedInts = new RangeIndexedInts();
@Override
public int getValueCardinality()
{
return CARDINALITY_UNKNOWN;
}
@Nullable
@Override
public String lookupName(int id)
{
populate();
final ByteBuffer buf = currentValues.get(id);
final String s = buf == null ? null : StringUtils.fromUtf8(buf.duplicate());
return extractionFn == null ? s : extractionFn.apply(s);
}
@Nullable
@Override
public ByteBuffer lookupNameUtf8(int id)
{
assert supportsLookupNameUtf8();
populate();
return currentValues.get(id);
}
@Override
public boolean supportsLookupNameUtf8()
{
return extractionFn == null;
}
@Override
public boolean nameLookupPossibleInAdvance()
{
return false;
}
@Nullable
@Override
public IdLookup idLookup()
{
return null;
}
@Override
public IndexedInts getRow()
{
populate();
return indexedInts;
}
@Override
public ValueMatcher makeValueMatcher(@Nullable String value)
{
return DimensionSelectorUtils.makeValueMatcherGeneric(this, value);
}
@Override
public ValueMatcher makeValueMatcher(Predicate<String> predicate)
{
return DimensionSelectorUtils.makeValueMatcherGeneric(this, predicate);
}
@Nullable
@Override
public Object getObject()
{
return defaultGetObject();
}
@Override
public Class<?> classOfObject()
{
return String.class;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// Do nothing.
}
private void populate()
{
final int row = offset.getOffset();
if (row != currentRow) {
currentValues = getRowAsListUtf8(frame.physicalRow(row));
indexedInts.setSize(currentValues.size());
currentRow = row;
}
}
}
return new MultiValueSelector();
} else {
class SingleValueSelector extends BaseSingleValueDimensionSelector
{
@Nullable
@Override
protected String getValue()
{
final String s = getString(frame.physicalRow(offset.getOffset()));
return extractionFn == null ? s : extractionFn.apply(s);
}
@Nullable
@Override
public ByteBuffer lookupNameUtf8(int id)
{
assert supportsLookupNameUtf8();
return getStringUtf8(frame.physicalRow(offset.getOffset()));
}
@Override
public boolean supportsLookupNameUtf8()
{
return extractionFn == null;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// Do nothing.
}
}
return new SingleValueSelector();
if (asArray) {
throw new ISE("Cannot call makeDimensionSelector on field of type [%s]", ColumnType.STRING_ARRAY);
}
return makeDimensionSelectorInternal(offset, extractionFn);
}
@Override
@ -487,7 +411,7 @@ public class StringFrameColumnReader implements FrameColumnReader
@Override
public ColumnType getType()
{
return ColumnType.STRING;
return asArray ? ColumnType.STRING_ARRAY : ColumnType.STRING;
}
@Override
@ -551,46 +475,220 @@ public class StringFrameColumnReader implements FrameColumnReader
}
}
/**
* Returns the object at the given physical row number.
*
* When {@link #asArray}, the return value is always of type {@code Object[]}. Otherwise, the return value
* is either an empty list (if the row is empty), a single String (if the row has one value), or a List
* of Strings (if the row has more than one value).
*
* @param physicalRow physical row number
* @param decode if true, return java.lang.String. If false, return UTF-8 ByteBuffer.
*/
@Nullable
private Object getRowAsObject(final int physicalRow, final boolean decode)
{
if (multiValue) {
final int cumulativeRowLength = getCumulativeRowLength(memory, physicalRow);
final int rowLength = physicalRow == 0
? cumulativeRowLength
: cumulativeRowLength - getCumulativeRowLength(memory, physicalRow - 1);
final int rowLength;
if (isNullRow(cumulativeRowLength)) {
return null;
} else if (physicalRow == 0) {
rowLength = cumulativeRowLength;
} else {
rowLength = cumulativeRowLength - adjustCumulativeRowLength(getCumulativeRowLength(memory, physicalRow - 1));
}
if (rowLength == 0) {
return Collections.emptyList();
return asArray ? ObjectArrays.EMPTY_ARRAY : Collections.emptyList();
} else if (rowLength == 1) {
final int index = cumulativeRowLength - 1;
return decode ? getString(index) : getStringUtf8(index);
final Object o = decode ? getString(index) : getStringUtf8(index);
return asArray ? new Object[]{o} : o;
} else {
final List<Object> row = new ArrayList<>(rowLength);
final Object[] row = new Object[rowLength];
for (int i = 0; i < rowLength; i++) {
final int index = cumulativeRowLength - rowLength + i;
row.add(decode ? getString(index) : getStringUtf8(index));
row[i] = decode ? getString(index) : getStringUtf8(index);
}
return row;
return asArray ? row : Arrays.asList(row);
}
} else {
return decode ? getString(physicalRow) : getStringUtf8(physicalRow);
final Object o = decode ? getString(physicalRow) : getStringUtf8(physicalRow);
return asArray ? new Object[]{o} : o;
}
}
/**
* Returns the value at the given physical row number as a list of ByteBuffers. Only valid when !asArray, i.e.,
* when type is {@link ColumnType#STRING}.
*
* @param physicalRow physical row number
*/
private List<ByteBuffer> getRowAsListUtf8(final int physicalRow)
{
if (asArray) {
throw DruidException.defensive("Unexpected call for array column");
}
final Object object = getRowAsObject(physicalRow, false);
if (object instanceof List) {
if (object == null) {
return Collections.singletonList(null);
} else if (object instanceof List) {
//noinspection unchecked
return (List<ByteBuffer>) object;
} else {
return Collections.singletonList((ByteBuffer) object);
}
}
/**
* Creates the selector used by this column, for both the string array (asArray = true) and the regular string
* (asArray = false) read paths.
*
* When the column is stored in multi-value format, the returned selector decodes the current row on demand and
* caches it until the offset moves. Otherwise a single-value selector is returned that reads one string per row.
*
* NOTE(review): in the multi-value selector, only {@code getObject()} avoids {@link #getRowAsListUtf8}, which throws
* for array columns. The dictionary-style methods (getRow, lookupName, lookupNameUtf8) therefore look unusable when
* asArray is true -- confirm that array callers only use getObject().
*
* @param offset       offset that determines the current (logical) row
* @param extractionFn optional function applied to decoded strings by lookupName / getValue; may be null
*/
private DimensionSelector makeDimensionSelectorInternal(ReadableOffset offset, @Nullable ExtractionFn extractionFn)
{
if (multiValue) {
class MultiValueSelector implements DimensionSelector
{
// Logical row (from the offset) that currentValues / indexedInts were last computed for; -1 means "none yet".
private int currentRow = -1;
// UTF-8 values of the cached row; a null element stands for a null string.
private List<ByteBuffer> currentValues = null;
// Row ids handed out by getRow(); only its size changes from row to row.
private final RangeIndexedInts indexedInts = new RangeIndexedInts();
@Override
public int getValueCardinality()
{
return CARDINALITY_UNKNOWN;
}
@Nullable
@Override
public String lookupName(int id)
{
populate();
final ByteBuffer buf = currentValues.get(id);
// Decode from a duplicate so the cached buffer's position and limit are left untouched.
final String s = buf == null ? null : StringUtils.fromUtf8(buf.duplicate());
return extractionFn == null ? s : extractionFn.apply(s);
}
@Nullable
@Override
public ByteBuffer lookupNameUtf8(int id)
{
// Raw UTF-8 access is only offered when there is no extractionFn (see supportsLookupNameUtf8).
assert supportsLookupNameUtf8();
populate();
return currentValues.get(id);
}
@Override
public boolean supportsLookupNameUtf8()
{
return extractionFn == null;
}
@Override
public boolean nameLookupPossibleInAdvance()
{
return false;
}
@Nullable
@Override
public IdLookup idLookup()
{
return null;
}
@Override
public IndexedInts getRow()
{
populate();
return indexedInts;
}
@Override
public ValueMatcher makeValueMatcher(@Nullable String value)
{
return DimensionSelectorUtils.makeValueMatcherGeneric(this, value);
}
@Override
public ValueMatcher makeValueMatcher(Predicate<String> predicate)
{
return DimensionSelectorUtils.makeValueMatcherGeneric(this, predicate);
}
@Nullable
@Override
public Object getObject()
{
// Reads the decoded row directly rather than through lookupName(), so it does not use the populate() cache.
// NOTE(review): extractionFn is not applied on this path -- confirm that is intended when it is non-null.
return getRowAsObject(frame.physicalRow(offset.getOffset()), true);
}
@Override
public Class<?> classOfObject()
{
// NOTE(review): String.class is reported even when asArray is true -- confirm no caller relies on this.
return String.class;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// Do nothing.
}
// Refreshes currentValues and indexedInts if the offset has moved since the last call.
private void populate()
{
final int row = offset.getOffset();
if (row != currentRow) {
// The cache is keyed on the logical row; the frame maps it to the physical row that is actually read.
currentValues = getRowAsListUtf8(frame.physicalRow(row));
indexedInts.setSize(currentValues.size());
currentRow = row;
}
}
}
return new MultiValueSelector();
} else {
class SingleValueSelector extends BaseSingleValueDimensionSelector
{
@Nullable
@Override
protected String getValue()
{
final String s = getString(frame.physicalRow(offset.getOffset()));
return extractionFn == null ? s : extractionFn.apply(s);
}
@Nullable
@Override
public ByteBuffer lookupNameUtf8(int id)
{
assert supportsLookupNameUtf8();
// The id is not consulted: the current row's single value is returned.
return getStringUtf8(frame.physicalRow(offset.getOffset()));
}
@Override
public boolean supportsLookupNameUtf8()
{
return extractionFn == null;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// Do nothing.
}
}
return new SingleValueSelector();
}
}
}
static class StringArrayFrameColumn implements BaseColumn
@ -610,57 +708,22 @@ public class StringFrameColumnReader implements FrameColumnReader
multiValue,
memory,
startOfStringLengthSection,
startOfStringDataSection
startOfStringDataSection,
true
);
}
@Override
@SuppressWarnings("rawtypes")
public ColumnValueSelector<List> makeColumnValueSelector(ReadableOffset offset)
public ColumnValueSelector makeColumnValueSelector(ReadableOffset offset)
{
final DimensionSelector delegateSelector = delegate.makeDimensionSelector(offset, null);
return new ObjectColumnSelector<List>()
{
@Override
public List getObject()
{
final IndexedInts row = delegateSelector.getRow();
final int sz = row.size();
if (sz == 0) {
return Collections.emptyList();
} else if (sz == 1) {
return Collections.singletonList(delegateSelector.lookupName(0));
} else {
final List<String> retVal = new ArrayList<>(sz);
for (int i = 0; i < sz; i++) {
retVal.add(delegateSelector.lookupName(i));
}
return retVal;
}
}
@Override
public Class<List> classOfObject()
{
return List.class;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
delegateSelector.inspectRuntimeShape(inspector);
}
};
return delegate.makeDimensionSelectorInternal(offset, null);
}
@Override
public void close()
{
// do nothing
delegate.close();
}
}
}

View File

@ -27,6 +27,7 @@ import org.apache.druid.frame.key.KeyColumn;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionDictionarySelector;
import org.apache.druid.segment.DimensionSelector;
@ -96,6 +97,8 @@ public class FrameWriterUtils
* @param multiValue if true, return an array that corresponds exactly to {@link DimensionSelector#getRow()}.
* if false, always return a single-valued array. In particular, this means [] is
* returned as [NULL_STRING_MARKER_ARRAY].
*
* @return UTF-8 strings. The list itself is never null.
*/
public static List<ByteBuffer> getUtf8ByteBuffersFromStringSelector(
final DimensionSelector selector,
@ -130,14 +133,22 @@ public class FrameWriterUtils
* selector you get for an {@code ARRAY<STRING>} column.
*
* Null strings are returned as {@link #NULL_STRING_MARKER_ARRAY}.
*
* If the entire array returned by {@link BaseObjectColumnValueSelector#getObject()} is null, returns either
* null or {@link #NULL_STRING_MARKER_ARRAY} depending on the value of "useNullArrays".
*
* @param selector array selector
*
* @return UTF-8 strings. The list itself may be null.
*/
@Nullable
public static List<ByteBuffer> getUtf8ByteBuffersFromStringArraySelector(
@SuppressWarnings("rawtypes") final ColumnValueSelector selector
@SuppressWarnings("rawtypes") final BaseObjectColumnValueSelector selector
)
{
Object row = selector.getObject();
if (row == null) {
return Collections.singletonList(getUtf8ByteBufferFromString(null));
return null;
} else if (row instanceof String) {
return Collections.singletonList(getUtf8ByteBufferFromString((String) row));
}

View File

@ -41,6 +41,7 @@ public class FrameColumnWriters
public static final byte TYPE_DOUBLE = 3;
public static final byte TYPE_STRING = 4;
public static final byte TYPE_COMPLEX = 5;
public static final byte TYPE_STRING_ARRAY = 6;
private FrameColumnWriters()
{
@ -140,12 +141,8 @@ public class FrameColumnWriters
final String columnName
)
{
final ColumnValueSelector selector = selectorFactory.makeColumnValueSelector(columnName);
return new StringArrayFrameColumnWriter(
selector,
allocator,
true
);
final ColumnValueSelector<?> selector = selectorFactory.makeColumnValueSelector(columnName);
return new StringArrayFrameColumnWriterImpl(selector, allocator);
}
private static ComplexFrameColumnWriter makeComplexWriter(

View File

@ -19,7 +19,6 @@
package org.apache.druid.frame.write.columnar;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.frame.allocation.AppendableMemory;
@ -30,28 +29,39 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionSelector;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.List;
public abstract class StringFrameColumnWriter<T extends ColumnValueSelector> implements FrameColumnWriter
{
// Multiple of 4 such that three of these fit within AppendableMemory.DEFAULT_INITIAL_ALLOCATION_SIZE.
// This guarantees we can fit a WorkerMemoryParmeters.MAX_FRAME_COLUMNS number of columns into a frame.
/**
* Multiple of 4 such that three of these fit within {@link AppendableMemory#DEFAULT_INITIAL_ALLOCATION_SIZE}.
* This guarantees we can fit a {@code Limits#MAX_FRAME_COLUMNS} number of columns into a frame.
*/
private static final int INITIAL_ALLOCATION_SIZE = 120;
public static final long DATA_OFFSET = 1 /* type code */ + 1 /* single or multi-value? */;
private final T selector;
private final byte typeCode;
protected final boolean multiValue;
// Row lengths: one int per row with the number of values contained by that row and all previous rows.
// Only written for multi-value columns.
/**
* Row lengths: one int per row with the number of values contained by that row and all previous rows.
* Only written for multi-value and array columns. When the corresponding row is null itself, the length is
* written as -(actual length) - 1. (Guaranteed to be a negative number even if "actual length" is zero.)
*/
private final AppendableMemory cumulativeRowLengths;
// String lengths: one int per string, containing the length of that string plus the length of all previous strings.
/**
* String lengths: one int per string, containing the length of that string plus the length of all previous strings.
*/
private final AppendableMemory cumulativeStringLengths;
// String data.
/**
* String data.
*/
private final AppendableMemory stringData;
private int lastCumulativeRowLength = 0;
@ -62,10 +72,12 @@ public abstract class StringFrameColumnWriter<T extends ColumnValueSelector> imp
StringFrameColumnWriter(
final T selector,
final MemoryAllocator allocator,
final byte typeCode,
final boolean multiValue
)
{
this.selector = selector;
this.typeCode = typeCode;
this.multiValue = multiValue;
if (multiValue) {
@ -82,9 +94,10 @@ public abstract class StringFrameColumnWriter<T extends ColumnValueSelector> imp
public boolean addSelection()
{
final List<ByteBuffer> utf8Data = getUtf8ByteBuffersFromSelector(selector);
final int utf8Count = utf8Data == null ? 0 : utf8Data.size();
final int utf8DataByteLength = countBytes(utf8Data);
if ((long) lastCumulativeRowLength + utf8Data.size() > Integer.MAX_VALUE) {
if ((long) lastCumulativeRowLength + utf8Count > Integer.MAX_VALUE) {
// Column is full because cumulative row length has exceeded the max capacity of an integer.
return false;
}
@ -98,7 +111,7 @@ public abstract class StringFrameColumnWriter<T extends ColumnValueSelector> imp
return false;
}
if (!cumulativeStringLengths.reserveAdditional(Integer.BYTES * utf8Data.size())) {
if (!cumulativeStringLengths.reserveAdditional(Integer.BYTES * utf8Count)) {
return false;
}
@ -109,22 +122,31 @@ public abstract class StringFrameColumnWriter<T extends ColumnValueSelector> imp
// Enough space has been reserved to write what we need to write; let's start.
if (multiValue) {
final MemoryRange<WritableMemory> rowLengthsCursor = cumulativeRowLengths.cursor();
rowLengthsCursor.memory().putInt(rowLengthsCursor.start(), lastCumulativeRowLength + utf8Data.size());
if (utf8Data == null && typeCode == FrameColumnWriters.TYPE_STRING_ARRAY) {
// Array is null itself. Signify by writing -(actual length) - 1.
rowLengthsCursor.memory().putInt(rowLengthsCursor.start(), -(lastCumulativeRowLength + utf8Count) - 1);
} else {
// When writing STRING type (as opposed to ARRAY<STRING>), treat null array as empty array. (STRING type cannot
// represent an array that is null itself.)
rowLengthsCursor.memory().putInt(rowLengthsCursor.start(), lastCumulativeRowLength + utf8Count);
}
cumulativeRowLengths.advanceCursor(Integer.BYTES);
lastRowLength = utf8Data.size();
lastCumulativeRowLength += utf8Data.size();
lastRowLength = utf8Count;
lastCumulativeRowLength += utf8Count;
}
// The utf8Data.size and utf8DataByteLength checks are necessary to avoid acquiring cursors with zero bytes
// reserved. Otherwise, if a zero-byte-reserved cursor was acquired in the first row, it would be an error since no
// bytes would have been allocated yet.
final MemoryRange<WritableMemory> stringLengthsCursor =
utf8Data.size() > 0 ? cumulativeStringLengths.cursor() : null;
utf8Count > 0 ? cumulativeStringLengths.cursor() : null;
final MemoryRange<WritableMemory> stringDataCursor =
utf8DataByteLength > 0 ? stringData.cursor() : null;
lastStringLength = 0;
for (int i = 0; i < utf8Data.size(); i++) {
for (int i = 0; i < utf8Count; i++) {
final ByteBuffer utf8Datum = utf8Data.get(i);
final int len = utf8Datum.remaining();
@ -144,13 +166,13 @@ public abstract class StringFrameColumnWriter<T extends ColumnValueSelector> imp
lastStringLength += len;
lastCumulativeStringLength += len;
assert stringLengthsCursor != null; // Won't be null when utf8Data.size() > 0
assert stringLengthsCursor != null; // Won't be null when utf8Count > 0
stringLengthsCursor.memory()
.putInt(stringLengthsCursor.start() + (long) Integer.BYTES * i, lastCumulativeStringLength);
}
if (utf8Data.size() > 0) {
cumulativeStringLengths.advanceCursor(Integer.BYTES * utf8Data.size());
if (utf8Count > 0) {
cumulativeStringLengths.advanceCursor(Integer.BYTES * utf8Count);
}
if (utf8DataByteLength > 0) {
@ -195,7 +217,7 @@ public abstract class StringFrameColumnWriter<T extends ColumnValueSelector> imp
{
long currentPosition = startPosition;
memory.putByte(currentPosition, FrameColumnWriters.TYPE_STRING);
memory.putByte(currentPosition, typeCode);
memory.putByte(currentPosition + 1, multiValue ? (byte) 1 : (byte) 0);
currentPosition += 2;
@ -224,13 +246,18 @@ public abstract class StringFrameColumnWriter<T extends ColumnValueSelector> imp
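* Extracts a list of ByteBuffers from the selector. Null values are returned as
* {@link FrameWriterUtils#NULL_STRING_MARKER_ARRAY}. The returned list may itself be null, in which case
* {@link #addSelection()} treats it as containing zero values.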
*/
@Nullable
public abstract List<ByteBuffer> getUtf8ByteBuffersFromSelector(T selector);
/**
* Returns the sum of remaining bytes in the provided list of byte buffers.
*/
private static int countBytes(final List<ByteBuffer> buffers)
private static int countBytes(@Nullable final List<ByteBuffer> buffers)
{
if (buffers == null) {
return 0;
}
long count = 0;
for (final ByteBuffer buffer : buffers) {
@ -242,6 +269,9 @@ public abstract class StringFrameColumnWriter<T extends ColumnValueSelector> imp
}
}
/**
* Writer for {@link org.apache.druid.segment.column.ColumnType#STRING}.
*/
class StringFrameColumnWriterImpl extends StringFrameColumnWriter<DimensionSelector>
{
StringFrameColumnWriterImpl(
@ -250,7 +280,7 @@ class StringFrameColumnWriterImpl extends StringFrameColumnWriter<DimensionSelec
boolean multiValue
)
{
super(selector, allocator, multiValue);
super(selector, allocator, FrameColumnWriters.TYPE_STRING, multiValue);
}
@Override
@ -260,20 +290,17 @@ class StringFrameColumnWriterImpl extends StringFrameColumnWriter<DimensionSelec
}
}
class StringArrayFrameColumnWriter extends StringFrameColumnWriter<ColumnValueSelector>
/**
* Writer for {@link org.apache.druid.segment.column.ColumnType#STRING_ARRAY}.
*/
class StringArrayFrameColumnWriterImpl extends StringFrameColumnWriter<ColumnValueSelector>
{
StringArrayFrameColumnWriter(
StringArrayFrameColumnWriterImpl(
ColumnValueSelector selector,
MemoryAllocator allocator,
boolean multiValue
MemoryAllocator allocator
)
{
super(selector, allocator, multiValue);
Preconditions.checkArgument(
multiValue,
"%s can only be used when multiValue is true",
StringArrayFrameColumnWriter.class.getName()
);
super(selector, allocator, FrameColumnWriters.TYPE_STRING_ARRAY, true);
}
@Override

View File

@ -20,6 +20,8 @@
package org.apache.druid.segment;
import org.apache.druid.guice.annotations.ExtensionPoint;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnType;
import javax.annotation.Nullable;
@ -29,12 +31,42 @@ import javax.annotation.Nullable;
* BaseObjectColumnValueSelector to make it impossible to accidentally call any method other than {@link #getObject()}.
*
* All implementations of this interface MUST also implement {@link ColumnValueSelector}.
*
* Typically created by {@link ColumnSelectorFactory#makeColumnValueSelector(String)}.
*/
@ExtensionPoint
public interface BaseObjectColumnValueSelector<T>
{
  /**
   * Returns the currently-selected object.
   *
   * The behavior of this method depends on the type of selector, which can be determined by calling
   * {@link ColumnSelectorFactory#getColumnCapabilities(String)} on the same {@link ColumnSelectorFactory} that
   * you got this selector from. If the capabilities are nonnull, the selector type is given by
   * {@link ColumnCapabilities#getType()}.
   *
   * String selectors, where type is {@link ColumnType#STRING}, may return any type of object from this method,
   * especially in cases where the selector is casting objects to string at selection time. Callers are encouraged to
   * avoid the need to deal with various objects by using {@link ColumnSelectorFactory#makeDimensionSelector} instead.
   *
   * Numeric selectors, where {@link ColumnType#isNumeric()}, may return any type of {@link Number}. Callers that
   * wish to deal with more specific types should treat the original {@link ColumnValueSelector} as a
   * {@link BaseLongColumnValueSelector}, {@link BaseDoubleColumnValueSelector}, or
   * {@link BaseFloatColumnValueSelector} instead.
   *
   * Array selectors, where {@link ColumnType#isArray()}, must return {@code Object[]}. The array may contain
   * null elements, and the array itself may also be null.
   *
   * Selectors of unknown type, where {@link ColumnSelectorFactory#getColumnCapabilities(String)} returns null,
   * may return any type of object. Callers must be prepared for a wide variety of possible input objects. This case
   * is common during ingestion, where selectors are built on top of external data.
   */
  @Nullable
  T getObject();

  /**
   * Most-specific class of object returned by {@link #getObject()}, if known in advance. This method returns
   * {@link Object} when selectors do not know in advance what class of object they may return.
   */
  Class<? extends T> classOfObject();
}

View File

@ -116,7 +116,7 @@ public interface DimensionSelector extends ColumnValueSelector<Object>, Dimensio
/**
* Converts the current result of {@link #getRow()} into null, if the row is empty, a String, if the row has size 1,
* or a String[] array, if the row has size > 1, using {@link #lookupName(int)}.
* or a {@code List<String>}, if the row has size > 1, using {@link #lookupName(int)}.
*
* This method is not the default implementation of {@link #getObject()} to minimize the chance that implementations
* "forget" to override it with more optimized version.
@ -130,6 +130,11 @@ public interface DimensionSelector extends ColumnValueSelector<Object>, Dimensio
/**
* Converts a particular {@link IndexedInts} to an Object in a standard way, assuming each element in the IndexedInts
* is a dictionary ID that can be resolved with the provided selector.
*
* Specification:
* 1) Empty row ({@link IndexedInts#size()} zero) returns null.
* 2) Single-value row returns a single {@link String}.
* 3) Two+ value rows return {@link List} of {@link String}.
*/
@Nullable
static Object rowToObject(IndexedInts row, DimensionDictionarySelector selector)

View File

@ -83,11 +83,16 @@ public interface VectorColumnProcessorFactory<T>
*/
T makeLongProcessor(ColumnCapabilities capabilities, VectorValueSelector selector);
/**
* Called when {@link ColumnCapabilities#getType()} is ARRAY.
*/
T makeArrayProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector);
/**
* Called when {@link ColumnCapabilities#getType()} is COMPLEX. May also be called for STRING typed columns in
* cases where the dictionary does not exist or is not expected to be useful.
*
* @see VectorObjectSelector#getObjectVector() for details on what can appear here when type is STRING
*/
T makeObjectProcessor(@SuppressWarnings("unused") ColumnCapabilities capabilities, VectorObjectSelector selector);

View File

@ -88,7 +88,7 @@ public interface VectorColumnSelectorFactory extends ColumnInspector
/**
* Returns an object selector. Should only be called on columns where {@link #getColumnCapabilities} indicates that
* they return STRING or COMPLEX, or on nonexistent columns.
* they return STRING, ARRAY, or COMPLEX, or on nonexistent columns.
*
* For STRING, this is needed if values are not dictionary encoded, such as computed virtual columns, or can
* optionally be used in place of {@link SingleValueDimensionVectorSelector} when using the dictionary isn't helpful.

View File

@ -19,15 +19,39 @@
package org.apache.druid.segment.vector;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionDictionarySelector;
import org.apache.druid.segment.VectorColumnProcessorFactory;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.data.IndexedInts;
/**
* Vectorized object selector, useful for complex columns.
* Vectorized object selector.
*
* @see org.apache.druid.segment.ColumnValueSelector, the non-vectorized version.
* Typically created by {@link VectorColumnSelectorFactory#makeObjectSelector(String)}.
*
* @see ColumnValueSelector, the non-vectorized version.
*/
public interface VectorObjectSelector extends VectorSizeInspector
{
/**
* Get the current vector. Individual elements of the array may be null.
* Get the current vector.
*
* The type of objects in the array depends on the type of the selector. Callers can determine this by calling
* {@link VectorColumnSelectorFactory#getColumnCapabilities(String)} if creating selectors directly. Alternatively,
* callers using {@link VectorColumnProcessorFactory} will receive capabilities as part of the callback to
* {@link VectorColumnProcessorFactory#makeObjectProcessor(ColumnCapabilities, VectorObjectSelector)}.
*
* String selectors, where type is {@link ColumnType#STRING}, must use objects compatible with the spec of
* {@link org.apache.druid.segment.DimensionSelector#rowToObject(IndexedInts, DimensionDictionarySelector)}.
*
* Array selectors, where {@link ColumnType#isArray()}, must use {@code Object[]}. The array may contain
* null elements, and the array itself may also be null.
*
* Complex selectors may use any type of object.
*
* No other type of selector is possible. Vector object selectors are only used for strings, arrays, and complex types.
*/
Object[] getObjectVector();
}

View File

@ -20,14 +20,35 @@
package org.apache.druid.frame;
import com.google.common.collect.Iterables;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.ObjectColumnSelector;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexStorageAdapter;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.IndexedInts;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.util.Optional;
import javax.annotation.Nullable;
/**
* Storage adapter around {@link QueryableIndex} that transforms all multi-value strings columns into string arrays.
*/
public class TestArrayStorageAdapter extends QueryableIndexStorageAdapter
{
public TestArrayStorageAdapter(QueryableIndex index)
@ -35,17 +56,43 @@ public class TestArrayStorageAdapter extends QueryableIndexStorageAdapter
super(index);
}
/**
 * Always reports that vectorization is unavailable, regardless of the arguments.
 */
@Override
public boolean canVectorize(
    @Nullable Filter filter,
    VirtualColumns virtualColumns,
    boolean descending
)
{
  return false;
}
/**
 * Creates cursors exactly as the superclass does, then wraps each one in a {@link DecoratedCursor}.
 */
@Override
public Sequence<Cursor> makeCursors(
    @Nullable final Filter filter,
    final Interval interval,
    final VirtualColumns virtualColumns,
    final Granularity gran,
    final boolean descending,
    @Nullable final QueryMetrics<?> queryMetrics
)
{
  final Sequence<Cursor> baseCursors =
      super.makeCursors(filter, interval, virtualColumns, gran, descending, queryMetrics);
  return baseCursors.map(DecoratedCursor::new);
}
@Override
public RowSignature getRowSignature()
{
final RowSignature.Builder builder = RowSignature.builder();
builder.addTimeColumn();
for (final String column : Iterables.concat(getAvailableDimensions(), getAvailableMetrics())) {
Optional<ColumnCapabilities> columnCapabilities = Optional.ofNullable(getColumnCapabilities(column));
ColumnType columnType = columnCapabilities.isPresent() ? columnCapabilities.get().toColumnType() : null;
//change MV columns to Array<String>
if (columnCapabilities.isPresent() && columnCapabilities.get().hasMultipleValues().isMaybeTrue()) {
for (final String column : Iterables.concat(super.getAvailableDimensions(), super.getAvailableMetrics())) {
ColumnCapabilities columnCapabilities = super.getColumnCapabilities(column);
ColumnType columnType = columnCapabilities == null ? null : columnCapabilities.toColumnType();
//change MV strings columns to Array<String>
if (columnType != null
&& columnType.equals(ColumnType.STRING)
&& columnCapabilities.hasMultipleValues().isMaybeTrue()) {
columnType = ColumnType.STRING_ARRAY;
}
builder.add(column, columnType);
@ -53,4 +100,131 @@ public class TestArrayStorageAdapter extends QueryableIndexStorageAdapter
return builder.build();
}
/**
 * Returns the superclass capabilities for {@code column}, with the type replaced by the type this adapter
 * reports in {@link #getRowSignature()} whenever the signature has capabilities for that column.
 */
@Nullable
@Override
public ColumnCapabilities getColumnCapabilities(String column)
{
  final ColumnCapabilities signatureCapabilities = getRowSignature().getColumnCapabilities(column);
  final ColumnCapabilities baseCapabilities = super.getColumnCapabilities(column);

  if (signatureCapabilities == null) {
    // The signature has nothing to say about this column: pass the superclass answer through untouched.
    return baseCapabilities;
  }

  return ColumnCapabilitiesImpl.copyOf(baseCapabilities).setType(signatureCapabilities.toColumnType());
}
/**
 * Cursor wrapper that forwards all positioning calls to the wrapped cursor, but exposes a
 * {@link ColumnSelectorFactory} whose column capabilities come from the enclosing adapter. Columns that the
 * adapter reports as {@link ColumnType#STRING_ARRAY} are read through a dimension selector and returned as
 * {@code Object[]}.
 */
private class DecoratedCursor implements Cursor
{
  private final Cursor baseCursor;

  public DecoratedCursor(Cursor cursor)
  {
    this.baseCursor = cursor;
  }

  @Override
  public ColumnSelectorFactory getColumnSelectorFactory()
  {
    final ColumnSelectorFactory baseFactory = baseCursor.getColumnSelectorFactory();

    return new ColumnSelectorFactory()
    {
      /**
       * Only plain {@link DefaultDimensionSpec}s are supported. Columns with unknown capabilities and ARRAY
       * columns are rejected; everything else goes to the wrapped factory.
       */
      @Override
      public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec)
      {
        if (!(dimensionSpec instanceof DefaultDimensionSpec)) {
          // No tests need this case, don't bother to implement
          throw new UnsupportedOperationException();
        }

        final ColumnCapabilities columnCapabilities = getColumnCapabilities(dimensionSpec.getDimension());
        final boolean allowed = columnCapabilities != null && !columnCapabilities.is(ValueType.ARRAY);
        if (!allowed) {
          throw new UnsupportedOperationException("Must not call makeDimensionSelector on ARRAY");
        }

        return baseFactory.makeDimensionSelector(dimensionSpec);
      }

      /**
       * For string-array columns, returns a selector that materializes the current row as an
       * {@code Object[]} of looked-up names; other columns go to the wrapped factory.
       */
      @Override
      public ColumnValueSelector makeColumnValueSelector(String columnName)
      {
        final ColumnCapabilities columnCapabilities = getColumnCapabilities(columnName);
        final boolean isStringArray =
            columnCapabilities != null && columnCapabilities.toColumnType().equals(ColumnType.STRING_ARRAY);

        if (!isStringArray) {
          return baseFactory.makeColumnValueSelector(columnName);
        }

        final DimensionSelector dimensionSelector =
            baseFactory.makeDimensionSelector(DefaultDimensionSpec.of(columnName));

        return new ObjectColumnSelector<Object[]>()
        {
          @Override
          public Object[] getObject()
          {
            final IndexedInts ids = dimensionSelector.getRow();
            final Object[] values = new Object[ids.size()];
            for (int i = 0; i < values.length; i++) {
              values[i] = dimensionSelector.lookupName(ids.get(i));
            }
            return values;
          }

          @Override
          public Class<Object[]> classOfObject()
          {
            return Object[].class;
          }

          @Override
          public void inspectRuntimeShape(RuntimeShapeInspector inspector)
          {
            // Nothing to report.
          }
        };
      }

      @Nullable
      @Override
      public ColumnCapabilities getColumnCapabilities(String column)
      {
        return TestArrayStorageAdapter.this.getColumnCapabilities(column);
      }
    };
  }

  @Override
  public DateTime getTime()
  {
    return baseCursor.getTime();
  }

  @Override
  public void advance()
  {
    baseCursor.advance();
  }

  @Override
  public void advanceUninterruptibly()
  {
    baseCursor.advanceUninterruptibly();
  }

  @Override
  public boolean isDone()
  {
    return baseCursor.isDone();
  }

  @Override
  public boolean isDoneOrInterrupted()
  {
    return baseCursor.isDoneOrInterrupted();
  }

  @Override
  public void reset()
  {
    baseCursor.reset();
  }
}
}

View File

@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.frame.field;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.mockito.quality.Strictness;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
/**
 * Round-trip tests for {@link StringArrayFieldWriter}. Each case stubs the mocked selector with a (possibly null)
 * list of strings, writes it to memory, reads it back through the {@link ColumnType#STRING_ARRAY} field reader,
 * and checks that the values read equal the values written.
 */
public class StringArrayFieldWriterTest extends InitializedNullHandlingTest
{
  private static final long MEMORY_POSITION = 1;

  @Rule
  public MockitoRule mockitoRule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);

  @Mock
  public BaseObjectColumnValueSelector<List<String>> selector;

  private WritableMemory memory;
  private FieldWriter fieldWriter;

  @Before
  public void setUp()
  {
    memory = WritableMemory.allocate(1000);
    fieldWriter = new StringArrayFieldWriter(selector);
  }

  @After
  public void tearDown()
  {
    fieldWriter.close();
  }

  @Test
  public void testEmptyArray()
  {
    doTest(Collections.emptyList());
  }

  @Test
  public void testNullArray()
  {
    doTest(null);
  }

  @Test
  public void testOneString()
  {
    doTest(Collections.singletonList("foo"));
  }

  @Test
  public void testOneNull()
  {
    doTest(Collections.singletonList(null));
  }

  @Test
  public void testMultiValueString()
  {
    doTest(Arrays.asList("foo", "bar"));
  }

  @Test
  public void testMultiValueStringContainingNulls()
  {
    doTest(Arrays.asList("foo", NullHandling.emptyToNullIfNeeded(""), "bar", null));
  }

  /**
   * Writes {@code values} through the field writer and asserts that reading the field back yields an equal list
   * (or null, when {@code values} is null).
   */
  private void doTest(@Nullable final List<String> values)
  {
    mockSelector(values);

    // writeToMemory itself asserts on the number of bytes written; the count is not needed for reading back.
    writeToMemory(fieldWriter);
    final Object valuesRead = readFromMemory();
    Assert.assertEquals("values read", values, valuesRead);
  }

  /**
   * Stubs the mocked selector so that {@code getObject()} returns {@code values}.
   */
  private void mockSelector(@Nullable final List<String> values)
  {
    Mockito.when(selector.getObject()).thenReturn(values);
  }

  /**
   * Writes the current selection at {@link #MEMORY_POSITION}, growing the size limit one byte at a time until
   * the writer succeeds.
   *
   * @return number of bytes written
   *
   * @throws ISE if no limit that fits in {@link #memory} is large enough
   */
  private long writeToMemory(final FieldWriter writer)
  {
    // Try all maxSizes up until the one that finally works, then return it.
    for (long maxSize = 0; maxSize < memory.getCapacity() - MEMORY_POSITION; maxSize++) {
      final long written = writer.writeTo(memory, MEMORY_POSITION, maxSize);
      if (written > 0) {
        // The first limit that works must be exactly the size of the field.
        Assert.assertEquals("bytes written", maxSize, written);
        return written;
      }
    }

    throw new ISE("Could not write in memory with capacity [%,d]", memory.getCapacity() - MEMORY_POSITION);
  }

  /**
   * Reads the field at {@link #MEMORY_POSITION} back through a {@link ColumnType#STRING_ARRAY} field reader.
   *
   * @return the array that was read, as a list, or null if the selector returned null
   */
  @Nullable
  private List<String> readFromMemory()
  {
    final FieldReader fieldReader = FieldReaders.create("columnNameDoesntMatterHere", ColumnType.STRING_ARRAY);
    // Named readSelector so it does not shadow the mocked "selector" field used for writing.
    final ColumnValueSelector<?> readSelector =
        fieldReader.makeColumnValueSelector(memory, new ConstantFieldPointer(MEMORY_POSITION));

    final Object o = readSelector.getObject();
    //noinspection rawtypes,unchecked
    return o == null ? null : (List) Arrays.asList((Object[]) o);
  }
}

View File

@ -20,16 +20,18 @@
package org.apache.druid.frame.field;
import com.google.common.collect.ImmutableList;
import it.unimi.dsi.fastutil.objects.ObjectArrays;
import junitparams.converters.Nullable;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.extraction.SubstringDimExtractionFn;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionDictionarySelector;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.data.IndexedInts;
import org.apache.druid.segment.data.RangeIndexedInts;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
@ -57,7 +59,7 @@ public class StringFieldReaderTest extends InitializedNullHandlingTest
public MockitoRule mockitoRule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);
@Mock
public DimensionSelector writeSelector;
public BaseObjectColumnValueSelector<List<String>> writeSelector;
private WritableMemory memory;
private FieldWriter fieldWriter;
@ -66,7 +68,7 @@ public class StringFieldReaderTest extends InitializedNullHandlingTest
public void setUp()
{
memory = WritableMemory.allocate(1000);
fieldWriter = new StringFieldWriter(writeSelector);
fieldWriter = new StringArrayFieldWriter(writeSelector);
}
@After
@ -76,10 +78,26 @@ public class StringFieldReaderTest extends InitializedNullHandlingTest
}
@Test
public void test_isNull_null()
public void test_isNull_nullValue()
{
writeToMemory(Collections.singletonList(null));
Assert.assertTrue(new StringFieldReader(false).isNull(memory, MEMORY_POSITION));
Assert.assertFalse(new StringFieldReader(true).isNull(memory, MEMORY_POSITION));
}
@Test
public void test_isNull_twoNullValues()
{
writeToMemory(Arrays.asList(null, null));
Assert.assertFalse(new StringFieldReader(false).isNull(memory, MEMORY_POSITION));
Assert.assertFalse(new StringFieldReader(true).isNull(memory, MEMORY_POSITION));
}
@Test
public void test_isNull_nullRow()
{
writeToMemory(null);
Assert.assertTrue(new StringFieldReader(false).isNull(memory, MEMORY_POSITION));
Assert.assertTrue(new StringFieldReader(true).isNull(memory, MEMORY_POSITION));
}
@ -91,10 +109,7 @@ public class StringFieldReaderTest extends InitializedNullHandlingTest
NullHandling.replaceWithDefault(),
new StringFieldReader(false).isNull(memory, MEMORY_POSITION)
);
Assert.assertEquals(
NullHandling.replaceWithDefault(),
new StringFieldReader(true).isNull(memory, MEMORY_POSITION)
);
Assert.assertFalse(new StringFieldReader(true).isNull(memory, MEMORY_POSITION));
}
@Test
@ -132,7 +147,7 @@ public class StringFieldReaderTest extends InitializedNullHandlingTest
new StringFieldReader(true).makeColumnValueSelector(memory, new ConstantFieldPointer(MEMORY_POSITION));
Assert.assertEquals("foo", readSelector.getObject());
Assert.assertEquals(Collections.singletonList("foo"), readSelectorAsArray.getObject());
Assert.assertArrayEquals(new Object[]{"foo"}, (Object[]) readSelectorAsArray.getObject());
}
@Test
@ -146,7 +161,7 @@ public class StringFieldReaderTest extends InitializedNullHandlingTest
new StringFieldReader(true).makeColumnValueSelector(memory, new ConstantFieldPointer(MEMORY_POSITION));
Assert.assertEquals(ImmutableList.of("foo", "bar"), readSelector.getObject());
Assert.assertEquals(ImmutableList.of("foo", "bar"), readSelectorAsArray.getObject());
Assert.assertArrayEquals(new Object[]{"foo", "bar"}, (Object[]) readSelectorAsArray.getObject());
}
@Test
@ -160,7 +175,7 @@ public class StringFieldReaderTest extends InitializedNullHandlingTest
new StringFieldReader(true).makeColumnValueSelector(memory, new ConstantFieldPointer(MEMORY_POSITION));
Assert.assertNull(readSelector.getObject());
Assert.assertEquals(Collections.singletonList(null), readSelectorAsArray.getObject());
Assert.assertArrayEquals(new Object[]{null}, (Object[]) readSelectorAsArray.getObject());
}
@Test
@ -174,7 +189,7 @@ public class StringFieldReaderTest extends InitializedNullHandlingTest
new StringFieldReader(true).makeColumnValueSelector(memory, new ConstantFieldPointer(MEMORY_POSITION));
Assert.assertNull(readSelector.getObject());
Assert.assertEquals(Collections.emptyList(), readSelectorAsArray.getObject());
Assert.assertArrayEquals(ObjectArrays.EMPTY_ARRAY, (Object[]) readSelectorAsArray.getObject());
}
@Test
@ -255,21 +270,9 @@ public class StringFieldReaderTest extends InitializedNullHandlingTest
Assert.assertFalse(readSelector.makeValueMatcher("bar"::equals).matches());
}
private void writeToMemory(final List<String> values)
private void writeToMemory(@Nullable final List<String> values)
{
final RangeIndexedInts row = new RangeIndexedInts();
row.setSize(values.size());
Mockito.when(writeSelector.getRow()).thenReturn(row);
if (values.size() > 0) {
Mockito.when(writeSelector.supportsLookupNameUtf8()).thenReturn(false);
}
for (int i = 0; i < values.size(); i++) {
final String value = values.get(i);
Mockito.when(writeSelector.lookupName(i)).thenReturn(value);
}
Mockito.when(writeSelector.getObject()).thenReturn(values);
if (fieldWriter.writeTo(memory, MEMORY_POSITION, memory.getCapacity() - MEMORY_POSITION) < 0) {
throw new ISE("Could not write");

View File

@ -113,15 +113,15 @@ public class StringFieldWriterTest extends InitializedNullHandlingTest
// Non-UTF8 test
{
final long written = writeToMemory(fieldWriter);
final Object valuesRead = readFromMemory(written);
Assert.assertEquals("values read (non-UTF8)", values, valuesRead);
final Object[] valuesRead = readFromMemory(written);
Assert.assertEquals("values read (non-UTF8)", values, Arrays.asList(valuesRead));
}
// UTF8 test
{
final long writtenUtf8 = writeToMemory(fieldWriterUtf8);
final Object valuesReadUtf8 = readFromMemory(writtenUtf8);
Assert.assertEquals("values read (UTF8)", values, valuesReadUtf8);
final Object[] valuesReadUtf8 = readFromMemory(writtenUtf8);
Assert.assertEquals("values read (UTF8)", values, Arrays.asList(valuesReadUtf8));
}
}
@ -177,7 +177,7 @@ public class StringFieldWriterTest extends InitializedNullHandlingTest
throw new ISE("Could not write in memory with capacity [%,d]", memory.getCapacity() - MEMORY_POSITION);
}
private List<String> readFromMemory(final long written)
private Object[] readFromMemory(final long written)
{
final byte[] bytes = new byte[(int) written];
memory.getByteArray(MEMORY_POSITION, bytes, 0, (int) written);
@ -186,7 +186,6 @@ public class StringFieldWriterTest extends InitializedNullHandlingTest
final ColumnValueSelector<?> selector =
fieldReader.makeColumnValueSelector(memory, new ConstantFieldPointer(MEMORY_POSITION));
//noinspection unchecked
return (List<String>) selector.getObject();
return (Object[]) selector.getObject();
}
}

View File

@ -20,6 +20,7 @@
package org.apache.druid.frame.key;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.frame.testutil.FrameTestUtil;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.testing.InitializedNullHandlingTest;
@ -60,7 +61,7 @@ public class RowKeyReaderTest extends InitializedNullHandlingTest
Arrays.asList("bar", "qux"),
7d,
NullHandling.defaultDoubleValue(),
Arrays.asList("abc", "xyz")
new Object[]{"abc", "xyz"}
);
private final RowKey key = KeyTestUtils.createKey(signature, objects.toArray());
@ -70,18 +71,29 @@ public class RowKeyReaderTest extends InitializedNullHandlingTest
@Test
public void test_read_all()
{
Assert.assertEquals(objects, keyReader.read(key));
FrameTestUtil.assertRowEqual(objects, keyReader.read(key));
}
@Test
public void test_read_oneField()
{
for (int i = 0; i < signature.size(); i++) {
Assert.assertEquals(
"read: " + signature.getColumnName(i),
objects.get(i),
keyReader.read(key, i)
);
final Object keyPart = keyReader.read(key, i);
if (objects.get(i) instanceof Object[]) {
MatcherAssert.assertThat(keyPart, CoreMatchers.instanceOf(Object[].class));
Assert.assertArrayEquals(
"read: " + signature.getColumnName(i),
(Object[]) objects.get(i),
(Object[]) keyPart
);
} else {
Assert.assertEquals(
"read: " + signature.getColumnName(i),
objects.get(i),
keyPart
);
}
}
}
@ -91,7 +103,7 @@ public class RowKeyReaderTest extends InitializedNullHandlingTest
for (int i = 0; i < signature.size(); i++) {
Assert.assertEquals(
"hasMultipleValues: " + signature.getColumnName(i),
objects.get(i) instanceof List,
objects.get(i) instanceof List || objects.get(i) instanceof Object[],
keyReader.hasMultipleValues(key, i)
);
}

View File

@ -59,8 +59,10 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@ -102,7 +104,11 @@ public class FrameTestUtil
final File file
) throws IOException
{
try (final FrameFileWriter writer = FrameFileWriter.open(Channels.newChannel(new FileOutputStream(file)), null, ByteTracker.unboundedTracker())) {
try (final FrameFileWriter writer = FrameFileWriter.open(
Channels.newChannel(new FileOutputStream(file)),
null,
ByteTracker.unboundedTracker()
)) {
framesWithPartitions.forEach(
frameWithPartition -> {
try {
@ -126,7 +132,51 @@ public class FrameTestUtil
Assert.assertEquals("number of rows", expectedRows.size(), actualRows.size());
for (int i = 0; i < expectedRows.size(); i++) {
Assert.assertEquals("row #" + i, expectedRows.get(i), actualRows.get(i));
assertRowEqual("row #" + i, expectedRows.get(i), actualRows.get(i));
}
}
/**
 * Convenience overload of {@link #assertRowEqual(String, List, List)} that supplies no failure message.
 * Fields are compared with {@link Objects#deepEquals}, so {@code Object[]} fields are matched by content.
 */
public static void assertRowEqual(final List<Object> expected, final List<Object> actual)
{
  final String noMessage = null;
  assertRowEqual(noMessage, expected, actual);
}
/**
 * Asserts that two rows are equal, using {@link Objects#deepEquals} to work properly on {@code Object[]}.
 *
 * @param message  text passed to the JUnit assertions on failure; may be null
 * @param expected row holding the expected field values
 * @param actual   row holding the field values under test
 */
public static void assertRowEqual(final String message, final List<Object> expected, final List<Object> actual)
{
  final boolean sameSize = expected.size() == actual.size();

  // Position of the first field that is not deep-equal, or -1 if no such field was found.
  int mismatch = -1;
  if (sameSize) {
    for (int i = 0; i < expected.size(); i++) {
      if (!Objects.deepEquals(expected.get(i), actual.get(i))) {
        mismatch = i;
        break;
      }
    }
  }

  if (!sameSize || mismatch >= 0) {
    // Call Assert.assertEquals, which we expect to fail, to get a nice failure message
    Assert.assertEquals(
        message,
        Arrays.deepToString(expected.toArray()),
        Arrays.deepToString(actual.toArray())
    );

    // The rendered strings can match even though the rows differ (for example Integer 1 vs. Long 1), in which
    // case the call above passes. Fail anyway, and say what differed so the failure is still actionable.
    final String prefix = message == null ? "" : message + ": ";
    if (sameSize) {
      final Object expectedValue = expected.get(mismatch);
      final Object actualValue = actual.get(mismatch);
      Assert.fail(
          prefix + "field #" + mismatch + " differs: expected "
          + Arrays.deepToString(new Object[]{expectedValue})
          + " of type " + (expectedValue == null ? "null" : expectedValue.getClass().getName())
          + " but was "
          + Arrays.deepToString(new Object[]{actualValue})
          + " of type " + (actualValue == null ? "null" : actualValue.getClass().getName())
      );
    } else {
      Assert.fail(prefix + "row sizes differ: expected " + expected.size() + " but was " + actual.size());
    }
  }
}
@ -321,7 +371,7 @@ public class FrameTestUtil
private static Supplier<Object> dimensionSelectorReader(final DimensionSelector selector)
{
return () -> {
// Different from selector.getObject(): allows us to differentiate [null] from []
// Different from selector.getObject(): allows us to differentiate null, [null], and []
final IndexedInts row = selector.getRow();
final int sz = row.size();

View File

@ -494,8 +494,6 @@ public class FrameWriterTest extends InitializedNullHandlingTest
return NullHandling.defaultFloatValue();
case DOUBLE:
return NullHandling.defaultDoubleValue();
case ARRAY:
return Collections.emptyList();
default:
return null;
}

View File

@ -22,6 +22,7 @@ package org.apache.druid.frame.write;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
import it.unimi.dsi.fastutil.objects.ObjectArrays;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.frame.key.KeyOrder;
import org.apache.druid.hll.HyperLogLogCollector;
@ -98,19 +99,20 @@ public class FrameWriterTestData
public static final Dataset<Object> TEST_ARRAYS_STRING = new Dataset<>(
ColumnType.STRING_ARRAY,
Arrays.asList(
Collections.emptyList(),
Collections.singletonList(null),
Collections.singletonList(NullHandling.emptyToNullIfNeeded("")),
Collections.singletonList("dog"),
Collections.singletonList("lazy"),
Arrays.asList("the", "quick", "brown"),
Arrays.asList("the", "quick", "brown", null),
Arrays.asList("the", "quick", "brown", NullHandling.emptyToNullIfNeeded("")),
Arrays.asList("the", "quick", "brown", "fox"),
Arrays.asList("the", "quick", "brown", "fox", "jumps", "over", "the", "lazy", "dog"),
Arrays.asList("the", "quick", "brown", "null"),
Collections.singletonList("\uD83D\uDE42"),
Arrays.asList("\uD83D\uDE42", "\uD83E\uDEE5")
null,
ObjectArrays.EMPTY_ARRAY,
new Object[]{null},
new Object[]{NullHandling.emptyToNullIfNeeded("")},
new Object[]{"dog"},
new Object[]{"lazy"},
new Object[]{"the", "quick", "brown"},
new Object[]{"the", "quick", "brown", null},
new Object[]{"the", "quick", "brown", NullHandling.emptyToNullIfNeeded("")},
new Object[]{"the", "quick", "brown", "fox"},
new Object[]{"the", "quick", "brown", "fox", "jumps", "over", "the", "lazy", "dog"},
new Object[]{"the", "quick", "brown", "null"},
new Object[]{"\uD83D\uDE42"},
new Object[]{"\uD83D\uDE42", "\uD83E\uDEE5"}
)
);

View File

@ -48,7 +48,9 @@ import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Set;
public class FrameBasedIndexedTableTest extends InitializedNullHandlingTest
@ -365,8 +367,16 @@ public class FrameBasedIndexedTableTest extends InitializedNullHandlingTest
IndexedTable.Reader reader = frameBasedIndexedTable.columnReader(columnNumber);
List<Object[]> originalRows = dataSource.getRowsAsSequence().toList();
for (int i = 0; i < numRows; ++i) {
Object original = originalRows.get(i)[columnNumber];
Assert.assertEquals(original, reader.read(i));
final Object originalValue = originalRows.get(i)[columnNumber];
final Object actualValue = reader.read(i);
if (!Objects.deepEquals(originalValue, actualValue)) {
// Call Assert.assertEquals, which we expect to fail, to get a nice failure message
Assert.assertEquals(
originalValue instanceof Object[] ? Arrays.toString((Object[]) originalValue) : originalValue,
actualValue instanceof Object[] ? Arrays.toString((Object[]) actualValue) : actualValue
);
}
}
}
}