From 8088a763a69e54cca9b4c1d350e0b68b1eccd243 Mon Sep 17 00:00:00 2001 From: Soumyava <93540295+somu-imply@users.noreply.github.com> Date: Tue, 5 Sep 2023 08:41:42 -0700 Subject: [PATCH] Vectorize earliest aggregator for both numeric and string types (#14408) * Vectorizing earliest for numeric * Vectorizing earliest string aggregator * checkstyle fix * Removing unnecessary exceptions * Ignoring tests in MSQ as earliest is not supported for numeric there * Fixing benchmarks * Updating tests as MSQ does not support earliest for some cases * Addressing review comments by adding the following: 1. Checking capabilities first before creating selectors 2. Removing mockito in tests for numeric first aggs 3. Removing unnecessary tests * Addressing issues for dictionary encoded single string columns where we can use the dictionary ids instead of the entire string * Adding a flag for multi value dimension selector * Addressing comments * 1 more change * Handling review comments part 1 * Handling review comments and correctness fix for latest_by when the time expression need not be in sorted order * Updating numeric first vector agg * Revert "Updating numeric first vector agg" This reverts commit 429170990192883e51812311c49d2e461e6db732. * Updating code for correctness issues * fixing an issue with latest agg * Adding more comments and removing an unnecessary check * Addressing null checks for tie selector and only vectorize false for quantile sketches --- .../query/SqlExpressionBenchmark.java | 16 +- .../queries/wikipedia_editstream_queries.json | 18 +- .../first/DoubleFirstAggregatorFactory.java | 28 ++ .../first/DoubleFirstVectorAggregator.java | 61 +++++ .../first/FloatFirstAggregatorFactory.java | 25 ++ .../first/FloatFirstVectorAggregator.java | 61 +++++ .../first/LongFirstAggregatorFactory.java | 26 ++ .../first/LongFirstVectorAggregator.java | 60 +++++ .../first/NumericFirstVectorAggregator.java | 174 ++++++++++++ ...eStringFirstDimensionVectorAggregator.java | 125 +++++++++ .../first/StringFirstAggregatorFactory.java | 64 +++++ .../first/StringFirstVectorAggregator.java | 185 +++++++++++++ .../last/StringLastVectorAggregator.java | 4 +- .../DoubleFirstVectorAggregationTest.java | 246 +++++++++++++++++ .../FloatFirstVectorAggregationTest.java | 250 ++++++++++++++++++ .../first/LongFirstVectorAggregationTest.java | 241 +++++++++++++++++ .../StringFirstVectorAggregatorTest.java | 167 ++++++++++++ .../query/groupby/GroupByQueryRunnerTest.java | 9 - .../timeseries/TimeseriesQueryRunnerTest.java | 6 - .../druid/sql/calcite/CalciteQueryTest.java | 91 +++++-- 20 files changed, 1813 insertions(+), 44 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstVectorAggregator.java create mode 100644 processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregator.java create mode 100644 processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstVectorAggregator.java create mode 100644 processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstVectorAggregator.java create mode 100644 processing/src/main/java/org/apache/druid/query/aggregation/first/SingleStringFirstDimensionVectorAggregator.java create mode 100644 processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstVectorAggregator.java create mode 100644 processing/src/test/java/org/apache/druid/query/aggregation/first/DoubleFirstVectorAggregationTest.java create mode 100644 
processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregationTest.java create mode 100644 processing/src/test/java/org/apache/druid/query/aggregation/first/LongFirstVectorAggregationTest.java create mode 100644 processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstVectorAggregatorTest.java diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java index 6f96a2cde56..498a9c2bdac 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java @@ -197,7 +197,7 @@ public class SqlExpressionBenchmark "SELECT TIME_SHIFT(MILLIS_TO_TIMESTAMP(long4), 'PT1H', 1), string2, SUM(long1 * double4) FROM foo GROUP BY 1,2 ORDER BY 3", // 37: time shift + expr agg (group by), uniform distribution high cardinality "SELECT TIME_SHIFT(MILLIS_TO_TIMESTAMP(long5), 'PT1H', 1), string2, SUM(long1 * double4) FROM foo GROUP BY 1,2 ORDER BY 3", - // 38: LATEST aggregator + // 38: LATEST aggregator long "SELECT LATEST(long1) FROM foo", // 39: LATEST aggregator double "SELECT LATEST(double4) FROM foo", @@ -207,7 +207,13 @@ public class SqlExpressionBenchmark "SELECT LATEST(float3), LATEST(long1), LATEST(double4) FROM foo", // 42,43: filter numeric nulls "SELECT SUM(long5) FROM foo WHERE long5 IS NOT NULL", - "SELECT string2, SUM(long5) FROM foo WHERE long5 IS NOT NULL GROUP BY 1" + "SELECT string2, SUM(long5) FROM foo WHERE long5 IS NOT NULL GROUP BY 1", + // 44: EARLIEST aggregator long + "SELECT EARLIEST(long1) FROM foo", + // 45: EARLIEST aggregator double + "SELECT EARLIEST(double4) FROM foo", + // 46: EARLIEST aggregator float + "SELECT EARLIEST(float3) FROM foo" ); @Param({"5000000"}) @@ -265,7 +271,11 @@ public class SqlExpressionBenchmark "40", "41", "42", - "43" + "43", + "44", + "45", + "46", + "47" }) private String query; diff --git a/integration-tests/src/test/resources/queries/wikipedia_editstream_queries.json b/integration-tests/src/test/resources/queries/wikipedia_editstream_queries.json index 0d0290d2324..4cb4c0ec485 100644 --- a/integration-tests/src/test/resources/queries/wikipedia_editstream_queries.json +++ b/integration-tests/src/test/resources/queries/wikipedia_editstream_queries.json @@ -119,7 +119,8 @@ } ], "context": { - "useCache": "true", + "useCache": "true", + "vectorize": "false", "populateCache": "true", "timeout": 360000 } @@ -270,7 +271,8 @@ } ], "context": { - "useCache": "true", + "useCache": "true", + "vectorize": "false", "populateCache": "true", "timeout": 360000 } @@ -514,7 +516,8 @@ "metric": "unique_users", "threshold": 3, "context": { - "useCache": "true", + "useCache": "true", + "vectorize": "false", "populateCache": "true", "timeout": 360000 } @@ -693,7 +696,8 @@ "metric": "count", "threshold": 3, "context": { - "useCache": "true", + "useCache": "true", + "vectorize": "false", "populateCache": "true", "timeout": 360000 } @@ -878,7 +882,8 @@ "metric": "count", "threshold": 3, "context": { - "useCache": "true", + "useCache": "true", + "vectorize": "false", "populateCache": "true", "timeout": 360000 } @@ -1243,7 +1248,8 @@ "orderBy": ["robot", "namespace"] }, "context": { - "useCache": "true", + "useCache": "true", + "vectorize": "false", "populateCache": "true", "timeout": 360000 } diff --git 
a/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java index d3b0e444953..7cee6f5ca64 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java @@ -29,14 +29,21 @@ import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.query.aggregation.any.NumericNilVectorAggregator; import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.BaseDoubleColumnValueSelector; +import org.apache.druid.segment.ColumnInspector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.NilColumnValueSelector; +import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.Types; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; +import org.apache.druid.segment.vector.VectorValueSelector; import javax.annotation.Nullable; import java.nio.ByteBuffer; @@ -96,6 +103,12 @@ public class DoubleFirstAggregatorFactory extends AggregatorFactory this.storeDoubleAsFloat = ColumnHolder.storeDoubleAsFloat(); } + @Override + public boolean canVectorize(ColumnInspector columnInspector) + { + return true; + } + @Override public Aggregator factorize(ColumnSelectorFactory metricFactory) { @@ -124,6 +137,21 @@ public class DoubleFirstAggregatorFactory extends AggregatorFactory } } + @Override + public VectorAggregator factorizeVector( + VectorColumnSelectorFactory columnSelectorFactory + ) + { + ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName); + if (Types.isNumeric(capabilities)) { + VectorValueSelector valueSelector = columnSelectorFactory.makeValueSelector(fieldName); + VectorValueSelector timeSelector = columnSelectorFactory.makeValueSelector( + timeColumn); + return new DoubleFirstVectorAggregator(timeSelector, valueSelector); + } + return NumericNilVectorAggregator.doubleNilVectorAggregator(); + } + @Override public Comparator getComparator() { diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstVectorAggregator.java new file mode 100644 index 00000000000..562f14547a6 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstVectorAggregator.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.first; + +import org.apache.druid.collections.SerializablePair; +import org.apache.druid.segment.vector.VectorValueSelector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +public class DoubleFirstVectorAggregator extends NumericFirstVectorAggregator +{ + + public DoubleFirstVectorAggregator(VectorValueSelector timeSelector, VectorValueSelector valueSelector) + { + super(timeSelector, valueSelector); + } + + @Override + public void initValue(ByteBuffer buf, int position) + { + buf.putDouble(position, 0); + } + + + @Override + void putValue(ByteBuffer buf, int position, int index) + { + double firstValue = valueSelector.getDoubleVector()[index]; + buf.putDouble(position, firstValue); + } + + + /** + * @return The object as a pair with the position and the value stored at the position in the buffer. + */ + @Nullable + @Override + public Object get(ByteBuffer buf, int position) + { + final boolean rhsNull = isValueNull(buf, position); + return new SerializablePair<>(buf.getLong(position), rhsNull ? null : buf.getDouble(position + VALUE_OFFSET)); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregatorFactory.java index 809cc8183ed..68826bc2c0a 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregatorFactory.java @@ -29,14 +29,21 @@ import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.query.aggregation.any.NumericNilVectorAggregator; import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.BaseFloatColumnValueSelector; +import org.apache.druid.segment.ColumnInspector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.NilColumnValueSelector; +import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.Types; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; +import org.apache.druid.segment.vector.VectorValueSelector; import javax.annotation.Nullable; import java.nio.ByteBuffer; @@ -122,6 +129,24 @@ public class FloatFirstAggregatorFactory extends AggregatorFactory } } + @Override + public VectorAggregator factorizeVector(VectorColumnSelectorFactory columnSelectorFactory) + { + ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName); + if (Types.isNumeric(capabilities)) { + 
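+ // Capabilities are checked before creating selectors: only numeric columns get vectorized time/value selectors here; any other column type falls back to the nil aggregator returned below.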
VectorValueSelector valueSelector = columnSelectorFactory.makeValueSelector(fieldName); + VectorValueSelector timeSelector = columnSelectorFactory.makeValueSelector(timeColumn); + return new FloatFirstVectorAggregator(timeSelector, valueSelector); + } + return NumericNilVectorAggregator.floatNilVectorAggregator(); + } + + @Override + public boolean canVectorize(ColumnInspector columnInspector) + { + return true; + } + @Override public Comparator getComparator() { diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregator.java new file mode 100644 index 00000000000..7c8c4e76f6b --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregator.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.first; + +import org.apache.druid.collections.SerializablePair; +import org.apache.druid.segment.vector.VectorValueSelector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +public class FloatFirstVectorAggregator extends NumericFirstVectorAggregator +{ + + public FloatFirstVectorAggregator(VectorValueSelector timeSelector, VectorValueSelector valueSelector) + { + super(timeSelector, valueSelector); + } + + @Override + public void initValue(ByteBuffer buf, int position) + { + buf.putFloat(position, 0); + } + + + @Override + void putValue(ByteBuffer buf, int position, int index) + { + float firstValue = valueSelector.getFloatVector()[index]; + buf.putFloat(position, firstValue); + } + + + /** + * @return The object as a pair with the position and the value stored at the position in the buffer. + */ + @Nullable + @Override + public Object get(ByteBuffer buf, int position) + { + final boolean rhsNull = isValueNull(buf, position); + return new SerializablePair<>(buf.getLong(position), rhsNull ? 
null : buf.getFloat(position + VALUE_OFFSET)); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregatorFactory.java index c767cf0e09c..729a1bef26e 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregatorFactory.java @@ -29,14 +29,21 @@ import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.query.aggregation.any.NumericNilVectorAggregator; import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.BaseLongColumnValueSelector; +import org.apache.druid.segment.ColumnInspector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.NilColumnValueSelector; +import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.Types; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; +import org.apache.druid.segment.vector.VectorValueSelector; import javax.annotation.Nullable; import java.nio.ByteBuffer; @@ -121,6 +128,25 @@ public class LongFirstAggregatorFactory extends AggregatorFactory } } + @Override + public VectorAggregator factorizeVector(VectorColumnSelectorFactory columnSelectorFactory) + { + ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName); + if (Types.isNumeric(capabilities)) { + VectorValueSelector valueSelector = columnSelectorFactory.makeValueSelector(fieldName); + VectorValueSelector timeSelector = columnSelectorFactory.makeValueSelector( + timeColumn); + return new LongFirstVectorAggregator(timeSelector, valueSelector); + } + return NumericNilVectorAggregator.longNilVectorAggregator(); + } + + @Override + public boolean canVectorize(ColumnInspector columnInspector) + { + return true; + } + @Override public Comparator getComparator() { diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstVectorAggregator.java new file mode 100644 index 00000000000..769b148b5e4 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstVectorAggregator.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.first; + +import org.apache.druid.collections.SerializablePair; +import org.apache.druid.segment.vector.VectorValueSelector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +public class LongFirstVectorAggregator extends NumericFirstVectorAggregator +{ + public LongFirstVectorAggregator(VectorValueSelector timeSelector, VectorValueSelector valueSelector) + { + super(timeSelector, valueSelector); + } + + @Override + public void initValue(ByteBuffer buf, int position) + { + buf.putLong(position, 0); + } + + + @Override + void putValue(ByteBuffer buf, int position, int index) + { + long firstValue = valueSelector.getLongVector()[index]; + buf.putLong(position, firstValue); + } + + + /** + * @return The object as a pair with the position and the value stored at the position in the buffer. + */ + @Nullable + @Override + public Object get(ByteBuffer buf, int position) + { + final boolean rhsNull = isValueNull(buf, position); + return new SerializablePair<>(buf.getLong(position), rhsNull ? null : buf.getLong(position + VALUE_OFFSET)); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstVectorAggregator.java new file mode 100644 index 00000000000..7fcd10352da --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstVectorAggregator.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation.first; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.segment.vector.VectorValueSelector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +/** + * Class for vectorized version of first/earliest aggregator over numeric types + */ +public abstract class NumericFirstVectorAggregator implements VectorAggregator +{ + static final int NULL_OFFSET = Long.BYTES; + static final int VALUE_OFFSET = NULL_OFFSET + Byte.BYTES; + final VectorValueSelector valueSelector; + private final boolean useDefault = NullHandling.replaceWithDefault(); + private final VectorValueSelector timeSelector; + private long firstTime; + + public NumericFirstVectorAggregator(VectorValueSelector timeSelector, VectorValueSelector valueSelector) + { + this.timeSelector = timeSelector; + this.valueSelector = valueSelector; + firstTime = Long.MAX_VALUE; + } + + @Override + public void init(ByteBuffer buf, int position) + { + buf.putLong(position, Long.MAX_VALUE); + buf.put(position + NULL_OFFSET, useDefault ? NullHandling.IS_NOT_NULL_BYTE : NullHandling.IS_NULL_BYTE); + initValue(buf, position + VALUE_OFFSET); + } + + @Override + public void aggregate(ByteBuffer buf, int position, int startRow, int endRow) + { + final long[] timeVector = timeSelector.getLongVector(); + final boolean[] nullTimeVector = timeSelector.getNullVector(); + final boolean[] nullValueVector = valueSelector.getNullVector(); + firstTime = buf.getLong(position); + + // the time vector is already sorted + // if earliest is on the default time dimension + // but if earliest uses earliest_by it might use a secondary timestamp + // which is not sorted. For correctness, we need to go over all elements. + // A possible optimization here is to have 2 paths one for earliest where + // we can take advantage of the sorted nature of time + // and the earliest_by where we have to go over all elements. + int index; + + for (int i = startRow; i < endRow; i++) { + index = i; + if (nullTimeVector != null && nullTimeVector[index]) { + continue; + } + final long earliestTime = timeVector[index]; + if (earliestTime >= firstTime) { + continue; + } + firstTime = earliestTime; + if (useDefault || nullValueVector == null || !nullValueVector[index]) { + updateTimeWithValue(buf, position, firstTime, index); + } else { + updateTimeWithNull(buf, position, firstTime); + } + } + } + + /** + * + * Checks if the aggregated value at a position in the buffer is null or not + * + * @param buf byte buffer storing the byte array representation of the aggregate + * @param position offset within the byte buffer at which the current aggregate value is stored + * @return + */ + boolean isValueNull(ByteBuffer buf, int position) + { + return buf.get(position + NULL_OFFSET) == NullHandling.IS_NULL_BYTE; + } + + @Override + public void aggregate( + ByteBuffer buf, + int numRows, + int[] positions, + @Nullable int[] rows, + int positionOffset + ) + { + boolean[] nulls = useDefault ? null : valueSelector.getNullVector(); + long[] timeVector = timeSelector.getLongVector(); + + for (int i = 0; i < numRows; i++) { + int position = positions[i] + positionOffset; + int row = rows == null ? 
i : rows[i]; + long firstTime = buf.getLong(position); + if (timeVector[row] < firstTime) { + if (useDefault || nulls == null || !nulls[row]) { + updateTimeWithValue(buf, position, timeVector[row], row); + } else { + updateTimeWithNull(buf, position, timeVector[row]); + } + } + } + } + + /** + * Updates the time and the non-null value at the appropriate position in the buffer + * + * @param buf byte buffer storing the byte array representation of the aggregate + * @param position offset within the byte buffer at which the current aggregate value is stored + * @param time the time to be updated in the buffer as the first time + * @param index the index within the value vector that holds the first value + */ + void updateTimeWithValue(ByteBuffer buf, int position, long time, int index) + { + buf.putLong(position, time); + buf.put(position + NULL_OFFSET, NullHandling.IS_NOT_NULL_BYTE); + putValue(buf, position + VALUE_OFFSET, index); + } + + /** + * Updates only the time at the appropriate position in the buffer, since the value is null + * + * @param buf byte buffer storing the byte array representation of the aggregate + * @param position offset within the byte buffer at which the current aggregate value is stored + * @param time the time to be updated in the buffer as the first time + */ + void updateTimeWithNull(ByteBuffer buf, int position, long time) + { + buf.putLong(position, time); + buf.put(position + NULL_OFFSET, NullHandling.IS_NULL_BYTE); + } + + /** + * Abstract function which needs to be overridden by subclasses to set the initial value + */ + abstract void initValue(ByteBuffer buf, int position); + + /** + * Abstract function which needs to be overridden by subclasses to set the + * first value in the buffer depending on the datatype + */ + abstract void putValue(ByteBuffer buf, int position, int index); + + @Override + public void close() + { + // no resources to cleanup + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/SingleStringFirstDimensionVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/SingleStringFirstDimensionVectorAggregator.java new file mode 100644 index 00000000000..d5aa31444e0 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/SingleStringFirstDimensionVectorAggregator.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation.first; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.aggregation.SerializablePairLongString; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.segment.vector.BaseLongVectorValueSelector; +import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +public class SingleStringFirstDimensionVectorAggregator implements VectorAggregator +{ + private final BaseLongVectorValueSelector timeSelector; + private final SingleValueDimensionVectorSelector valueDimensionVectorSelector; + private long firstTime; + private final int maxStringBytes; + private final boolean useDefault = NullHandling.replaceWithDefault(); + + public SingleStringFirstDimensionVectorAggregator( + BaseLongVectorValueSelector timeSelector, + SingleValueDimensionVectorSelector valueDimensionVectorSelector, + int maxStringBytes + ) + { + this.timeSelector = timeSelector; + this.valueDimensionVectorSelector = valueDimensionVectorSelector; + this.maxStringBytes = maxStringBytes; + this.firstTime = Long.MAX_VALUE; + } + + @Override + public void init(ByteBuffer buf, int position) + { + buf.putLong(position, Long.MAX_VALUE); + buf.put( + position + NumericFirstVectorAggregator.NULL_OFFSET, + useDefault ? NullHandling.IS_NOT_NULL_BYTE : NullHandling.IS_NULL_BYTE + ); + buf.putLong(position + NumericFirstVectorAggregator.VALUE_OFFSET, 0); + } + + @Override + public void aggregate(ByteBuffer buf, int position, int startRow, int endRow) + { + final long[] timeVector = timeSelector.getLongVector(); + final boolean[] nullTimeVector = timeSelector.getNullVector(); + final int[] valueVector = valueDimensionVectorSelector.getRowVector(); + firstTime = buf.getLong(position); + int index; + + long earliestTime; + for (index = startRow; index < endRow; index++) { + if (nullTimeVector != null && nullTimeVector[index]) { + continue; + } + earliestTime = timeVector[index]; + if (earliestTime < firstTime) { + firstTime = earliestTime; + buf.putLong(position, firstTime); + buf.put(position + NumericFirstVectorAggregator.NULL_OFFSET, NullHandling.IS_NOT_NULL_BYTE); + buf.putInt(position + NumericFirstVectorAggregator.VALUE_OFFSET, valueVector[index]); + } + } + } + + @Override + public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset) + { + final long[] timeVector = timeSelector.getLongVector(); + final boolean[] nullTimeVector = timeSelector.getNullVector(); + final int[] values = valueDimensionVectorSelector.getRowVector(); + for (int i = 0; i < numRows; i++) { + if (nullTimeVector != null && nullTimeVector[i]) { + continue; + } + int position = positions[i] + positionOffset; + int row = rows == null ? 
i : rows[i]; + long firstTime = buf.getLong(position); + if (timeVector[row] < firstTime) { + firstTime = timeVector[row]; + buf.putLong(position, firstTime); + buf.put(position + NumericFirstVectorAggregator.NULL_OFFSET, NullHandling.IS_NOT_NULL_BYTE); + buf.putInt(position + NumericFirstVectorAggregator.VALUE_OFFSET, values[row]); + } + } + + } + + @Nullable + @Override + public Object get(ByteBuffer buf, int position) + { + int index = buf.getInt(position + NumericFirstVectorAggregator.VALUE_OFFSET); + long earliest = buf.getLong(position); + String strValue = valueDimensionVectorSelector.lookupName(index); + return new SerializablePairLongString(earliest, StringUtils.chop(strValue, maxStringBytes)); + } + + @Override + public void close() + { + // nothing to close + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java index 63e48e878a8..33dafa86109 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java @@ -32,12 +32,21 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.SerializablePairLongString; +import org.apache.druid.query.aggregation.VectorAggregator; import org.apache.druid.query.cache.CacheKeyBuilder; +import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.segment.BaseObjectColumnValueSelector; +import org.apache.druid.segment.ColumnInspector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.NilColumnValueSelector; +import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.vector.BaseLongVectorValueSelector; +import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; +import org.apache.druid.segment.vector.VectorObjectSelector; import javax.annotation.Nullable; import java.nio.ByteBuffer; @@ -66,6 +75,7 @@ public class StringFirstAggregatorFactory extends AggregatorFactory } }; + private static final BufferAggregator NIL_BUFFER_AGGREGATOR = new StringFirstBufferAggregator( NilColumnValueSelector.instance(), NilColumnValueSelector.instance(), @@ -80,6 +90,25 @@ public class StringFirstAggregatorFactory extends AggregatorFactory } }; + public static final VectorAggregator NIL_VECTOR_AGGREGATOR = new StringFirstVectorAggregator( + null, + null, + 0 + ) + { + @Override + public void aggregate(ByteBuffer buf, int position, int startRow, int endRow) + { + // no-op + } + + @Override + public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset) + { + // no-op + } + }; + public static final int DEFAULT_MAX_STRING_SIZE = 1024; public static final Comparator TIME_COMPARATOR = (o1, o2) -> Longs.compare( @@ -121,6 +150,7 @@ public class StringFirstAggregatorFactory extends AggregatorFactory : maxStringBytes; } + @Override public Aggregator factorize(ColumnSelectorFactory metricFactory) { @@ -153,6 +183,40 @@ 
public class StringFirstAggregatorFactory extends AggregatorFactory } } + @Override + public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory) + { + BaseLongVectorValueSelector timeSelector = (BaseLongVectorValueSelector) selectorFactory.makeValueSelector( + timeColumn); + ColumnCapabilities capabilities = selectorFactory.getColumnCapabilities(fieldName); + if (capabilities != null) { + if (capabilities.is(ValueType.STRING) && capabilities.isDictionaryEncoded().isTrue()) { + // Case 1: Single value string with dimension selector + // For multivalue string we need to iterate a list of indexedInts which is also similar to iterating + // over elements for an ARRAY typed column. These two which requires an iteration will be done together. + if (!capabilities.hasMultipleValues().isTrue()) { + SingleValueDimensionVectorSelector sSelector = selectorFactory.makeSingleValueDimensionSelector( + DefaultDimensionSpec.of( + fieldName)); + return new SingleStringFirstDimensionVectorAggregator(timeSelector, sSelector, maxStringBytes); + } + } + } + // Case 2: return vector object selector + VectorObjectSelector vSelector = selectorFactory.makeObjectSelector(fieldName); + if (capabilities != null) { + return new StringFirstVectorAggregator(timeSelector, vSelector, maxStringBytes); + } else { + return NIL_VECTOR_AGGREGATOR; + } + } + + @Override + public boolean canVectorize(ColumnInspector columnInspector) + { + return true; + } + @Override public Comparator getComparator() { diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstVectorAggregator.java new file mode 100644 index 00000000000..7082b4c3dd0 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstVectorAggregator.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation.first; + +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.query.aggregation.SerializablePairLongString; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.segment.DimensionHandlerUtils; +import org.apache.druid.segment.vector.BaseLongVectorValueSelector; +import org.apache.druid.segment.vector.VectorObjectSelector; +import org.apache.druid.segment.vector.VectorValueSelector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +public class StringFirstVectorAggregator implements VectorAggregator +{ + private static final SerializablePairLongString INIT = new SerializablePairLongString( + DateTimes.MAX.getMillis(), + null + ); + private final VectorValueSelector timeSelector; + private final VectorObjectSelector valueSelector; + private final int maxStringBytes; + + + public StringFirstVectorAggregator( + BaseLongVectorValueSelector timeSelector, + VectorObjectSelector valueSelector, + int maxStringBytes + ) + { + this.timeSelector = timeSelector; + this.valueSelector = valueSelector; + this.maxStringBytes = maxStringBytes; + } + + @Override + public void init(ByteBuffer buf, int position) + { + StringFirstLastUtils.writePair(buf, position, INIT, maxStringBytes); + } + + @Override + public void aggregate(ByteBuffer buf, int position, int startRow, int endRow) + { + if (timeSelector == null) { + return; + } + final long[] times = timeSelector.getLongVector(); + final boolean[] nullTimeVector = timeSelector.getNullVector(); + final Object[] objectsWhichMightBeStrings = valueSelector.getObjectVector(); + long firstTime = buf.getLong(position); + int index; + for (int i = startRow; i < endRow; i++) { + if (times[i] > firstTime) { + continue; + } + if (nullTimeVector != null && nullTimeVector[i]) { + continue; + } + index = i; + final boolean foldNeeded = StringFirstLastUtils.objectNeedsFoldCheck(objectsWhichMightBeStrings[index]); + if (foldNeeded) { + final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromVectorSelectorsAtIndex( + timeSelector, + valueSelector, + index + ); + if (inPair != null) { + firstTime = buf.getLong(position); + if (inPair.lhs < firstTime) { + StringFirstLastUtils.writePair( + buf, + position, + new SerializablePairLongString(inPair.lhs, inPair.rhs), + maxStringBytes + ); + } + } + } else { + final long time = times[index]; + if (time < firstTime) { + final String value = DimensionHandlerUtils.convertObjectToString(objectsWhichMightBeStrings[index]); + firstTime = time; + StringFirstLastUtils.writePair( + buf, + position, + new SerializablePairLongString(time, value), + maxStringBytes + ); + } + } + } + + } + + @Override + public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset) + { + final long[] timeVector = timeSelector.getLongVector(); + final boolean[] nullTimeVector = timeSelector.getNullVector(); + final Object[] objectsWhichMightBeStrings = valueSelector.getObjectVector(); + + // iterate once over the object vector to find first non null element and + // determine if the type is Pair or not + boolean foldNeeded = false; + for (Object obj : objectsWhichMightBeStrings) { + if (obj == null) { + continue; + } else { + foldNeeded = StringFirstLastUtils.objectNeedsFoldCheck(obj); + break; + } + } + + for (int i = 0; i < numRows; i++) { + if (nullTimeVector != null && nullTimeVector[i]) { + continue; + } + int position = positions[i] + positionOffset; + int row = rows 
== null ? i : rows[i]; + long firstTime = buf.getLong(position); + if (timeVector[row] < firstTime) { + if (foldNeeded) { + final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromVectorSelectorsAtIndex( + timeSelector, + valueSelector, + row + ); + if (inPair != null) { + if (inPair.lhs < firstTime) { + StringFirstLastUtils.writePair( + buf, + position, + new SerializablePairLongString(inPair.lhs, inPair.rhs), + maxStringBytes + ); + } + } + } else { + final String value = DimensionHandlerUtils.convertObjectToString(objectsWhichMightBeStrings[row]); + firstTime = timeVector[row]; + StringFirstLastUtils.writePair( + buf, + position, + new SerializablePairLongString(firstTime, value), + maxStringBytes + ); + } + } + } + + } + + @Nullable + @Override + public Object get(ByteBuffer buf, int position) + { + return StringFirstLastUtils.readPair(buf, position); + } + + @Override + public void close() + { + // nothing to close + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastVectorAggregator.java index a9c0b1e9ade..a18a1d4c963 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastVectorAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastVectorAggregator.java @@ -73,8 +73,8 @@ public class StringLastVectorAggregator implements VectorAggregator if (objectsWhichMightBeStrings[i] == null) { continue; } - if (times[i] < lastTime) { - break; + if (times[i] <= lastTime) { + continue; } index = i; final boolean foldNeeded = StringFirstLastUtils.objectNeedsFoldCheck(objectsWhichMightBeStrings[index]); diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/DoubleFirstVectorAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/DoubleFirstVectorAggregationTest.java new file mode 100644 index 00000000000..49c15fa22a5 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/DoubleFirstVectorAggregationTest.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation.first; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ColumnCapabilitiesImpl; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.vector.BaseDoubleVectorValueSelector; +import org.apache.druid.segment.vector.BaseLongVectorValueSelector; +import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector; +import org.apache.druid.segment.vector.NoFilterVectorOffset; +import org.apache.druid.segment.vector.ReadableVectorInspector; +import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; +import org.apache.druid.segment.vector.VectorObjectSelector; +import org.apache.druid.segment.vector.VectorValueSelector; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.util.concurrent.ThreadLocalRandom; + +public class DoubleFirstVectorAggregationTest extends InitializedNullHandlingTest +{ + private static final double EPSILON = 1e-5; + private static final double[] VALUES = new double[]{7.8d, 11, 23.67, 60}; + private static final boolean[] NULLS = new boolean[]{false, false, true, false}; + private long[] times = {2436, 6879, 7888, 8224}; + + private static final String NAME = "NAME"; + private static final String FIELD_NAME = "FIELD_NAME"; + private static final String TIME_COL = "__time"; + + private VectorValueSelector selector; + + private BaseLongVectorValueSelector timeSelector; + private ByteBuffer buf; + + private DoubleFirstVectorAggregator target; + + private DoubleFirstAggregatorFactory doubleFirstAggregatorFactory; + + private VectorColumnSelectorFactory selectorFactory; + + @Before + public void setup() + { + byte[] randomBytes = new byte[1024]; + ThreadLocalRandom.current().nextBytes(randomBytes); + buf = ByteBuffer.wrap(randomBytes); + timeSelector = new BaseLongVectorValueSelector(new NoFilterVectorOffset(times.length, 0, times.length) + { + }) + { + @Override + public long[] getLongVector() + { + return times; + } + + @Nullable + @Override + public boolean[] getNullVector() + { + return NULLS; + } + }; + selector = new BaseDoubleVectorValueSelector(new NoFilterVectorOffset(VALUES.length, 0, VALUES.length) + { + + }) + { + @Override + public double[] getDoubleVector() + { + return VALUES; + } + + @Nullable + @Override + public boolean[] getNullVector() + { + if (!NullHandling.replaceWithDefault()) { + return NULLS; + } + return null; + } + }; + + target = new DoubleFirstVectorAggregator(timeSelector, selector); + clearBufferForPositions(0, 0); + selectorFactory = new VectorColumnSelectorFactory() + { + @Override + public ReadableVectorInspector getReadableVectorInspector() + { + return null; + } + + @Override + public SingleValueDimensionVectorSelector makeSingleValueDimensionSelector(DimensionSpec dimensionSpec) + { + return null; + } + + @Override + public MultiValueDimensionVectorSelector makeMultiValueDimensionSelector(DimensionSpec dimensionSpec) + { + return null; + } + + @Override + public VectorValueSelector makeValueSelector(String column) + { + if 
(TIME_COL.equals(column)) { + return timeSelector; + } else if (FIELD_NAME.equals(column)) { + return selector; + } else { + return null; + } + } + + @Override + public VectorObjectSelector makeObjectSelector(String column) + { + return null; + } + + @Nullable + @Override + public ColumnCapabilities getColumnCapabilities(String column) + { + if (FIELD_NAME.equals(column)) { + return ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ColumnType.DOUBLE); + } + return null; + } + }; + + doubleFirstAggregatorFactory = new DoubleFirstAggregatorFactory(NAME, FIELD_NAME, TIME_COL); + } + + @Test + public void testFactory() + { + Assert.assertTrue(doubleFirstAggregatorFactory.canVectorize(selectorFactory)); + VectorAggregator vectorAggregator = doubleFirstAggregatorFactory.factorizeVector(selectorFactory); + Assert.assertNotNull(vectorAggregator); + Assert.assertEquals(DoubleFirstVectorAggregator.class, vectorAggregator.getClass()); + } + + @Test + public void initValueShouldInitZero() + { + target.initValue(buf, 0); + double initVal = buf.getDouble(0); + Assert.assertEquals(0, initVal, EPSILON); + } + + @Test + public void aggregate() + { + target.aggregate(buf, 0, 0, VALUES.length); + Pair result = (Pair) target.get(buf, 0); + Assert.assertEquals(times[0], result.lhs.longValue()); + Assert.assertEquals(VALUES[0], result.rhs, EPSILON); + } + + @Test + public void aggregateWithNulls() + { + target.aggregate(buf, 0, 0, VALUES.length); + Pair result = (Pair) target.get(buf, 0); + Assert.assertEquals(times[0], result.lhs.longValue()); + Assert.assertEquals(VALUES[0], result.rhs, EPSILON); + } + + @Test + public void aggregateBatchWithoutRows() + { + int[] positions = new int[]{0, 43, 70}; + int positionOffset = 2; + clearBufferForPositions(positionOffset, positions); + target.aggregate(buf, 3, positions, null, positionOffset); + for (int i = 0; i < positions.length; i++) { + Pair result = (Pair) target.get(buf, positions[i] + positionOffset); + Assert.assertEquals(times[i], result.lhs.longValue()); + if (!NullHandling.replaceWithDefault() && NULLS[i]) { + Assert.assertNull(result.rhs); + } else { + Assert.assertEquals(VALUES[i], result.rhs, EPSILON); + } + } + } + + @Test + public void aggregateBatchWithRows() + { + int[] positions = new int[]{0, 43, 70}; + int[] rows = new int[]{3, 2, 0}; + int positionOffset = 2; + clearBufferForPositions(positionOffset, positions); + target.aggregate(buf, 3, positions, rows, positionOffset); + for (int i = 0; i < positions.length; i++) { + Pair result = (Pair) target.get(buf, positions[i] + positionOffset); + Assert.assertEquals(times[rows[i]], result.lhs.longValue()); + if (!NullHandling.replaceWithDefault() && NULLS[rows[i]]) { + Assert.assertNull(result.rhs); + } else { + Assert.assertEquals(VALUES[rows[i]], result.rhs, EPSILON); + } + } + } + + private void clearBufferForPositions(int offset, int... positions) + { + for (int position : positions) { + target.init(buf, offset + position); + } + } +} diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregationTest.java new file mode 100644 index 00000000000..6b02037824a --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregationTest.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.first; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ColumnCapabilitiesImpl; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.vector.BaseFloatVectorValueSelector; +import org.apache.druid.segment.vector.BaseLongVectorValueSelector; +import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector; +import org.apache.druid.segment.vector.NoFilterVectorOffset; +import org.apache.druid.segment.vector.ReadableVectorInspector; +import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; +import org.apache.druid.segment.vector.VectorObjectSelector; +import org.apache.druid.segment.vector.VectorValueSelector; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.util.concurrent.ThreadLocalRandom; + +public class FloatFirstVectorAggregationTest extends InitializedNullHandlingTest +{ + private static final double EPSILON = 1e-5; + private static final float[] VALUES = new float[]{7.2f, 15.6f, 2.1f, 150.0f}; + private static final boolean[] NULLS = new boolean[]{false, false, true, false}; + private long[] times = {2436, 6879, 7888, 8224}; + + private static final String NAME = "NAME"; + private static final String FIELD_NAME = "FIELD_NAME"; + private static final String TIME_COL = "__time"; + + + private VectorValueSelector selector; + + private BaseLongVectorValueSelector timeSelector; + private ByteBuffer buf; + + private FloatFirstVectorAggregator target; + + private FloatFirstAggregatorFactory floatFirstAggregatorFactory; + + private VectorColumnSelectorFactory selectorFactory; + + @Before + public void setup() + { + byte[] randomBytes = new byte[1024]; + ThreadLocalRandom.current().nextBytes(randomBytes); + buf = ByteBuffer.wrap(randomBytes); + timeSelector = new BaseLongVectorValueSelector(new NoFilterVectorOffset(times.length, 0, times.length) + { + }) + { + @Override + public long[] getLongVector() + { + return times; + } + + @Nullable + @Override + public boolean[] getNullVector() + { + return NULLS; + } + }; + selector = new BaseFloatVectorValueSelector(new NoFilterVectorOffset(VALUES.length, 0, VALUES.length) + { + + }) + { + + @Override + public float[] getFloatVector() + { + return VALUES; + } + + @Nullable + @Override + public boolean[] getNullVector() + { + if 
(!NullHandling.replaceWithDefault()) { + return NULLS; + } + return null; + } + }; + + target = new FloatFirstVectorAggregator(timeSelector, selector); + clearBufferForPositions(0, 0); + + selectorFactory = new VectorColumnSelectorFactory() + { + @Override + public ReadableVectorInspector getReadableVectorInspector() + { + return null; + } + + @Override + public SingleValueDimensionVectorSelector makeSingleValueDimensionSelector(DimensionSpec dimensionSpec) + { + return null; + } + + @Override + public MultiValueDimensionVectorSelector makeMultiValueDimensionSelector(DimensionSpec dimensionSpec) + { + return null; + } + + @Override + public VectorValueSelector makeValueSelector(String column) + { + if (TIME_COL.equals(column)) { + return timeSelector; + } else if (FIELD_NAME.equals(column)) { + return selector; + } else { + return null; + } + } + + @Override + public VectorObjectSelector makeObjectSelector(String column) + { + return null; + } + + @Nullable + @Override + public ColumnCapabilities getColumnCapabilities(String column) + { + if (FIELD_NAME.equals(column)) { + return ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ColumnType.FLOAT); + } + return null; + } + }; + floatFirstAggregatorFactory = new FloatFirstAggregatorFactory(NAME, FIELD_NAME, TIME_COL); + + } + + @Test + public void testFactory() + { + Assert.assertTrue(floatFirstAggregatorFactory.canVectorize(selectorFactory)); + VectorAggregator vectorAggregator = floatFirstAggregatorFactory.factorizeVector(selectorFactory); + Assert.assertNotNull(vectorAggregator); + Assert.assertEquals(FloatFirstVectorAggregator.class, vectorAggregator.getClass()); + } + + @Test + public void initValueShouldBeZero() + { + target.initValue(buf, 0); + float initVal = buf.getFloat(0); + Assert.assertEquals(0.0f, initVal, EPSILON); + } + + @Test + public void aggregate() + { + target.init(buf, 0); + target.aggregate(buf, 0, 0, VALUES.length); + Pair result = (Pair) target.get(buf, 0); + Assert.assertEquals(times[0], result.lhs.longValue()); + Assert.assertEquals(VALUES[0], result.rhs, EPSILON); + } + + @Test + public void aggregateWithNulls() + { + target.aggregate(buf, 0, 0, VALUES.length); + Pair result = (Pair) target.get(buf, 0); + Assert.assertEquals(times[0], result.lhs.longValue()); + Assert.assertEquals(VALUES[0], result.rhs, EPSILON); + } + + @Test + public void aggregateBatchWithoutRows() + { + int[] positions = new int[]{0, 43, 70}; + int positionOffset = 2; + clearBufferForPositions(positionOffset, positions); + target.aggregate(buf, 3, positions, null, positionOffset); + for (int i = 0; i < positions.length; i++) { + Pair result = (Pair) target.get(buf, positions[i] + positionOffset); + Assert.assertEquals(times[i], result.lhs.longValue()); + if (!NullHandling.replaceWithDefault() && NULLS[i]) { + Assert.assertNull(result.rhs); + } else { + Assert.assertEquals(VALUES[i], result.rhs, EPSILON); + } + } + } + + @Test + public void aggregateBatchWithRows() + { + int[] positions = new int[]{0, 43, 70}; + int[] rows = new int[]{3, 2, 0}; + int positionOffset = 2; + clearBufferForPositions(positionOffset, positions); + target.aggregate(buf, 3, positions, rows, positionOffset); + for (int i = 0; i < positions.length; i++) { + Pair result = (Pair) target.get(buf, positions[i] + positionOffset); + Assert.assertEquals(times[rows[i]], result.lhs.longValue()); + if (!NullHandling.replaceWithDefault() && NULLS[rows[i]]) { + Assert.assertNull(result.rhs); + } else { + Assert.assertEquals(VALUES[rows[i]], result.rhs, EPSILON); + } + } 
+ } + + private void clearBufferForPositions(int offset, int... positions) + { + for (int position : positions) { + target.init(buf, offset + position); + } + } +} diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/LongFirstVectorAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/LongFirstVectorAggregationTest.java new file mode 100644 index 00000000000..ec401760062 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/LongFirstVectorAggregationTest.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.first; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ColumnCapabilitiesImpl; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.vector.BaseLongVectorValueSelector; +import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector; +import org.apache.druid.segment.vector.NoFilterVectorOffset; +import org.apache.druid.segment.vector.ReadableVectorInspector; +import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; +import org.apache.druid.segment.vector.VectorObjectSelector; +import org.apache.druid.segment.vector.VectorValueSelector; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.util.concurrent.ThreadLocalRandom; + + +public class LongFirstVectorAggregationTest extends InitializedNullHandlingTest +{ + private static final double EPSILON = 1e-5; + private static final long[] VALUES = new long[]{7, 15, 2, 150}; + private static final boolean[] NULLS = new boolean[]{false, false, true, false}; + private static final String NAME = "NAME"; + private static final String FIELD_NAME = "FIELD_NAME"; + private static final String TIME_COL = "__time"; + private long[] times = {2436, 6879, 7888, 8224}; + private VectorValueSelector selector; + private BaseLongVectorValueSelector timeSelector; + private ByteBuffer buf; + private LongFirstVectorAggregator target; + + private LongFirstAggregatorFactory longFirstAggregatorFactory; + private VectorColumnSelectorFactory selectorFactory; + + @Before + public void setup() + { + byte[] randomBytes = new byte[1024]; + ThreadLocalRandom.current().nextBytes(randomBytes); + buf = 
ByteBuffer.wrap(randomBytes); + timeSelector = new BaseLongVectorValueSelector(new NoFilterVectorOffset(times.length, 0, times.length) + { + }) + { + @Override + public long[] getLongVector() + { + return times; + } + + @Nullable + @Override + public boolean[] getNullVector() + { + return NULLS; + } + }; + selector = new BaseLongVectorValueSelector(new NoFilterVectorOffset(VALUES.length, 0, VALUES.length) + { + + }) + { + @Override + public long[] getLongVector() + { + return VALUES; + } + + @Nullable + @Override + public boolean[] getNullVector() + { + if (!NullHandling.replaceWithDefault()) { + return NULLS; + } + return null; + } + }; + + target = new LongFirstVectorAggregator(timeSelector, selector); + clearBufferForPositions(0, 0); + + selectorFactory = new VectorColumnSelectorFactory() + { + @Override + public ReadableVectorInspector getReadableVectorInspector() + { + return null; + } + + @Override + public SingleValueDimensionVectorSelector makeSingleValueDimensionSelector(DimensionSpec dimensionSpec) + { + return null; + } + + @Override + public MultiValueDimensionVectorSelector makeMultiValueDimensionSelector(DimensionSpec dimensionSpec) + { + return null; + } + + @Override + public VectorValueSelector makeValueSelector(String column) + { + if (TIME_COL.equals(column)) { + return timeSelector; + } else if (FIELD_NAME.equals(column)) { + return selector; + } else { + return null; + } + } + + @Override + public VectorObjectSelector makeObjectSelector(String column) + { + return null; + } + + @Nullable + @Override + public ColumnCapabilities getColumnCapabilities(String column) + { + if (FIELD_NAME.equals(column)) { + return ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ColumnType.LONG); + } + return null; + } + }; + longFirstAggregatorFactory = new LongFirstAggregatorFactory(NAME, FIELD_NAME, TIME_COL); + } + + @Test + public void testFactory() + { + Assert.assertTrue(longFirstAggregatorFactory.canVectorize(selectorFactory)); + VectorAggregator vectorAggregator = longFirstAggregatorFactory.factorizeVector(selectorFactory); + Assert.assertNotNull(vectorAggregator); + Assert.assertEquals(LongFirstVectorAggregator.class, vectorAggregator.getClass()); + } + + @Test + public void initValueShouldInitZero() + { + target.initValue(buf, 0); + long initVal = buf.getLong(0); + Assert.assertEquals(0, initVal); + } + + @Test + public void aggregate() + { + target.aggregate(buf, 0, 0, VALUES.length); + Pair result = (Pair) target.get(buf, 0); + Assert.assertEquals(times[0], result.lhs.longValue()); + Assert.assertEquals(VALUES[0], result.rhs, EPSILON); + } + + @Test + public void aggregateWithNulls() + { + target.aggregate(buf, 0, 0, VALUES.length); + Pair result = (Pair) target.get(buf, 0); + Assert.assertEquals(times[0], result.lhs.longValue()); + Assert.assertEquals(VALUES[0], result.rhs, EPSILON); + } + + @Test + public void aggregateBatchWithoutRows() + { + int[] positions = new int[]{0, 43, 70}; + int positionOffset = 2; + clearBufferForPositions(positionOffset, positions); + target.aggregate(buf, 3, positions, null, positionOffset); + for (int i = 0; i < positions.length; i++) { + Pair result = (Pair) target.get(buf, positions[i] + positionOffset); + Assert.assertEquals(times[i], result.lhs.longValue()); + if (!NullHandling.replaceWithDefault() && NULLS[i]) { + Assert.assertNull(result.rhs); + } else { + Assert.assertEquals(VALUES[i], result.rhs, EPSILON); + } + } + } + + @Test + public void aggregateBatchWithRows() + { + int[] positions = new int[]{0, 43, 70}; + int[] rows 
= new int[]{3, 2, 0}; + int positionOffset = 2; + clearBufferForPositions(positionOffset, positions); + target.aggregate(buf, 3, positions, rows, positionOffset); + for (int i = 0; i < positions.length; i++) { + Pair result = (Pair) target.get(buf, positions[i] + positionOffset); + Assert.assertEquals(times[rows[i]], result.lhs.longValue()); + if (!NullHandling.replaceWithDefault() && NULLS[rows[i]]) { + Assert.assertNull(result.rhs); + } else { + Assert.assertEquals(VALUES[rows[i]], result.rhs, EPSILON); + } + } + } + + private void clearBufferForPositions(int offset, int... positions) + { + for (int position : positions) { + target.init(buf, offset + position); + } + } +} diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstVectorAggregatorTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstVectorAggregatorTest.java new file mode 100644 index 00000000000..148f4f95937 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstVectorAggregatorTest.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + package org.apache.druid.query.aggregation.first; + + import org.apache.druid.java.util.common.DateTimes; + import org.apache.druid.java.util.common.Pair; + import org.apache.druid.query.aggregation.SerializablePairLongString; + import org.apache.druid.query.aggregation.VectorAggregator; + import org.apache.druid.segment.vector.BaseLongVectorValueSelector; + import org.apache.druid.segment.vector.VectorColumnSelectorFactory; + import org.apache.druid.segment.vector.VectorObjectSelector; + import org.apache.druid.testing.InitializedNullHandlingTest; + import org.junit.Assert; + import org.junit.Before; + import org.junit.Test; + import org.junit.runner.RunWith; + import org.mockito.Answers; + import org.mockito.Mock; + import org.mockito.Mockito; + import org.mockito.junit.MockitoJUnitRunner; + + import java.nio.ByteBuffer; + import java.util.concurrent.ThreadLocalRandom; + + + @RunWith(MockitoJUnitRunner.class) + public class StringFirstVectorAggregatorTest extends InitializedNullHandlingTest + { + private static final double EPSILON = 1e-5; + private static final String[] VALUES = new String[]{"a", "b", null, "c"}; + private static final boolean[] NULLS = new boolean[]{false, false, true, false}; + private static final String NAME = "NAME"; + private static final String FIELD_NAME = "FIELD_NAME"; + private static final String TIME_COL = "__time"; + private long[] times = {2436, 6879, 7888, 8224}; + private long[] timesSame = {2436, 2436}; + private SerializablePairLongString[] pairs = { + new SerializablePairLongString(2345001L, "first"), + new SerializablePairLongString(2345100L, "notFirst") + }; + + @Mock + private VectorObjectSelector selector; + @Mock + private VectorObjectSelector selectorForPairs; + @Mock + private BaseLongVectorValueSelector timeSelector; + @Mock + private BaseLongVectorValueSelector timeSelectorForPairs; + private ByteBuffer buf; + private StringFirstVectorAggregator target; + private StringFirstVectorAggregator targetWithPairs; + + private StringFirstAggregatorFactory stringFirstAggregatorFactory; + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private VectorColumnSelectorFactory selectorFactory; + + @Before + public void setup() + { + byte[] randomBytes = new byte[1024]; + ThreadLocalRandom.current().nextBytes(randomBytes); + buf = ByteBuffer.wrap(randomBytes); + Mockito.doReturn(VALUES).when(selector).getObjectVector(); + Mockito.doReturn(times).when(timeSelector).getLongVector(); + Mockito.doReturn(timesSame).when(timeSelectorForPairs).getLongVector(); + Mockito.doReturn(pairs).when(selectorForPairs).getObjectVector(); + target = new StringFirstVectorAggregator(timeSelector, selector, 10); + targetWithPairs = new StringFirstVectorAggregator(timeSelectorForPairs, selectorForPairs, 10); + clearBufferForPositions(0, 0); + + + Mockito.doReturn(selector).when(selectorFactory).makeObjectSelector(FIELD_NAME); + Mockito.doReturn(timeSelector).when(selectorFactory).makeValueSelector(TIME_COL); + stringFirstAggregatorFactory = new StringFirstAggregatorFactory(NAME, FIELD_NAME, TIME_COL, 10); + + } + + @Test + public void testAggregateWithPairs() + { + targetWithPairs.aggregate(buf, 0, 0, pairs.length); + Pair result = (Pair) targetWithPairs.get(buf, 0); + // pairs[0] has the smaller timestamp (lhs), so the aggregator should keep it as the first value + Assert.assertEquals(pairs[0].lhs.longValue(), result.lhs.longValue()); + Assert.assertEquals(pairs[0].rhs, result.rhs); + } + + @Test + public void testFactory() + { + Assert.assertTrue(stringFirstAggregatorFactory.canVectorize(selectorFactory)); + 
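// canVectorize() is true for this selector factory, so factorizeVector() is expected to return the vectorized aggregator +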
VectorAggregator vectorAggregator = stringFirstAggregatorFactory.factorizeVector(selectorFactory); + Assert.assertNotNull(vectorAggregator); + Assert.assertEquals(StringFirstVectorAggregator.class, vectorAggregator.getClass()); + } + + @Test + public void initValueShouldBeMaxDate() + { + target.init(buf, 0); + long initVal = buf.getLong(0); + Assert.assertEquals(DateTimes.MAX.getMillis(), initVal); + } + + @Test + public void aggregate() + { + target.aggregate(buf, 0, 0, VALUES.length); + Pair result = (Pair) target.get(buf, 0); + Assert.assertEquals(times[0], result.lhs.longValue()); + Assert.assertEquals(VALUES[0], result.rhs); + } + + @Test + public void aggregateBatchWithoutRows() + { + int[] positions = new int[]{0, 43, 70}; + int positionOffset = 2; + clearBufferForPositions(positionOffset, positions); + target.aggregate(buf, 3, positions, null, positionOffset); + for (int i = 0; i < positions.length; i++) { + Pair result = (Pair) target.get(buf, positions[i] + positionOffset); + Assert.assertEquals(times[i], result.lhs.longValue()); + Assert.assertEquals(VALUES[i], result.rhs); + } + } + + @Test + public void aggregateBatchWithRows() + { + int[] positions = new int[]{0, 43, 70}; + int[] rows = new int[]{3, 2, 0}; + int positionOffset = 2; + clearBufferForPositions(positionOffset, positions); + target.aggregate(buf, 3, positions, rows, positionOffset); + for (int i = 0; i < positions.length; i++) { + Pair result = (Pair) target.get(buf, positions[i] + positionOffset); + Assert.assertEquals(times[rows[i]], result.lhs.longValue()); + Assert.assertEquals(VALUES[rows[i]], result.rhs); + } + } + + private void clearBufferForPositions(int offset, int... positions) + { + for (int position : positions) { + target.init(buf, offset + position); + } + } +} diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index 269067ff1f9..033f64bcb9c 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -3280,9 +3280,6 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest @Test public void testGroupByWithFirstLast() { - // Cannot vectorize due to "first", "last" aggregators. - cannotVectorize(); - GroupByQuery query = makeQueryBuilder() .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) .setQuerySegmentSpec(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) @@ -3370,9 +3367,6 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest @Test public void testGroupByWithNoResult() { - // Cannot vectorize due to first, last aggregators. - cannotVectorize(); - GroupByQuery query = makeQueryBuilder() .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) .setQuerySegmentSpec(QueryRunnerTestHelper.EMPTY_INTERVAL) @@ -7198,9 +7192,6 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest @Test public void testSubqueryWithFirstLast() { - // Cannot vectorize due to "first", "last" aggregators. 
- cannotVectorize(); - GroupByQuery subquery = makeQueryBuilder() .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) .setQuerySegmentSpec(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) diff --git a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java index 8fee0e213fc..a89103ac44a 100644 --- a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java @@ -169,9 +169,6 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest @Test public void testEmptyTimeseries() { - // Cannot vectorize due to "doubleFirst" aggregator. - cannotVectorize(); - TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() .dataSource(QueryRunnerTestHelper.DATA_SOURCE) .granularity(QueryRunnerTestHelper.ALL_GRAN) @@ -1960,9 +1957,6 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest @Test public void testTimeseriesWithFirstLastAggregator() { - // Cannot vectorize due to "doubleFirst", "doubleLast" aggregators. - cannotVectorize(); - TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() .dataSource(QueryRunnerTestHelper.DATA_SOURCE) .granularity(QueryRunnerTestHelper.MONTH_GRAN) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index e1aab8de41c..58240800371 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -630,8 +630,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest public void testEarliestAggregators() { notMsqCompatible(); - // Cannot vectorize EARLIEST aggregator. - skipVectorize(); testQuery( "SELECT " @@ -1096,8 +1094,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest public void testPrimitiveEarliestInSubquery() { notMsqCompatible(); - // Cannot vectorize EARLIEST aggregator. - skipVectorize(); testQuery( "SELECT SUM(val1), SUM(val2), SUM(val3) FROM (SELECT dim2, EARLIEST(m1) AS val1, EARLIEST(cnt) AS val2, EARLIEST(m2) AS val3 FROM foo GROUP BY dim2)", @@ -1195,11 +1191,8 @@ public class CalciteQueryTest extends BaseCalciteQueryTest @Test public void testStringEarliestInSubquery() { - // Cannot vectorize EARLIEST aggregator. - skipVectorize(); - testQuery( - "SELECT SUM(val) FROM (SELECT dim2, EARLIEST(dim1, 10) AS val FROM foo GROUP BY dim2)", + "SELECT SUM(val) FROM (SELECT dim2, EARLIEST(dim1,10) AS val FROM foo GROUP BY dim2)", ImmutableList.of( GroupByQuery.builder() .setDataSource( @@ -1305,6 +1298,75 @@ public class CalciteQueryTest extends BaseCalciteQueryTest ); } + @Test + public void testStringEarliestSingleStringDim() + { + notMsqCompatible(); + testQuery( + "SELECT dim2, EARLIEST(dim1,10) AS val FROM foo GROUP BY dim2", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0"))) + .setAggregatorSpecs(aggregators(new StringFirstAggregatorFactory( + "a0", + "dim1", + null, + 10 + ))) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + NullHandling.sqlCompatible() ? 
+ ImmutableList.of( + new Object[]{null, "10.1"}, + new Object[]{"", "2"}, + new Object[]{"a", ""}, + new Object[]{"abc", "def"} + ) : ImmutableList.of( + new Object[]{"", "10.1"}, + new Object[]{"a", ""}, + new Object[]{"abc", "def"} + ) + ); + } + + @Test + public void testStringEarliestMultiStringDim() + { + testQuery( + "SELECT dim2, EARLIEST(dim3,10) AS val FROM foo GROUP BY dim2", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0"))) + .setAggregatorSpecs(aggregators(new StringFirstAggregatorFactory( + "a0", + "dim3", + null, + 10 + ))) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + NullHandling.sqlCompatible() ? + ImmutableList.of( + new Object[]{null, "[b, c]"}, + new Object[]{"", "d"}, + new Object[]{"a", "[a, b]"}, + new Object[]{"abc", null} + ) : ImmutableList.of( + new Object[]{"", "[b, c]"}, + new Object[]{"a", "[a, b]"}, + new Object[]{"abc", ""} + ) + ); + } + // This test the off-heap (buffer) version of the AnyAggregator (String) @Test public void testStringAnyInSubquery() @@ -1356,8 +1418,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest public void testEarliestAggregatorsNumericNulls() { notMsqCompatible(); - // Cannot vectorize EARLIEST aggregator. - skipVectorize(); testQuery( "SELECT EARLIEST(l1), EARLIEST(d1), EARLIEST(f1) FROM druid.numfoo", @@ -1417,8 +1477,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest public void testFirstLatestAggregatorsSkipNulls() { notMsqCompatible(); - // Cannot vectorize EARLIEST aggregator. - skipVectorize(); final DimFilter filter; if (useDefault) { @@ -1533,8 +1591,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest public void testOrderByEarliestFloat() { notMsqCompatible(); - // Cannot vectorize EARLIEST aggregator. - skipVectorize(); + List expected; if (NullHandling.replaceWithDefault()) { expected = ImmutableList.of( @@ -1581,8 +1638,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest public void testOrderByEarliestDouble() { notMsqCompatible(); - // Cannot vectorize EARLIEST aggregator. - skipVectorize(); + List expected; if (NullHandling.replaceWithDefault()) { expected = ImmutableList.of( @@ -1629,8 +1685,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest public void testOrderByEarliestLong() { notMsqCompatible(); - // Cannot vectorize EARLIEST aggregator. - skipVectorize(); + List expected; if (NullHandling.replaceWithDefault()) { expected = ImmutableList.of(