diff --git a/docs/querying/query-context.md b/docs/querying/query-context.md index a937d84bba4..f345b62c1c4 100644 --- a/docs/querying/query-context.md +++ b/docs/querying/query-context.md @@ -90,8 +90,8 @@ requirements: include "selector", "bound", "in", "like", "regex", "search", "and", "or", and "not". - All filters in filtered aggregators must offer vectorized row-matchers. - All aggregators must offer vectorized implementations. These include "count", "doubleSum", "floatSum", "longSum", "longMin", - "longMax", "doubleMin", "doubleMax", "floatMin", "floatMax", "hyperUnique", "filtered", "approxHistogram", - "approxHistogramFold", and "fixedBucketsHistogram" (with numerical input). + "longMax", "doubleMin", "doubleMax", "floatMin", "floatMax", "longAny", "doubleAny", "floatAny", "stringAny", + "hyperUnique", "filtered", "approxHistogram", "approxHistogramFold", and "fixedBucketsHistogram" (with numerical input). - No virtual columns. - For GroupBy: All dimension specs must be "default" (no extraction functions or filtered dimension specs). - For GroupBy: No multi-value dimensions. diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/any/DoubleAnyAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/any/DoubleAnyAggregatorFactory.java index ac962f3bde0..d18f3eecfa0 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/any/DoubleAnyAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/any/DoubleAnyAggregatorFactory.java @@ -28,12 +28,16 @@ import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.aggregation.VectorAggregator; import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.segment.BaseDoubleColumnValueSelector; +import org.apache.druid.segment.ColumnInspector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.NilColumnValueSelector; +import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import javax.annotation.Nullable; import java.nio.ByteBuffer; @@ -112,6 +116,23 @@ public class DoubleAnyAggregatorFactory extends AggregatorFactory } } + @Override + public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory) + { + ColumnCapabilities capabilities = selectorFactory.getColumnCapabilities(fieldName); + if (capabilities == null || capabilities.getType().isNumeric()) { + return new DoubleAnyVectorAggregator(selectorFactory.makeValueSelector(fieldName)); + } else { + return NumericNilVectorAggregator.doubleNilVectorAggregator(); + } + } + + @Override + public boolean canVectorize(ColumnInspector columnInspector) + { + return true; + } + @Override public Comparator getComparator() { diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/any/DoubleAnyVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/any/DoubleAnyVectorAggregator.java new file mode 100644 index 00000000000..4236271d1dc --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/any/DoubleAnyVectorAggregator.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.any; + +import org.apache.druid.segment.vector.VectorValueSelector; + +import java.nio.ByteBuffer; + +/** + * Vectorized implementation of the {@link DoubleAnyBufferAggregator} + */ +public class DoubleAnyVectorAggregator extends NumericAnyVectorAggregator +{ + public DoubleAnyVectorAggregator(VectorValueSelector selector) + { + super(selector); + } + + @Override + void initValue(ByteBuffer buf, int position) + { + buf.putDouble(position, 0); + } + + @Override + boolean putAnyValueFromRow(ByteBuffer buf, int position, int startRow, int endRow) + { + double[] values = vectorValueSelector.getDoubleVector(); + boolean isRowsWithinIndex = startRow < endRow && startRow < values.length; + if (isRowsWithinIndex) { + buf.putDouble(position, values[startRow]); + } + return isRowsWithinIndex; + } + + @Override + Object getNonNullObject(ByteBuffer buf, int position) + { + return buf.getDouble(position); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/any/FloatAnyAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/any/FloatAnyAggregatorFactory.java index d7a912d8abd..b2a4f8e1205 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/any/FloatAnyAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/any/FloatAnyAggregatorFactory.java @@ -28,11 +28,15 @@ import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.aggregation.VectorAggregator; import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.segment.BaseFloatColumnValueSelector; +import org.apache.druid.segment.ColumnInspector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.NilColumnValueSelector; +import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import javax.annotation.Nullable; import java.nio.ByteBuffer; @@ -109,6 +113,23 @@ public class FloatAnyAggregatorFactory extends AggregatorFactory } } + @Override + public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory) + { + ColumnCapabilities capabilities = selectorFactory.getColumnCapabilities(fieldName); + if (capabilities == null || capabilities.getType().isNumeric()) { + return new FloatAnyVectorAggregator(selectorFactory.makeValueSelector(fieldName)); + } else { + return NumericNilVectorAggregator.floatNilVectorAggregator(); + } + } + + @Override + public boolean canVectorize(ColumnInspector columnInspector) + { + return true; + } + @Override public Comparator getComparator() { diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/any/FloatAnyVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/any/FloatAnyVectorAggregator.java new file mode 100644 index 00000000000..2b3ec32b76e --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/any/FloatAnyVectorAggregator.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.any; + +import org.apache.druid.segment.vector.VectorValueSelector; + +import java.nio.ByteBuffer; + +/** + * Vectorized implementation of the {@link FloatAnyBufferAggregator} + */ +public class FloatAnyVectorAggregator extends NumericAnyVectorAggregator +{ + public FloatAnyVectorAggregator(VectorValueSelector vectorValueSelector) + { + super(vectorValueSelector); + } + + @Override + void initValue(ByteBuffer buf, int position) + { + buf.putFloat(position, 0F); + } + + @Override + boolean putAnyValueFromRow(ByteBuffer buf, int position, int startRow, int endRow) + { + float[] values = vectorValueSelector.getFloatVector(); + boolean isRowsWithinIndex = startRow < endRow && startRow < values.length; + if (isRowsWithinIndex) { + buf.putFloat(position, values[startRow]); + } + return isRowsWithinIndex; + } + + @Override + Object getNonNullObject(ByteBuffer buf, int position) + { + return buf.getFloat(position); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/any/LongAnyAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/any/LongAnyAggregatorFactory.java index 10bf526a7db..9826570d121 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/any/LongAnyAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/any/LongAnyAggregatorFactory.java @@ -28,11 +28,15 @@ import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.aggregation.VectorAggregator; import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.segment.BaseLongColumnValueSelector; +import org.apache.druid.segment.ColumnInspector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.NilColumnValueSelector; +import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import javax.annotation.Nullable; import java.nio.ByteBuffer; @@ -108,6 +112,23 @@ public class LongAnyAggregatorFactory extends AggregatorFactory } } + @Override + public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory) + { + ColumnCapabilities capabilities = selectorFactory.getColumnCapabilities(fieldName); + if (capabilities == null || capabilities.getType().isNumeric()) { + return new LongAnyVectorAggregator(selectorFactory.makeValueSelector(fieldName)); + } else { + return NumericNilVectorAggregator.longNilVectorAggregator(); + } + } + + @Override + public boolean canVectorize(ColumnInspector columnInspector) + { + return true; + } + @Override public Comparator getComparator() { diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/any/LongAnyVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/any/LongAnyVectorAggregator.java new file mode 100644 index 00000000000..076ce27f681 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/any/LongAnyVectorAggregator.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.any; + +import org.apache.druid.segment.vector.VectorValueSelector; + +import java.nio.ByteBuffer; + +/** + * Vectorized implementation of the {@link LongAnyBufferAggregator} + */ +public class LongAnyVectorAggregator extends NumericAnyVectorAggregator +{ + + public LongAnyVectorAggregator(VectorValueSelector vectorValueSelector) + { + super(vectorValueSelector); + } + + @Override + void initValue(ByteBuffer buf, int position) + { + buf.putLong(position, 0L); + } + + @Override + boolean putAnyValueFromRow(ByteBuffer buf, int position, int startRow, int endRow) + { + long[] values = vectorValueSelector.getLongVector(); + boolean isRowsWithinIndex = startRow < endRow && startRow < values.length; + if (isRowsWithinIndex) { + buf.putLong(position, values[startRow]); + } + return isRowsWithinIndex; + } + + @Override + Object getNonNullObject(ByteBuffer buf, int position) + { + return buf.getLong(position); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/any/NumericAnyVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/any/NumericAnyVectorAggregator.java new file mode 100644 index 00000000000..27df30bff31 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/any/NumericAnyVectorAggregator.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.any; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.segment.vector.VectorValueSelector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +public abstract class NumericAnyVectorAggregator implements VectorAggregator +{ + // Rightmost bit for is null check (0 for is null and 1 for not null) + // Second rightmost bit for is found check (0 for not found and 1 for found) + @VisibleForTesting + static final byte BYTE_FLAG_FOUND_MASK = 0x02; + private static final byte BYTE_FLAG_NULL_MASK = 0x01; + private static final int FOUND_VALUE_OFFSET = Byte.BYTES; + + private final boolean replaceWithDefault = NullHandling.replaceWithDefault(); + protected final VectorValueSelector vectorValueSelector; + + public NumericAnyVectorAggregator(VectorValueSelector vectorValueSelector) + { + this.vectorValueSelector = vectorValueSelector; + } + + /** + * Initialize the buffer value given the initial offset position within the byte buffer for initialization + */ + abstract void initValue(ByteBuffer buf, int position); + + /** + * Place any primitive value from the rows, starting at {@param startRow}(inclusive) up to {@param endRow}(exclusive), + * in the buffer given the initial offset position within the byte buffer at which the current aggregate value + * is stored. + * @return true if a value was added, false otherwise + */ + abstract boolean putAnyValueFromRow(ByteBuffer buf, int position, int startRow, int endRow); + + /** + * @return The primitive object stored at the position in the buffer. + */ + abstract Object getNonNullObject(ByteBuffer buf, int position); + + @Override + public void init(ByteBuffer buf, int position) + { + buf.put(position, replaceWithDefault ? NullHandling.IS_NOT_NULL_BYTE : NullHandling.IS_NULL_BYTE); + initValue(buf, position + FOUND_VALUE_OFFSET); + } + + @Override + public void aggregate(ByteBuffer buf, int position, int startRow, int endRow) + { + if ((buf.get(position) & BYTE_FLAG_FOUND_MASK) != BYTE_FLAG_FOUND_MASK) { + boolean[] nulls = vectorValueSelector.getNullVector(); + // check if there are any nulls + if (nulls != null) { + for (int i = startRow; i < endRow && i < nulls.length; i++) { + // And there is actually a null + if (nulls[i]) { + putNull(buf, position); + return; + } + } + } + // There are no nulls, so try to put a value from the value selector + if (putAnyValueFromRow(buf, position + FOUND_VALUE_OFFSET, startRow, endRow)) { + buf.put(position, (byte) (BYTE_FLAG_FOUND_MASK | NullHandling.IS_NOT_NULL_BYTE)); + } + } + } + + @Override + public void aggregate( + ByteBuffer buf, + int numRows, + int[] positions, + @Nullable int[] rows, + int positionOffset + ) + { + for (int i = 0; i < numRows; i++) { + int position = positions[i] + positionOffset; + int row = rows == null ? i : rows[i]; + aggregate(buf, position, row, row + 1); + } + } + + @Nullable + @Override + public Object get(ByteBuffer buf, int position) + { + final boolean isNull = isValueNull(buf, position); + return isNull ? null : getNonNullObject(buf, position + FOUND_VALUE_OFFSET); + } + + @Override + public void close() + { + // No resources to cleanup. + } + + @VisibleForTesting + boolean isValueNull(ByteBuffer buf, int position) + { + return (buf.get(position) & BYTE_FLAG_NULL_MASK) == NullHandling.IS_NULL_BYTE; + } + + private void putNull(ByteBuffer buf, int position) + { + if (!replaceWithDefault) { + buf.put(position, (byte) (BYTE_FLAG_FOUND_MASK | NullHandling.IS_NULL_BYTE)); + } else { + initValue(buf, position + FOUND_VALUE_OFFSET); + buf.put(position, (byte) (BYTE_FLAG_FOUND_MASK | NullHandling.IS_NOT_NULL_BYTE)); + } + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/any/NumericNilVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/any/NumericNilVectorAggregator.java new file mode 100644 index 00000000000..3034524c185 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/any/NumericNilVectorAggregator.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.any; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.query.aggregation.VectorAggregator; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +/** + * A vector aggregator that returns the default numeric value. + */ +public class NumericNilVectorAggregator implements VectorAggregator +{ + private static final NumericNilVectorAggregator DOUBLE_NIL_VECTOR_AGGREGATOR = new NumericNilVectorAggregator( + NullHandling.defaultDoubleValue() + ); + + private static final NumericNilVectorAggregator FLOAT_NIL_VECTOR_AGGREGATOR = new NumericNilVectorAggregator( + NullHandling.defaultFloatValue() + ); + + private static final NumericNilVectorAggregator LONG_NIL_VECTOR_AGGREGATOR = new NumericNilVectorAggregator( + NullHandling.defaultLongValue() + ); + + /** + * @return A vectorized aggregator that returns the default double value. + */ + public static NumericNilVectorAggregator doubleNilVectorAggregator() + { + return DOUBLE_NIL_VECTOR_AGGREGATOR; + } + + /** + * @return A vectorized aggregator that returns the default float value. + */ + public static NumericNilVectorAggregator floatNilVectorAggregator() + { + return FLOAT_NIL_VECTOR_AGGREGATOR; + } + + /** + * @return A vectorized aggregator that returns the default long value. + */ + public static NumericNilVectorAggregator longNilVectorAggregator() + { + return LONG_NIL_VECTOR_AGGREGATOR; + } + + @Nullable + private final Object returnValue; + + private NumericNilVectorAggregator(@Nullable Object returnValue) + { + this.returnValue = returnValue; + } + + @Override + public void init(ByteBuffer buf, int position) + { + // Do nothing + } + + @Override + public void aggregate(ByteBuffer buf, int position, int startRow, int endRow) + { + // Do nothing. + } + + @Override + public void aggregate( + ByteBuffer buf, + int numRows, + int[] positions, + @Nullable int[] rows, + int positionOffset + ) + { + // Do nothing. + } + + @Nullable + @Override + public Object get(ByteBuffer buf, int position) + { + return returnValue; + } + + @Override + public void close() + { + // Do nothing. + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/any/StringAnyAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/any/StringAnyAggregatorFactory.java index 97296ec18d5..3d521313586 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/any/StringAnyAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/any/StringAnyAggregatorFactory.java @@ -29,8 +29,12 @@ import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.first.StringFirstAggregatorFactory; import org.apache.druid.query.cache.CacheKeyBuilder; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.segment.ColumnInspector; import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import javax.annotation.Nullable; import java.util.Collections; @@ -77,6 +81,32 @@ public class StringAnyAggregatorFactory extends AggregatorFactory return new StringAnyBufferAggregator(metricFactory.makeColumnValueSelector(fieldName), maxStringBytes); } + @Override + public StringAnyVectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory) + { + + ColumnCapabilities capabilities = selectorFactory.getColumnCapabilities(fieldName); + if (capabilities == null || capabilities.hasMultipleValues().isMaybeTrue()) { + return new StringAnyVectorAggregator( + null, + selectorFactory.makeMultiValueDimensionSelector(DefaultDimensionSpec.of(fieldName)), + maxStringBytes + ); + } else { + return new StringAnyVectorAggregator( + selectorFactory.makeSingleValueDimensionSelector(DefaultDimensionSpec.of(fieldName)), + null, + maxStringBytes + ); + } + } + + @Override + public boolean canVectorize(ColumnInspector columnInspector) + { + return true; + } + @Override public Comparator getComparator() { diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/any/StringAnyVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/any/StringAnyVectorAggregator.java new file mode 100644 index 00000000000..620801bafa3 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/any/StringAnyVectorAggregator.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.any; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.segment.data.IndexedInts; +import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector; +import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +public class StringAnyVectorAggregator implements VectorAggregator +{ + private static final int FOUND_AND_NULL_FLAG_VALUE = -1; + @VisibleForTesting + static final int NOT_FOUND_FLAG_VALUE = -2; + @VisibleForTesting + static final int FOUND_VALUE_OFFSET = Integer.BYTES; + + @Nullable + private final SingleValueDimensionVectorSelector singleValueSelector; + @Nullable + private final MultiValueDimensionVectorSelector multiValueSelector; + private final int maxStringBytes; + + public StringAnyVectorAggregator( + SingleValueDimensionVectorSelector singleValueSelector, + MultiValueDimensionVectorSelector multiValueSelector, + int maxStringBytes + ) + { + Preconditions.checkState( + singleValueSelector != null || multiValueSelector != null, + "At least one selector must be non null" + ); + Preconditions.checkState( + singleValueSelector == null || multiValueSelector == null, + "Only one selector must be non null" + ); + this.multiValueSelector = multiValueSelector; + this.singleValueSelector = singleValueSelector; + this.maxStringBytes = maxStringBytes; + } + + @Override + public void init(ByteBuffer buf, int position) + { + buf.putInt(position, NOT_FOUND_FLAG_VALUE); + } + + @Override + public void aggregate(ByteBuffer buf, int position, int startRow, int endRow) + { + if (buf.getInt(position) == NOT_FOUND_FLAG_VALUE && startRow < endRow) { + if (multiValueSelector != null) { + final IndexedInts[] rows = multiValueSelector.getRowVector(); + if (startRow < rows.length) { + IndexedInts row = rows[startRow]; + @Nullable + String foundValue = row.size() == 0 ? null : multiValueSelector.lookupName(row.get(0)); + putValue(buf, position, foundValue); + } + } else if (singleValueSelector != null) { + final int[] rows = singleValueSelector.getRowVector(); + if (startRow < rows.length) { + int row = rows[startRow]; + @Nullable + String foundValue = singleValueSelector.lookupName(row); + putValue(buf, position, foundValue); + } + } + } + } + + @Override + public void aggregate( + ByteBuffer buf, + int numRows, + int[] positions, @Nullable int[] rows, + int positionOffset + ) + { + for (int i = 0; i < numRows; i++) { + int position = positions[i] + positionOffset; + int row = rows == null ? i : rows[i]; + aggregate(buf, position, row, row + 1); + } + } + + @Nullable + @Override + public String get(ByteBuffer buf, int position) + { + ByteBuffer copyBuffer = buf.duplicate(); + copyBuffer.position(position); + int stringSizeBytes = copyBuffer.getInt(); + if (stringSizeBytes >= 0) { + byte[] valueBytes = new byte[stringSizeBytes]; + copyBuffer.get(valueBytes, 0, stringSizeBytes); + return StringUtils.fromUtf8(valueBytes); + } else { + return null; + } + } + + @Override + public void close() + { + // Nothing to close. + } + + private void putValue(ByteBuffer buf, int position, @Nullable String foundValue) + { + if (foundValue != null) { + ByteBuffer mutationBuffer = buf.duplicate(); + mutationBuffer.position(position + FOUND_VALUE_OFFSET); + mutationBuffer.limit(position + FOUND_VALUE_OFFSET + maxStringBytes); + final int len = StringUtils.toUtf8WithLimit(foundValue, mutationBuffer); + mutationBuffer.putInt(position, len); + } else { + buf.putInt(position, FOUND_AND_NULL_FLAG_VALUE); + } + } +} diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/any/DoubleAnyAggregatorFactoryTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/any/DoubleAnyAggregatorFactoryTest.java new file mode 100644 index 00000000000..c923d40c8f9 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/aggregation/any/DoubleAnyAggregatorFactoryTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.any; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.segment.ColumnInspector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; +import org.apache.druid.segment.vector.VectorValueSelector; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Answers; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import java.nio.ByteBuffer; + +@RunWith(MockitoJUnitRunner.class) +public class DoubleAnyAggregatorFactoryTest extends InitializedNullHandlingTest +{ + private static final String NAME = "NAME"; + private static final String FIELD_NAME = "FIELD_NAME"; + private static final int POSITION = 2; + private static final ByteBuffer BUFFER = ByteBuffer.allocate(128); + + @Mock + private ColumnCapabilities capabilities; + @Mock + private ColumnInspector columnInspector; + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private VectorColumnSelectorFactory selectorFactory; + @Mock + private VectorValueSelector valueSelector; + + private DoubleAnyAggregatorFactory target; + + @Before + public void setUp() + { + Mockito.doReturn(null).when(selectorFactory).getColumnCapabilities(FIELD_NAME); + Mockito.doReturn(valueSelector).when(selectorFactory).makeValueSelector(FIELD_NAME); + target = new DoubleAnyAggregatorFactory(NAME, FIELD_NAME); + } + + @Test + public void canVectorizeShouldReturnTrue() + { + Assert.assertTrue(target.canVectorize(columnInspector)); + } + + @Test + public void factorizeVectorShouldReturnDoubleVectorAggregator() + { + VectorAggregator aggregator = target.factorizeVector(selectorFactory); + Assert.assertNotNull(aggregator); + Assert.assertEquals(DoubleAnyVectorAggregator.class, aggregator.getClass()); + } + + @Test + public void factorizeVectorForNumericTypeShouldReturnDoubleVectorAggregator() + { + Mockito.doReturn(capabilities).when(selectorFactory).getColumnCapabilities(FIELD_NAME); + Mockito.doReturn(ValueType.DOUBLE).when(capabilities).getType(); + VectorAggregator aggregator = target.factorizeVector(selectorFactory); + Assert.assertNotNull(aggregator); + Assert.assertEquals(DoubleAnyVectorAggregator.class, aggregator.getClass()); + } + + @Test + public void factorizeVectorForStringTypeShouldReturnDoubleVectorAggregatorWithNilSelector() + { + Mockito.doReturn(capabilities).when(selectorFactory).getColumnCapabilities(FIELD_NAME); + Mockito.doReturn(ValueType.STRING).when(capabilities).getType(); + VectorAggregator aggregator = target.factorizeVector(selectorFactory); + Assert.assertNotNull(aggregator); + Assert.assertEquals(NullHandling.defaultDoubleValue(), aggregator.get(BUFFER, POSITION)); + } +} diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/any/DoubleAnyVectorAggregatorTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/any/DoubleAnyVectorAggregatorTest.java new file mode 100644 index 00000000000..4fad1eac3af --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/aggregation/any/DoubleAnyVectorAggregatorTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.any; + +import org.apache.druid.segment.vector.VectorValueSelector; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import java.nio.ByteBuffer; +import java.util.concurrent.ThreadLocalRandom; + +import static org.mockito.Mockito.spy; + +@RunWith(MockitoJUnitRunner.class) +public class DoubleAnyVectorAggregatorTest extends InitializedNullHandlingTest +{ + private static final int NULL_POSITION = 32; + private static final int POSITION = 2; + private static final double EPSILON = 1e-15; + private static final double[] VALUES = new double[]{7.8d, 11, 23.67, 60, 123}; + + private ByteBuffer buf; + @Mock + private VectorValueSelector selector; + + private DoubleAnyVectorAggregator target; + + @Before + public void setUp() + { + byte[] randomBytes = new byte[128]; + ThreadLocalRandom.current().nextBytes(randomBytes); + buf = ByteBuffer.wrap(randomBytes); + Mockito.doReturn(VALUES).when(selector).getDoubleVector(); + + target = spy(new DoubleAnyVectorAggregator(selector)); + Mockito.when(target.isValueNull(buf, NULL_POSITION)).thenReturn(true); + Mockito.when(target.isValueNull(buf, POSITION)).thenReturn(false); + } + + @Test + public void initValueShouldInitZero() + { + target.initValue(buf, POSITION); + Assert.assertEquals(0, buf.getDouble(POSITION), EPSILON); + } + + @Test + public void getAtPositionIsNullShouldReturnNull() + { + Assert.assertNull(target.get(buf, NULL_POSITION)); + } + + @Test + public void getAtPositionShouldReturnValue() + { + buf.putDouble(POSITION + 1, VALUES[3]); + Assert.assertEquals(VALUES[3], (double) target.get(buf, POSITION), EPSILON); + } + + @Test + public void putValueShouldAddToBuffer() + { + Assert.assertTrue(target.putAnyValueFromRow(buf, POSITION, 2, 3)); + Assert.assertEquals(VALUES[2], buf.getDouble(POSITION), EPSILON); + } + + @Test + public void putValueStartAfterEndShouldNotAddToBuffer() + { + Assert.assertFalse(target.putAnyValueFromRow(buf, POSITION, 2, 2)); + Assert.assertNotEquals(VALUES[2], buf.getDouble(POSITION), EPSILON); + } + + @Test + public void putValueStartOutsideRangeShouldNotAddToBuffer() + { + Assert.assertFalse(target.putAnyValueFromRow(buf, POSITION, 5, 6)); + Assert.assertNotEquals(VALUES[2], buf.getDouble(POSITION), EPSILON); + } +} diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/any/FloatAnyAggregatorFactoryTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/any/FloatAnyAggregatorFactoryTest.java new file mode 100644 index 00000000000..3ae93be2e7f --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/aggregation/any/FloatAnyAggregatorFactoryTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.any; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.segment.ColumnInspector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; +import org.apache.druid.segment.vector.VectorValueSelector; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Answers; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import java.nio.ByteBuffer; + +@RunWith(MockitoJUnitRunner.class) +public class FloatAnyAggregatorFactoryTest extends InitializedNullHandlingTest +{ + private static final String NAME = "NAME"; + private static final String FIELD_NAME = "FIELD_NAME"; + private static final int POSITION = 2; + private static final ByteBuffer BUFFER = ByteBuffer.allocate(128); + + @Mock + private ColumnCapabilities capabilities; + @Mock + private ColumnInspector columnInspector; + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private VectorColumnSelectorFactory selectorFactory; + @Mock + private VectorValueSelector valueSelector; + + private FloatAnyAggregatorFactory target; + + @Before + public void setUp() + { + Mockito.doReturn(null).when(selectorFactory).getColumnCapabilities(FIELD_NAME); + Mockito.doReturn(valueSelector).when(selectorFactory).makeValueSelector(FIELD_NAME); + target = new FloatAnyAggregatorFactory(NAME, FIELD_NAME); + } + + @Test + public void canVectorizeShouldReturnTrue() + { + Assert.assertTrue(target.canVectorize(columnInspector)); + } + + @Test + public void factorizeVectorShouldReturnFloatVectorAggregator() + { + VectorAggregator aggregator = target.factorizeVector(selectorFactory); + Assert.assertNotNull(aggregator); + Assert.assertEquals(FloatAnyVectorAggregator.class, aggregator.getClass()); + } + + @Test + public void factorizeVectorForNumericTypeShouldReturnFloatVectorAggregator() + { + Mockito.doReturn(capabilities).when(selectorFactory).getColumnCapabilities(FIELD_NAME); + Mockito.doReturn(ValueType.FLOAT).when(capabilities).getType(); + VectorAggregator aggregator = target.factorizeVector(selectorFactory); + Assert.assertNotNull(aggregator); + Assert.assertEquals(FloatAnyVectorAggregator.class, aggregator.getClass()); + } + + @Test + public void factorizeVectorForStringTypeShouldReturnFloatVectorAggregatorWithNilSelector() + { + Mockito.doReturn(capabilities).when(selectorFactory).getColumnCapabilities(FIELD_NAME); + Mockito.doReturn(ValueType.STRING).when(capabilities).getType(); + VectorAggregator aggregator = target.factorizeVector(selectorFactory); + Assert.assertNotNull(aggregator); + Assert.assertEquals(NullHandling.defaultFloatValue(), aggregator.get(BUFFER, POSITION)); + } +} diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/any/FloatAnyVectorAggregatorTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/any/FloatAnyVectorAggregatorTest.java new file mode 100644 index 00000000000..cdf5a87c263 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/aggregation/any/FloatAnyVectorAggregatorTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.any; + +import org.apache.druid.segment.vector.VectorValueSelector; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import java.nio.ByteBuffer; +import java.util.concurrent.ThreadLocalRandom; + +import static org.mockito.Mockito.spy; + +@RunWith(MockitoJUnitRunner.class) +public class FloatAnyVectorAggregatorTest extends InitializedNullHandlingTest +{ + private static final int NULL_POSITION = 32; + private static final int POSITION = 2; + private static final double EPSILON = 1e-15; + private static final float[] VALUES = new float[]{7.8f, 11, 23.67f, 60, 123}; + + private ByteBuffer buf; + @Mock + private VectorValueSelector selector; + + private FloatAnyVectorAggregator target; + + @Before + public void setUp() + { + byte[] randomBytes = new byte[128]; + ThreadLocalRandom.current().nextBytes(randomBytes); + buf = ByteBuffer.wrap(randomBytes); + Mockito.doReturn(VALUES).when(selector).getFloatVector(); + + target = spy(new FloatAnyVectorAggregator(selector)); + Mockito.when(target.isValueNull(buf, NULL_POSITION)).thenReturn(true); + Mockito.when(target.isValueNull(buf, POSITION)).thenReturn(false); + } + + @Test + public void initValueShouldInitZero() + { + target.initValue(buf, POSITION); + Assert.assertEquals(0, buf.getFloat(POSITION), EPSILON); + } + + @Test + public void getAtPositionIsNullShouldReturnNull() + { + Assert.assertNull(target.get(buf, NULL_POSITION)); + } + + @Test + public void getAtPositionShouldReturnValue() + { + buf.putFloat(POSITION + 1, VALUES[3]); + Assert.assertEquals(VALUES[3], (float) target.get(buf, POSITION), EPSILON); + } + + @Test + public void putValueShouldAddToBuffer() + { + Assert.assertTrue(target.putAnyValueFromRow(buf, POSITION, 2, 3)); + Assert.assertEquals(VALUES[2], buf.getFloat(POSITION), EPSILON); + } + + @Test + public void putValueStartAfterEndShouldNotAddToBuffer() + { + Assert.assertFalse(target.putAnyValueFromRow(buf, POSITION, 2, 2)); + Assert.assertNotEquals(VALUES[2], buf.getFloat(POSITION), EPSILON); + } + + @Test + public void putValueStartOutsideRangeShouldNotAddToBuffer() + { + Assert.assertFalse(target.putAnyValueFromRow(buf, POSITION, 5, 6)); + Assert.assertNotEquals(VALUES[2], buf.getFloat(POSITION), EPSILON); + } +} diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/any/LongAnyAggregatorFactoryTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/any/LongAnyAggregatorFactoryTest.java new file mode 100644 index 00000000000..d55b5ed039b --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/aggregation/any/LongAnyAggregatorFactoryTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.any; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.segment.ColumnInspector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; +import org.apache.druid.segment.vector.VectorValueSelector; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Answers; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import java.nio.ByteBuffer; + +@RunWith(MockitoJUnitRunner.class) +public class LongAnyAggregatorFactoryTest extends InitializedNullHandlingTest +{ + private static final String NAME = "NAME"; + private static final String FIELD_NAME = "FIELD_NAME"; + private static final int POSITION = 2; + private static final ByteBuffer BUFFER = ByteBuffer.allocate(128); + + @Mock + private ColumnCapabilities capabilities; + @Mock + private ColumnInspector columnInspector; + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private VectorColumnSelectorFactory selectorFactory; + @Mock + private VectorValueSelector valueSelector; + + private LongAnyAggregatorFactory target; + + @Before + public void setUp() + { + Mockito.doReturn(null).when(selectorFactory).getColumnCapabilities(FIELD_NAME); + Mockito.doReturn(valueSelector).when(selectorFactory).makeValueSelector(FIELD_NAME); + target = new LongAnyAggregatorFactory(NAME, FIELD_NAME); + } + + @Test + public void canVectorizeShouldReturnTrue() + { + Assert.assertTrue(target.canVectorize(columnInspector)); + } + + @Test + public void factorizeVectorShouldReturnLongVectorAggregator() + { + VectorAggregator aggregator = target.factorizeVector(selectorFactory); + Assert.assertNotNull(aggregator); + Assert.assertEquals(LongAnyVectorAggregator.class, aggregator.getClass()); + } + + @Test + public void factorizeVectorWithNumericColumnShouldReturnLongVectorAggregator() + { + Mockito.doReturn(capabilities).when(selectorFactory).getColumnCapabilities(FIELD_NAME); + Mockito.doReturn(ValueType.LONG).when(capabilities).getType(); + VectorAggregator aggregator = target.factorizeVector(selectorFactory); + Assert.assertNotNull(aggregator); + Assert.assertEquals(LongAnyVectorAggregator.class, aggregator.getClass()); + } + + @Test + public void factorizeVectorForStringTypeShouldReturnLongVectorAggregatorWithNilSelector() + { + Mockito.doReturn(capabilities).when(selectorFactory).getColumnCapabilities(FIELD_NAME); + Mockito.doReturn(ValueType.STRING).when(capabilities).getType(); + VectorAggregator aggregator = target.factorizeVector(selectorFactory); + Assert.assertNotNull(aggregator); + Assert.assertEquals(NullHandling.defaultLongValue(), aggregator.get(BUFFER, POSITION)); + } +} diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/any/LongAnyVectorAggregatorTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/any/LongAnyVectorAggregatorTest.java new file mode 100644 index 00000000000..a243fb1ed1a --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/aggregation/any/LongAnyVectorAggregatorTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.any; + +import org.apache.druid.segment.vector.VectorValueSelector; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import java.nio.ByteBuffer; +import java.util.concurrent.ThreadLocalRandom; + +import static org.mockito.Mockito.spy; + +@RunWith(MockitoJUnitRunner.class) +public class LongAnyVectorAggregatorTest extends InitializedNullHandlingTest +{ + private static final int NULL_POSITION = 32; + private static final int POSITION = 2; + private static final long[] VALUES = new long[]{7L, 11, -892587293, 60, 123}; + + private ByteBuffer buf; + @Mock + private VectorValueSelector selector; + + private LongAnyVectorAggregator target; + + @Before + public void setUp() + { + byte[] randomBytes = new byte[128]; + ThreadLocalRandom.current().nextBytes(randomBytes); + buf = ByteBuffer.wrap(randomBytes); + Mockito.doReturn(VALUES).when(selector).getLongVector(); + + target = spy(new LongAnyVectorAggregator(selector)); + Mockito.when(target.isValueNull(buf, NULL_POSITION)).thenReturn(true); + Mockito.when(target.isValueNull(buf, POSITION)).thenReturn(false); + } + + @Test + public void initValueShouldInitZero() + { + target.initValue(buf, POSITION); + Assert.assertEquals(0, buf.getLong(POSITION)); + } + + @Test + public void getAtPositionIsNullShouldReturnNull() + { + Assert.assertNull(target.get(buf, NULL_POSITION)); + } + + @Test + public void getAtPositionShouldReturnValue() + { + buf.putLong(POSITION + 1, VALUES[3]); + Assert.assertEquals(VALUES[3], (long) target.get(buf, POSITION)); + } + + @Test + public void putValueShouldAddToBuffer() + { + Assert.assertTrue(target.putAnyValueFromRow(buf, POSITION, 2, 3)); + Assert.assertEquals(VALUES[2], buf.getLong(POSITION)); + } + + @Test + public void putValueStartAfterEndShouldNotAddToBuffer() + { + Assert.assertFalse(target.putAnyValueFromRow(buf, POSITION, 2, 2)); + Assert.assertNotEquals(VALUES[2], buf.getLong(POSITION)); + } + + @Test + public void putValueStartOutsideRangeShouldNotAddToBuffer() + { + Assert.assertFalse(target.putAnyValueFromRow(buf, POSITION, 5, 6)); + Assert.assertNotEquals(VALUES[2], buf.getLong(POSITION)); + } +} diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/any/NumericAnyVectorAggregatorTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/any/NumericAnyVectorAggregatorTest.java new file mode 100644 index 00000000000..cdc716a0739 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/aggregation/any/NumericAnyVectorAggregatorTest.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.any; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.segment.vector.VectorValueSelector; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import java.nio.ByteBuffer; +import java.util.concurrent.ThreadLocalRandom; + +import static org.apache.druid.query.aggregation.any.NumericAnyVectorAggregator.BYTE_FLAG_FOUND_MASK; + +@RunWith(MockitoJUnitRunner.class) +public class NumericAnyVectorAggregatorTest extends InitializedNullHandlingTest +{ + private static final int NULL_POSITION = 10; + private static final int BUFFER_SIZE = 128; + private static final long FOUND_OBJECT = 23; + private static final int POSITION = 2; + private static final boolean[] NULLS = new boolean[] {false, false, true, false}; + + private ByteBuffer buf; + @Mock + private VectorValueSelector selector; + + private NumericAnyVectorAggregator target; + + @Before + public void setUp() + { + Mockito.doReturn(NULLS).when(selector).getNullVector(); + target = Mockito.spy(new NumericAnyVectorAggregator(selector) + { + @Override + void initValue(ByteBuffer buf, int position) + { + /* Do nothing. */ + } + + @Override + boolean putAnyValueFromRow(ByteBuffer buf, int position, int startRow, int endRow) + { + boolean isRowsWithinIndex = startRow < endRow && startRow < NULLS.length; + if (isRowsWithinIndex) { + buf.putLong(position, startRow); + } + return isRowsWithinIndex; + } + + @Override + Object getNonNullObject(ByteBuffer buf, int position) + { + if (position == POSITION + 1) { + return FOUND_OBJECT; + } + return -1; + } + + }); + byte[] randomBuffer = new byte[BUFFER_SIZE]; + ThreadLocalRandom.current().nextBytes(randomBuffer); + buf = ByteBuffer.wrap(randomBuffer); + clearBufferForPositions(0, POSITION); + buf.put(NULL_POSITION, (byte) 0x01); + } + + @Test + public void initShouldSetDoubleAfterPositionToZero() + { + target.init(buf, POSITION); + Assert.assertEquals(0, buf.get(POSITION) & BYTE_FLAG_FOUND_MASK); + Assert.assertEquals( + NullHandling.sqlCompatible() ? NullHandling.IS_NULL_BYTE : NullHandling.IS_NOT_NULL_BYTE, + buf.get(POSITION) + ); + } + + @Test + public void aggregateNotFoundAndHasNullsShouldPutNull() + { + target.aggregate(buf, POSITION, 0, 3); + if (NullHandling.sqlCompatible()) { + Assert.assertEquals(BYTE_FLAG_FOUND_MASK | NullHandling.IS_NULL_BYTE, buf.get(POSITION)); + } else { + Assert.assertEquals(BYTE_FLAG_FOUND_MASK | NullHandling.IS_NOT_NULL_BYTE, buf.get(POSITION)); + } + } + + @Test + public void aggregateNotFoundAndHasNullsOutsideRangeShouldPutValue() + { + target.aggregate(buf, POSITION, 0, 1); + Assert.assertEquals(BYTE_FLAG_FOUND_MASK | NullHandling.IS_NOT_NULL_BYTE, buf.get(POSITION)); + } + + @Test + public void aggregateNotFoundAndNoNullsShouldPutValue() + { + Mockito.doReturn(null).when(selector).getNullVector(); + target.aggregate(buf, POSITION, 0, 3); + Assert.assertEquals(BYTE_FLAG_FOUND_MASK | NullHandling.IS_NOT_NULL_BYTE, buf.get(POSITION)); + } + + @Test + public void aggregateNotFoundNoNullsAndValuesOutsideRangeShouldNotPutValue() + { + Mockito.doReturn(null).when(selector).getNullVector(); + target.aggregate(buf, POSITION, NULLS.length, NULLS.length + 1); + Assert.assertNotEquals(BYTE_FLAG_FOUND_MASK, (buf.get(POSITION) & BYTE_FLAG_FOUND_MASK)); + } + + @Test + public void aggregateBatchNoRowsShouldAggregateAllRows() + { + Mockito.doReturn(null).when(selector).getNullVector(); + int[] positions = new int[] {0, 43, 70}; + int positionOffset = 2; + clearBufferForPositions(positionOffset, positions); + target.aggregate(buf, 3, positions, null, positionOffset); + for (int i = 0; i < positions.length; i++) { + int position = positions[i] + positionOffset; + Assert.assertEquals( + BYTE_FLAG_FOUND_MASK | NullHandling.IS_NOT_NULL_BYTE, + buf.get(position) & (byte) (BYTE_FLAG_FOUND_MASK | NullHandling.IS_NOT_NULL_BYTE) + ); + Assert.assertEquals(i, buf.getLong(position + 1)); + } + } + + @Test + public void aggregateBatchWithRowsShouldAggregateAllRows() + { + Mockito.doReturn(null).when(selector).getNullVector(); + int[] positions = new int[] {0, 43, 70}; + int positionOffset = 2; + clearBufferForPositions(positionOffset, positions); + int[] rows = new int[] {3, 2, 0}; + target.aggregate(buf, 3, positions, rows, positionOffset); + for (int i = 0; i < positions.length; i++) { + int position = positions[i] + positionOffset; + Assert.assertEquals( + BYTE_FLAG_FOUND_MASK | NullHandling.IS_NOT_NULL_BYTE, + buf.get(position) & (byte) (BYTE_FLAG_FOUND_MASK | NullHandling.IS_NOT_NULL_BYTE) + ); + Assert.assertEquals(rows[i], buf.getLong(position + 1)); + } + } + + @Test + public void aggregateFoundShouldDoNothing() + { + long previous = buf.getLong(POSITION + 1); + buf.put(POSITION, BYTE_FLAG_FOUND_MASK); + target.aggregate(buf, POSITION, 0, 3); + Assert.assertEquals(previous, buf.getLong(POSITION + 1)); + } + + @Test + public void getNullShouldReturnNull() + { + Assert.assertNull(target.get(buf, NULL_POSITION)); + } + + @Test + public void getNotNullShouldReturnValue() + { + Assert.assertEquals(FOUND_OBJECT, target.get(buf, POSITION)); + } + + @Test + public void isValueNull() + { + buf.put(POSITION, (byte) 4); + Assert.assertFalse(target.isValueNull(buf, POSITION)); + buf.put(POSITION, (byte) 3); + Assert.assertTrue(target.isValueNull(buf, POSITION)); + } + + private void clearBufferForPositions(int offset, int... positions) + { + for (int position : positions) { + buf.put(position + offset, (byte) 0); + } + } +} diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/any/StringAnyAggregatorFactoryTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/any/StringAnyAggregatorFactoryTest.java new file mode 100644 index 00000000000..d666e5ed5bb --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/aggregation/any/StringAnyAggregatorFactoryTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.any; + +import org.apache.druid.segment.ColumnInspector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector; +import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.mockito.ArgumentMatchers.any; + +@RunWith(MockitoJUnitRunner.class) +public class StringAnyAggregatorFactoryTest extends InitializedNullHandlingTest +{ + private static final String NAME = "NAME"; + private static final String FIELD_NAME = "FIELD_NAME"; + private static final int MAX_STRING_BYTES = 10; + + @Mock + private ColumnInspector columnInspector; + @Mock + private ColumnCapabilities capabilities; + @Mock + private VectorColumnSelectorFactory vectorSelectorFactory; + @Mock + private SingleValueDimensionVectorSelector singleValueDimensionVectorSelector; + @Mock + private MultiValueDimensionVectorSelector multiValueDimensionVectorSelector; + + private StringAnyAggregatorFactory target; + + @Before + public void setUp() + { + Mockito.doReturn(capabilities).when(vectorSelectorFactory).getColumnCapabilities(FIELD_NAME); + Mockito.doReturn(ColumnCapabilities.Capable.UNKNOWN).when(capabilities).hasMultipleValues(); + target = new StringAnyAggregatorFactory(NAME, FIELD_NAME, MAX_STRING_BYTES); + } + + @Test + public void canVectorizeWithoutCapabilitiesShouldReturnTrue() + { + Assert.assertTrue(target.canVectorize(columnInspector)); + } + + @Test + public void factorizeVectorWithoutCapabilitiesShouldReturnAggregatorWithMultiDimensionSelector() + { + Mockito.doReturn(null).when(vectorSelectorFactory).getColumnCapabilities(FIELD_NAME); + Mockito.doReturn(multiValueDimensionVectorSelector) + .when(vectorSelectorFactory).makeMultiValueDimensionSelector(any()); + StringAnyVectorAggregator aggregator = target.factorizeVector(vectorSelectorFactory); + Assert.assertNotNull(aggregator); + } + + @Test + public void factorizeVectorWithUnknownCapabilitiesShouldReturnAggregatorWithMultiDimensionSelector() + { + Mockito.doReturn(multiValueDimensionVectorSelector) + .when(vectorSelectorFactory).makeMultiValueDimensionSelector(any()); + StringAnyVectorAggregator aggregator = target.factorizeVector(vectorSelectorFactory); + Assert.assertNotNull(aggregator); + } + + @Test + public void factorizeVectorWithMultipleValuesCapabilitiesShouldReturnAggregatorWithMultiDimensionSelector() + { + Mockito.doReturn(ColumnCapabilities.Capable.TRUE).when(capabilities).hasMultipleValues(); + Mockito.doReturn(multiValueDimensionVectorSelector) + .when(vectorSelectorFactory).makeMultiValueDimensionSelector(any()); + StringAnyVectorAggregator aggregator = target.factorizeVector(vectorSelectorFactory); + Assert.assertNotNull(aggregator); + } + + @Test + public void factorizeVectorWithoutMultipleValuesCapabilitiesShouldReturnAggregatorWithSingleDimensionSelector() + { + Mockito.doReturn(ColumnCapabilities.Capable.FALSE).when(capabilities).hasMultipleValues(); + Mockito.doReturn(singleValueDimensionVectorSelector) + .when(vectorSelectorFactory).makeSingleValueDimensionSelector(any()); + StringAnyVectorAggregator aggregator = target.factorizeVector(vectorSelectorFactory); + Assert.assertNotNull(aggregator); + } +} diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/any/StringAnyVectorAggregatorTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/any/StringAnyVectorAggregatorTest.java new file mode 100644 index 00000000000..bb9ca74dfb4 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/aggregation/any/StringAnyVectorAggregatorTest.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.any; + +import org.apache.druid.segment.data.ArrayBasedIndexedInts; +import org.apache.druid.segment.data.IndexedInts; +import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector; +import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import java.nio.ByteBuffer; +import java.util.concurrent.ThreadLocalRandom; + +import static org.apache.druid.query.aggregation.any.StringAnyVectorAggregator.NOT_FOUND_FLAG_VALUE; +import static org.mockito.ArgumentMatchers.anyInt; + +@RunWith(MockitoJUnitRunner.class) +public class StringAnyVectorAggregatorTest extends InitializedNullHandlingTest +{ + private static final int MAX_STRING_BYTES = 32; + private static final int BUFFER_SIZE = 1024; + private static final int POSITION = 2; + private static final IndexedInts[] MULTI_VALUE_ROWS = new IndexedInts[]{ + new ArrayBasedIndexedInts(new int[]{1, 0}), + new ArrayBasedIndexedInts(new int[]{1}), + new ArrayBasedIndexedInts(), + new ArrayBasedIndexedInts(new int[]{2}) + }; + private static final int[] SINGLE_VALUE_ROWS = new int[]{1, 1, 3, 2}; + private static final String[] DICTIONARY = new String[]{"Zero", "One", "TwoThisStringIsLongerThanThirtyTwoBytes"}; + + private ByteBuffer buf; + @Mock + private SingleValueDimensionVectorSelector singleValueSelector; + @Mock + private MultiValueDimensionVectorSelector multiValueSelector; + + private StringAnyVectorAggregator singleValueTarget; + private StringAnyVectorAggregator multiValueTarget; + + @Before + public void setUp() + { + Mockito.doReturn(MULTI_VALUE_ROWS).when(multiValueSelector).getRowVector(); + Mockito.doAnswer(invocation -> DICTIONARY[(int) invocation.getArgument(0)]) + .when(multiValueSelector).lookupName(anyInt()); + Mockito.doReturn(SINGLE_VALUE_ROWS).when(singleValueSelector).getRowVector(); + Mockito.doAnswer(invocation -> { + int index = invocation.getArgument(0); + return index >= DICTIONARY.length ? null : DICTIONARY[index]; + }).when(singleValueSelector).lookupName(anyInt()); + initializeRandomBuffer(); + singleValueTarget = new StringAnyVectorAggregator(singleValueSelector, null, MAX_STRING_BYTES); + multiValueTarget = new StringAnyVectorAggregator(null, multiValueSelector, MAX_STRING_BYTES); + } + + @Test(expected = IllegalStateException.class) + public void initWithBothSingleAndMultiValueSelectorShouldThrowException() + { + new StringAnyVectorAggregator(singleValueSelector, multiValueSelector, MAX_STRING_BYTES); + } + + @Test(expected = IllegalStateException.class) + public void initWithNeitherSingleNorMultiValueSelectorShouldThrowException() + { + new StringAnyVectorAggregator(null, null, MAX_STRING_BYTES); + } + + @Test + public void initSingleValueTargetShouldMarkPositionAsNotFound() + { + singleValueTarget.init(buf, POSITION + 1); + Assert.assertEquals(NOT_FOUND_FLAG_VALUE, buf.getInt(POSITION + 1)); + } + + @Test + public void initMultiValueTargetShouldMarkPositionAsNotFound() + { + multiValueTarget.init(buf, POSITION + 1); + Assert.assertEquals(NOT_FOUND_FLAG_VALUE, buf.getInt(POSITION + 1)); + } + + @Test + public void aggregatePositionNotFoundShouldPutFirstValue() + { + singleValueTarget.aggregate(buf, POSITION, 0, 2); + Assert.assertEquals(DICTIONARY[1], singleValueTarget.get(buf, POSITION)); + } + + @Test + public void aggregateEmptyShouldPutNull() + { + singleValueTarget.aggregate(buf, POSITION, 2, 3); + Assert.assertNull(singleValueTarget.get(buf, POSITION)); + } + + @Test + public void aggregateMultiValuePositionNotFoundShouldPutFirstValue() + { + multiValueTarget.aggregate(buf, POSITION, 0, 2); + Assert.assertEquals(DICTIONARY[1], multiValueTarget.get(buf, POSITION)); + } + + @Test + public void aggregateMultiValueEmptyShouldPutNull() + { + multiValueTarget.aggregate(buf, POSITION, 2, 3); + Assert.assertNull(multiValueTarget.get(buf, POSITION)); + } + + @Test + public void aggregateValueLongerThanLimitShouldPutTruncatedValue() + { + singleValueTarget.aggregate(buf, POSITION, 3, 4); + Assert.assertEquals(DICTIONARY[2].substring(0, 32), singleValueTarget.get(buf, POSITION)); + } + + @Test + public void aggregateBatchNoRowsShouldAggregateAllRows() + { + int[] positions = new int[] {0, 43, 100}; + int positionOffset = 2; + clearBufferForPositions(positionOffset, positions); + singleValueTarget.aggregate(buf, 3, positions, null, positionOffset); + for (int i = 0; i < positions.length; i++) { + int position = positions[i] + positionOffset; + Assert.assertEquals(singleValueSelector.lookupName(SINGLE_VALUE_ROWS[i]), singleValueTarget.get(buf, position)); + } + } + + @Test + public void aggregateBatchWithRowsShouldAggregateAllRows() + { + int[] positions = new int[] {0, 43, 100}; + int positionOffset = 2; + int[] rows = new int[] {2, 1, 0}; + clearBufferForPositions(positionOffset, positions); + multiValueTarget.aggregate(buf, 3, positions, rows, positionOffset); + for (int i = 0; i < positions.length; i++) { + int position = positions[i] + positionOffset; + int row = rows[i]; + IndexedInts rowIndex = MULTI_VALUE_ROWS[row]; + if (rowIndex.size() == 0) { + Assert.assertNull(multiValueTarget.get(buf, position)); + } else { + Assert.assertEquals(multiValueSelector.lookupName(rowIndex.get(0)), multiValueTarget.get(buf, position)); + } + } + } + + private void initializeRandomBuffer() + { + byte[] randomBuffer = new byte[BUFFER_SIZE]; + ThreadLocalRandom.current().nextBytes(randomBuffer); + buf = ByteBuffer.wrap(randomBuffer); + clearBufferForPositions(0, POSITION); + } + + private void clearBufferForPositions(int offset, int... positions) + { + for (int position : positions) { + buf.putInt(position + offset, NOT_FOUND_FLAG_VALUE); + } + } +} diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index d1e9c700190..eb7fa4ceed3 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -1910,7 +1910,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest @Test public void testAnyAggregator() throws Exception { - // Cannot vectorize ANY aggregator. + // Cannot vectorize virtual expressions. skipVectorize(); testQuery( @@ -1951,9 +1951,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest @Test public void testAnyAggregatorsOnHeapNumericNulls() throws Exception { - // Cannot vectorize ANY aggregator. - skipVectorize(); - testQuery( "SELECT ANY_VALUE(l1), ANY_VALUE(d1), ANY_VALUE(f1) FROM druid.numfoo", ImmutableList.of( @@ -1982,8 +1979,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest @Test public void testAnyAggregatorsOffHeapNumericNulls() throws Exception { - // Cannot vectorize ANY aggregator. - skipVectorize(); testQuery( "SELECT ANY_VALUE(l1), ANY_VALUE(d1), ANY_VALUE(f1) FROM druid.numfoo GROUP BY dim2", ImmutableList.of( @@ -2203,9 +2198,12 @@ public class CalciteQueryTest extends BaseCalciteQueryTest @Test public void testPrimitiveAnyInSubquery() throws Exception { - // Cannot vectorize ANY aggregator. - skipVectorize(); - + // The grouping works like this + // dim2 -> m1 | m2 + // a -> [1,4] | [1,4] + // null -> [2,3,6] | [2,3,6] + // abc -> [5] | [5] + // So the acceptable response can be any combination of these values testQuery( "SELECT SUM(val1), SUM(val2), SUM(val3) FROM (SELECT dim2, ANY_VALUE(m1) AS val1, ANY_VALUE(cnt) AS val2, ANY_VALUE(m2) AS val3 FROM foo GROUP BY dim2)", ImmutableList.of( @@ -2251,9 +2249,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest @Test public void testStringAnyInSubquery() throws Exception { - // Cannot vectorize ANY aggregator. - skipVectorize(); - testQuery( "SELECT SUM(val) FROM (SELECT dim2, ANY_VALUE(dim1, 10) AS val FROM foo GROUP BY dim2)", ImmutableList.of( @@ -2408,9 +2403,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest @Test public void testAnyAggregatorsDoesNotSkipNulls() throws Exception { - // Cannot vectorize ANY aggregator. - skipVectorize(); - testQuery( "SELECT ANY_VALUE(dim1, 32), ANY_VALUE(l2), ANY_VALUE(d2), ANY_VALUE(f2) FROM druid.numfoo", ImmutableList.of( @@ -2439,9 +2431,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest @Test public void testAnyAggregatorsSkipNullsWithFilter() throws Exception { - // Cannot vectorize ANY aggregator. - skipVectorize(); - final DimFilter filter; if (useDefault) { filter = not(selector("dim1", null, null)); @@ -2767,8 +2756,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest @Test public void testOrderByAnyFloat() throws Exception { - // Cannot vectorize ANY aggregator. - skipVectorize(); List expected; if (NullHandling.replaceWithDefault()) { expected = ImmutableList.of( @@ -2817,8 +2804,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest @Test public void testOrderByAnyDouble() throws Exception { - // Cannot vectorize ANY aggregator. - skipVectorize(); List expected; if (NullHandling.replaceWithDefault()) { expected = ImmutableList.of( @@ -2866,8 +2851,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest @Test public void testOrderByAnyLong() throws Exception { - // Cannot vectorize ANY aggregator. - skipVectorize(); List expected; if (NullHandling.replaceWithDefault()) { expected = ImmutableList.of( diff --git a/website/.spelling b/website/.spelling index d9698fb54ee..75a9497a845 100644 --- a/website/.spelling +++ b/website/.spelling @@ -538,6 +538,7 @@ MeanNoNulls P1D cycleSize doubleMax +doubleAny doubleMean doubleMeanNoNulls doubleMin @@ -545,6 +546,7 @@ doubleSum druid.generic.useDefaultValueForNull limitSpec longMax +longAny longMean longMeanNoNulls longMin @@ -1107,6 +1109,7 @@ signum str1 str2 string_to_array +stringAny strlen strpos timestamp_ceil @@ -1672,6 +1675,7 @@ file.encoding fillCapacity first_location floatMax +floatAny floatMin floatSum freeSpacePercent