Vectorize earliest aggregator for both numeric and string types (#14408)

* Vectorizing earliest for numeric

* Vectorizing earliest string aggregator

* checkstyle fix

* Removing unnecessary exceptions

* Ignoring tests in MSQ as earliest is not supported for numeric there

* Fixing benchmarks

* Updating tests as MSQ does not support earliest for some cases

* Addressing review comments by adding the following:
1. Checking capabilities first before creating selectors
2. Removing mockito in tests for numeric first aggs
3. Removing unnecessary tests

* Addressing issues for dictionary encoded single string columns where we can use the dictionary ids instead of the entire string

* Adding a flag for multi value dimension selector

* Addressing comments

* 1 more change

* Handling review comments part 1

* Handling review comments and correctness fix for latest_by when the time expression need not be in sorted order

* Updating numeric first vector agg

* Revert "Updating numeric first vector agg"

This reverts commit 429170990192883e51812311c49d2e461e6db732.

* Updating code for correctness issues

* fixing an issue with latest agg

* Adding more comments and removing an unnecessary check

* Addressing null checks for time selector and only vectorize false for quantile sketches
This commit is contained in:
Soumyava 2023-09-05 08:41:42 -07:00 committed by GitHub
parent 9d6ca61ac1
commit 8088a763a6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 1813 additions and 44 deletions

View File

@ -197,7 +197,7 @@ public class SqlExpressionBenchmark
"SELECT TIME_SHIFT(MILLIS_TO_TIMESTAMP(long4), 'PT1H', 1), string2, SUM(long1 * double4) FROM foo GROUP BY 1,2 ORDER BY 3", "SELECT TIME_SHIFT(MILLIS_TO_TIMESTAMP(long4), 'PT1H', 1), string2, SUM(long1 * double4) FROM foo GROUP BY 1,2 ORDER BY 3",
// 37: time shift + expr agg (group by), uniform distribution high cardinality // 37: time shift + expr agg (group by), uniform distribution high cardinality
"SELECT TIME_SHIFT(MILLIS_TO_TIMESTAMP(long5), 'PT1H', 1), string2, SUM(long1 * double4) FROM foo GROUP BY 1,2 ORDER BY 3", "SELECT TIME_SHIFT(MILLIS_TO_TIMESTAMP(long5), 'PT1H', 1), string2, SUM(long1 * double4) FROM foo GROUP BY 1,2 ORDER BY 3",
// 38: LATEST aggregator // 38: LATEST aggregator long
"SELECT LATEST(long1) FROM foo", "SELECT LATEST(long1) FROM foo",
// 39: LATEST aggregator double // 39: LATEST aggregator double
"SELECT LATEST(double4) FROM foo", "SELECT LATEST(double4) FROM foo",
@ -207,7 +207,13 @@ public class SqlExpressionBenchmark
"SELECT LATEST(float3), LATEST(long1), LATEST(double4) FROM foo", "SELECT LATEST(float3), LATEST(long1), LATEST(double4) FROM foo",
// 42,43: filter numeric nulls // 42,43: filter numeric nulls
"SELECT SUM(long5) FROM foo WHERE long5 IS NOT NULL", "SELECT SUM(long5) FROM foo WHERE long5 IS NOT NULL",
"SELECT string2, SUM(long5) FROM foo WHERE long5 IS NOT NULL GROUP BY 1" "SELECT string2, SUM(long5) FROM foo WHERE long5 IS NOT NULL GROUP BY 1",
// 44: EARLIEST aggregator long
"SELECT EARLIEST(long1) FROM foo",
// 45: EARLIEST aggregator double
"SELECT EARLIEST(double4) FROM foo",
// 46: EARLIEST aggregator float
"SELECT EARLIEST(float3) FROM foo"
); );
@Param({"5000000"}) @Param({"5000000"})
@ -265,7 +271,11 @@ public class SqlExpressionBenchmark
"40", "40",
"41", "41",
"42", "42",
"43" "43",
"44",
"45",
"46",
"47"
}) })
private String query; private String query;

View File

@ -119,7 +119,8 @@
} }
], ],
"context": { "context": {
"useCache": "true", "useCache": "true",
"vectorize": "false",
"populateCache": "true", "populateCache": "true",
"timeout": 360000 "timeout": 360000
} }
@ -270,7 +271,8 @@
} }
], ],
"context": { "context": {
"useCache": "true", "useCache": "true",
"vectorize": "false",
"populateCache": "true", "populateCache": "true",
"timeout": 360000 "timeout": 360000
} }
@ -514,7 +516,8 @@
"metric": "unique_users", "metric": "unique_users",
"threshold": 3, "threshold": 3,
"context": { "context": {
"useCache": "true", "useCache": "true",
"vectorize": "false",
"populateCache": "true", "populateCache": "true",
"timeout": 360000 "timeout": 360000
} }
@ -693,7 +696,8 @@
"metric": "count", "metric": "count",
"threshold": 3, "threshold": 3,
"context": { "context": {
"useCache": "true", "useCache": "true",
"vectorize": "false",
"populateCache": "true", "populateCache": "true",
"timeout": 360000 "timeout": 360000
} }
@ -878,7 +882,8 @@
"metric": "count", "metric": "count",
"threshold": 3, "threshold": 3,
"context": { "context": {
"useCache": "true", "useCache": "true",
"vectorize": "false",
"populateCache": "true", "populateCache": "true",
"timeout": 360000 "timeout": 360000
} }
@ -1243,7 +1248,8 @@
"orderBy": ["robot", "namespace"] "orderBy": ["robot", "namespace"]
}, },
"context": { "context": {
"useCache": "true", "useCache": "true",
"vectorize": "false",
"populateCache": "true", "populateCache": "true",
"timeout": 360000 "timeout": 360000
} }

View File

@ -29,14 +29,21 @@ import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.query.aggregation.any.NumericNilVectorAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseDoubleColumnValueSelector; import org.apache.druid.segment.BaseDoubleColumnValueSelector;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.NilColumnValueSelector; import org.apache.druid.segment.NilColumnValueSelector;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.Types;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorValueSelector;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -96,6 +103,12 @@ public class DoubleFirstAggregatorFactory extends AggregatorFactory
this.storeDoubleAsFloat = ColumnHolder.storeDoubleAsFloat(); this.storeDoubleAsFloat = ColumnHolder.storeDoubleAsFloat();
} }
/**
 * Reports whether this aggregator factory supports vectorized aggregation.
 * The supplied inspector is not consulted; this implementation answers {@code true} unconditionally.
 */
@Override
public boolean canVectorize(ColumnInspector columnInspector)
{
return true;
}
@Override @Override
public Aggregator factorize(ColumnSelectorFactory metricFactory) public Aggregator factorize(ColumnSelectorFactory metricFactory)
{ {
@ -124,6 +137,21 @@ public class DoubleFirstAggregatorFactory extends AggregatorFactory
} }
} }
/**
 * Creates the vectorized "first" aggregator for {@code fieldName}.
 *
 * @param columnSelectorFactory factory used to look up column capabilities and to build the vector selectors
 * @return a {@link DoubleFirstVectorAggregator} when the field's capabilities are numeric, otherwise the
 *         double nil vector aggregator
 */
@Override
public VectorAggregator factorizeVector(VectorColumnSelectorFactory columnSelectorFactory)
{
  final ColumnCapabilities fieldCapabilities = columnSelectorFactory.getColumnCapabilities(fieldName);
  if (!Types.isNumeric(fieldCapabilities)) {
    // Capabilities are checked before any selector is created.
    return NumericNilVectorAggregator.doubleNilVectorAggregator();
  }

  final VectorValueSelector values = columnSelectorFactory.makeValueSelector(fieldName);
  final VectorValueSelector times = columnSelectorFactory.makeValueSelector(timeColumn);
  return new DoubleFirstVectorAggregator(times, values);
}
@Override @Override
public Comparator getComparator() public Comparator getComparator()
{ {

View File

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.first;
import org.apache.druid.collections.SerializablePair;
import org.apache.druid.segment.vector.VectorValueSelector;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
/**
 * Vectorized first/earliest aggregator for double values. The time and null bookkeeping is inherited from
 * {@link NumericFirstVectorAggregator}; this class only supplies the double-specific buffer reads and writes.
 */
public class DoubleFirstVectorAggregator extends NumericFirstVectorAggregator
{
  public DoubleFirstVectorAggregator(VectorValueSelector timeSelector, VectorValueSelector valueSelector)
  {
    super(timeSelector, valueSelector);
  }

  /**
   * Writes the initial value (zero) as a double at {@code position}.
   */
  @Override
  public void initValue(ByteBuffer buf, int position)
  {
    buf.putDouble(position, 0);
  }

  /**
   * Copies element {@code index} of the value selector's double vector into the buffer at {@code position}.
   */
  @Override
  void putValue(ByteBuffer buf, int position, int index)
  {
    buf.putDouble(position, valueSelector.getDoubleVector()[index]);
  }

  /**
   * @return a pair of the long stored at {@code position} (the time) and the double stored at
   *         {@code position + VALUE_OFFSET}; the value side is {@code null} when the slot is marked null.
   */
  @Nullable
  @Override
  public Object get(ByteBuffer buf, int position)
  {
    final long time = buf.getLong(position);
    if (isValueNull(buf, position)) {
      return new SerializablePair<Long, Double>(time, null);
    }
    return new SerializablePair<>(time, buf.getDouble(position + VALUE_OFFSET));
  }
}

View File

@ -29,14 +29,21 @@ import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.query.aggregation.any.NumericNilVectorAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseFloatColumnValueSelector; import org.apache.druid.segment.BaseFloatColumnValueSelector;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.NilColumnValueSelector; import org.apache.druid.segment.NilColumnValueSelector;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.Types;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorValueSelector;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -122,6 +129,24 @@ public class FloatFirstAggregatorFactory extends AggregatorFactory
} }
} }
/**
 * Creates the vectorized "first" aggregator for {@code fieldName}.
 *
 * @param columnSelectorFactory factory used to look up column capabilities and to build the vector selectors
 * @return a {@link FloatFirstVectorAggregator} when the field's capabilities are numeric, otherwise the
 *         float nil vector aggregator
 */
@Override
public VectorAggregator factorizeVector(VectorColumnSelectorFactory columnSelectorFactory)
{
  final ColumnCapabilities fieldCapabilities = columnSelectorFactory.getColumnCapabilities(fieldName);
  if (!Types.isNumeric(fieldCapabilities)) {
    // Capabilities are checked before any selector is created.
    return NumericNilVectorAggregator.floatNilVectorAggregator();
  }

  final VectorValueSelector values = columnSelectorFactory.makeValueSelector(fieldName);
  final VectorValueSelector times = columnSelectorFactory.makeValueSelector(timeColumn);
  return new FloatFirstVectorAggregator(times, values);
}
/**
 * Reports whether this aggregator factory supports vectorized aggregation.
 * The supplied inspector is not consulted; this implementation answers {@code true} unconditionally.
 */
@Override
public boolean canVectorize(ColumnInspector columnInspector)
{
return true;
}
@Override @Override
public Comparator getComparator() public Comparator getComparator()
{ {

View File

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.first;
import org.apache.druid.collections.SerializablePair;
import org.apache.druid.segment.vector.VectorValueSelector;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
/**
 * Vectorized first/earliest aggregator for float values. The time and null bookkeeping is inherited from
 * {@link NumericFirstVectorAggregator}; this class only supplies the float-specific buffer reads and writes.
 */
public class FloatFirstVectorAggregator extends NumericFirstVectorAggregator
{
  public FloatFirstVectorAggregator(VectorValueSelector timeSelector, VectorValueSelector valueSelector)
  {
    super(timeSelector, valueSelector);
  }

  /**
   * Writes the initial value (zero) as a float at {@code position}.
   */
  @Override
  public void initValue(ByteBuffer buf, int position)
  {
    buf.putFloat(position, 0);
  }

  /**
   * Copies element {@code index} of the value selector's float vector into the buffer at {@code position}.
   */
  @Override
  void putValue(ByteBuffer buf, int position, int index)
  {
    buf.putFloat(position, valueSelector.getFloatVector()[index]);
  }

  /**
   * @return a pair of the long stored at {@code position} (the time) and the float stored at
   *         {@code position + VALUE_OFFSET}; the value side is {@code null} when the slot is marked null.
   */
  @Nullable
  @Override
  public Object get(ByteBuffer buf, int position)
  {
    final long time = buf.getLong(position);
    if (isValueNull(buf, position)) {
      return new SerializablePair<Long, Float>(time, null);
    }
    return new SerializablePair<>(time, buf.getFloat(position + VALUE_OFFSET));
  }
}

View File

@ -29,14 +29,21 @@ import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.query.aggregation.any.NumericNilVectorAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseLongColumnValueSelector; import org.apache.druid.segment.BaseLongColumnValueSelector;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.NilColumnValueSelector; import org.apache.druid.segment.NilColumnValueSelector;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.Types;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorValueSelector;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -121,6 +128,25 @@ public class LongFirstAggregatorFactory extends AggregatorFactory
} }
} }
/**
 * Creates the vectorized "first" aggregator for {@code fieldName}.
 *
 * @param columnSelectorFactory factory used to look up column capabilities and to build the vector selectors
 * @return a {@link LongFirstVectorAggregator} when the field's capabilities are numeric, otherwise the
 *         long nil vector aggregator
 */
@Override
public VectorAggregator factorizeVector(VectorColumnSelectorFactory columnSelectorFactory)
{
  final ColumnCapabilities fieldCapabilities = columnSelectorFactory.getColumnCapabilities(fieldName);
  if (!Types.isNumeric(fieldCapabilities)) {
    // Capabilities are checked before any selector is created.
    return NumericNilVectorAggregator.longNilVectorAggregator();
  }

  final VectorValueSelector values = columnSelectorFactory.makeValueSelector(fieldName);
  final VectorValueSelector times = columnSelectorFactory.makeValueSelector(timeColumn);
  return new LongFirstVectorAggregator(times, values);
}
/**
 * Reports whether this aggregator factory supports vectorized aggregation.
 * The supplied inspector is not consulted; this implementation answers {@code true} unconditionally.
 */
@Override
public boolean canVectorize(ColumnInspector columnInspector)
{
return true;
}
@Override @Override
public Comparator getComparator() public Comparator getComparator()
{ {

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.first;
import org.apache.druid.collections.SerializablePair;
import org.apache.druid.segment.vector.VectorValueSelector;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
/**
 * Vectorized first/earliest aggregator for long values. The time and null bookkeeping is inherited from
 * {@link NumericFirstVectorAggregator}; this class only supplies the long-specific buffer reads and writes.
 */
public class LongFirstVectorAggregator extends NumericFirstVectorAggregator
{
  public LongFirstVectorAggregator(VectorValueSelector timeSelector, VectorValueSelector valueSelector)
  {
    super(timeSelector, valueSelector);
  }

  /**
   * Writes the initial value (zero) as a long at {@code position}.
   */
  @Override
  public void initValue(ByteBuffer buf, int position)
  {
    buf.putLong(position, 0);
  }

  /**
   * Copies element {@code index} of the value selector's long vector into the buffer at {@code position}.
   */
  @Override
  void putValue(ByteBuffer buf, int position, int index)
  {
    buf.putLong(position, valueSelector.getLongVector()[index]);
  }

  /**
   * @return a pair of the long stored at {@code position} (the time) and the long stored at
   *         {@code position + VALUE_OFFSET}; the value side is {@code null} when the slot is marked null.
   */
  @Nullable
  @Override
  public Object get(ByteBuffer buf, int position)
  {
    final long time = buf.getLong(position);
    if (isValueNull(buf, position)) {
      return new SerializablePair<Long, Long>(time, null);
    }
    return new SerializablePair<>(time, buf.getLong(position + VALUE_OFFSET));
  }
}

View File

@ -0,0 +1,174 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.first;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.segment.vector.VectorValueSelector;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
/**
 * Base class for the vectorized first/earliest aggregators over numeric types.
 *
 * Every aggregation slot in the buffer is laid out as:
 * <ul>
 *   <li>offset 0: the long timestamp of the earliest row seen so far ({@code Long.MAX_VALUE} after init)</li>
 *   <li>offset {@link #NULL_OFFSET}: one byte marking whether the stored value is null</li>
 *   <li>offset {@link #VALUE_OFFSET}: the value itself, read and written by the concrete subclass</li>
 * </ul>
 */
public abstract class NumericFirstVectorAggregator implements VectorAggregator
{
  static final int NULL_OFFSET = Long.BYTES;
  static final int VALUE_OFFSET = NULL_OFFSET + Byte.BYTES;
  final VectorValueSelector valueSelector;
  private final boolean useDefault = NullHandling.replaceWithDefault();
  private final VectorValueSelector timeSelector;

  public NumericFirstVectorAggregator(VectorValueSelector timeSelector, VectorValueSelector valueSelector)
  {
    this.timeSelector = timeSelector;
    this.valueSelector = valueSelector;
  }

  /**
   * Initializes a slot: time is set to {@code Long.MAX_VALUE} so that any real timestamp is earlier, the
   * null marker is set according to the null-handling mode, and the subclass writes its initial value.
   */
  @Override
  public void init(ByteBuffer buf, int position)
  {
    buf.putLong(position, Long.MAX_VALUE);
    buf.put(position + NULL_OFFSET, useDefault ? NullHandling.IS_NOT_NULL_BYTE : NullHandling.IS_NULL_BYTE);
    initValue(buf, position + VALUE_OFFSET);
  }

  /**
   * Folds rows {@code [startRow, endRow)} of the current vector into the single slot at {@code position}.
   * Rows whose time is null are skipped; a row replaces the stored state only when its time is strictly
   * earlier than the stored time.
   */
  @Override
  public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
  {
    final long[] timeVector = timeSelector.getLongVector();
    final boolean[] nullTimeVector = timeSelector.getNullVector();
    final boolean[] nullValueVector = valueSelector.getNullVector();
    long firstTime = buf.getLong(position);

    // The time vector is already sorted if earliest is on the default time dimension,
    // but earliest_by may use a secondary timestamp which is not sorted.
    // For correctness, we need to go over all elements.
    // A possible optimization is to have 2 paths: one for earliest, which can take advantage
    // of the sorted nature of time, and one for earliest_by, which has to visit every element.
    for (int row = startRow; row < endRow; row++) {
      if (nullTimeVector != null && nullTimeVector[row]) {
        continue;
      }
      final long rowTime = timeVector[row];
      if (rowTime >= firstTime) {
        continue;
      }
      firstTime = rowTime;
      if (useDefault || nullValueVector == null || !nullValueVector[row]) {
        updateTimeWithValue(buf, position, firstTime, row);
      } else {
        updateTimeWithNull(buf, position, firstTime);
      }
    }
  }

  /**
   * Checks if the aggregated value at a position in the buffer is null or not
   *
   * @param buf      byte buffer storing the byte array representation of the aggregate
   * @param position offset within the byte buffer at which the current aggregate value is stored
   * @return true if the null-marker byte of the slot equals {@link NullHandling#IS_NULL_BYTE}
   */
  boolean isValueNull(ByteBuffer buf, int position)
  {
    return buf.get(position + NULL_OFFSET) == NullHandling.IS_NULL_BYTE;
  }

  /**
   * Folds {@code numRows} rows into their individual slots. For the i-th entry the slot is
   * {@code positions[i] + positionOffset} and the vector row is {@code rows[i]}, or {@code i} when
   * {@code rows} is null. Rows whose time is null are skipped, matching the range-based overload.
   */
  @Override
  public void aggregate(
      ByteBuffer buf,
      int numRows,
      int[] positions,
      @Nullable int[] rows,
      int positionOffset
  )
  {
    final boolean[] nulls = useDefault ? null : valueSelector.getNullVector();
    final long[] timeVector = timeSelector.getLongVector();
    final boolean[] nullTimeVector = timeSelector.getNullVector();

    for (int i = 0; i < numRows; i++) {
      final int row = rows == null ? i : rows[i];
      // A null timestamp carries no usable time value, so it must never win the comparison below.
      if (nullTimeVector != null && nullTimeVector[row]) {
        continue;
      }
      final int position = positions[i] + positionOffset;
      final long rowTime = timeVector[row];
      if (rowTime < buf.getLong(position)) {
        if (nulls == null || !nulls[row]) {
          updateTimeWithValue(buf, position, rowTime, row);
        } else {
          updateTimeWithNull(buf, position, rowTime);
        }
      }
    }
  }

  /**
   * Updates the time and the non null values to the appropriate position in buffer
   *
   * @param buf      byte buffer storing the byte array representation of the aggregate
   * @param position offset within the byte buffer at which the current aggregate value is stored
   * @param time     the time to be stored in the buffer as the first (earliest) time
   * @param index    the index into the value vector of the row holding the first value
   */
  void updateTimeWithValue(ByteBuffer buf, int position, long time, int index)
  {
    buf.putLong(position, time);
    buf.put(position + NULL_OFFSET, NullHandling.IS_NOT_NULL_BYTE);
    putValue(buf, position + VALUE_OFFSET, index);
  }

  /**
   * Updates the time only to the appropriate position in buffer as the value is null
   *
   * @param buf      byte buffer storing the byte array representation of the aggregate
   * @param position offset within the byte buffer at which the current aggregate value is stored
   * @param time     the time to be stored in the buffer as the first (earliest) time
   */
  void updateTimeWithNull(ByteBuffer buf, int position, long time)
  {
    buf.putLong(position, time);
    buf.put(position + NULL_OFFSET, NullHandling.IS_NULL_BYTE);
  }

  /**
   * Abstract function which needs to be overridden by subclasses to set the initial value
   */
  abstract void initValue(ByteBuffer buf, int position);

  /**
   * Abstract function which needs to be overridden by subclasses to set the
   * first value in the buffer depending on the datatype
   */
  abstract void putValue(ByteBuffer buf, int position, int index);

  @Override
  public void close()
  {
    // no resources to cleanup
  }
}

View File

@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.first;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.SerializablePairLongString;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.segment.vector.BaseLongVectorValueSelector;
import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
/**
 * Vectorized first/earliest aggregator for single-valued string dimensions. Instead of the string itself,
 * the slot stores the int id taken from the dimension selector's row vector; the id is turned back into a
 * string with {@code lookupName} only in {@link #get}.
 *
 * The slot reuses the offsets of {@link NumericFirstVectorAggregator}: long time at offset 0, a null-marker
 * byte at {@code NULL_OFFSET} and the int id at {@code VALUE_OFFSET}.
 */
public class SingleStringFirstDimensionVectorAggregator implements VectorAggregator
{
  private final BaseLongVectorValueSelector timeSelector;
  private final SingleValueDimensionVectorSelector valueDimensionVectorSelector;
  private final int maxStringBytes;
  private final boolean useDefault = NullHandling.replaceWithDefault();

  public SingleStringFirstDimensionVectorAggregator(
      BaseLongVectorValueSelector timeSelector,
      SingleValueDimensionVectorSelector valueDimensionVectorSelector,
      int maxStringBytes
  )
  {
    this.timeSelector = timeSelector;
    this.valueDimensionVectorSelector = valueDimensionVectorSelector;
    this.maxStringBytes = maxStringBytes;
  }

  /**
   * Initializes a slot with time {@code Long.MAX_VALUE}, a null marker chosen by the null-handling mode,
   * and a zeroed value area.
   */
  @Override
  public void init(ByteBuffer buf, int position)
  {
    buf.putLong(position, Long.MAX_VALUE);
    buf.put(
        position + NumericFirstVectorAggregator.NULL_OFFSET,
        useDefault ? NullHandling.IS_NOT_NULL_BYTE : NullHandling.IS_NULL_BYTE
    );
    // NOTE(review): this zeroes 8 bytes although aggregate()/get() only write and read a 4-byte int here.
    buf.putLong(position + NumericFirstVectorAggregator.VALUE_OFFSET, 0);
  }

  /**
   * Folds rows {@code [startRow, endRow)} into the single slot at {@code position}. Rows with a null time
   * are skipped; a row replaces the stored state only when its time is strictly earlier.
   */
  @Override
  public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
  {
    final long[] timeVector = timeSelector.getLongVector();
    final boolean[] nullTimeVector = timeSelector.getNullVector();
    final int[] valueVector = valueDimensionVectorSelector.getRowVector();
    long firstTime = buf.getLong(position);

    for (int row = startRow; row < endRow; row++) {
      if (nullTimeVector != null && nullTimeVector[row]) {
        continue;
      }
      final long rowTime = timeVector[row];
      if (rowTime < firstTime) {
        firstTime = rowTime;
        buf.putLong(position, firstTime);
        buf.put(position + NumericFirstVectorAggregator.NULL_OFFSET, NullHandling.IS_NOT_NULL_BYTE);
        buf.putInt(position + NumericFirstVectorAggregator.VALUE_OFFSET, valueVector[row]);
      }
    }
  }

  /**
   * Folds {@code numRows} rows into their individual slots. For the i-th entry the slot is
   * {@code positions[i] + positionOffset} and the vector row is {@code rows[i]}, or {@code i} when
   * {@code rows} is null.
   */
  @Override
  public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
  {
    final long[] timeVector = timeSelector.getLongVector();
    final boolean[] nullTimeVector = timeSelector.getNullVector();
    final int[] values = valueDimensionVectorSelector.getRowVector();

    for (int i = 0; i < numRows; i++) {
      final int row = rows == null ? i : rows[i];
      // The null vector is parallel to the time vector, so it must be indexed by the resolved row,
      // not by the loop counter; the two differ whenever a rows[] mapping is supplied.
      if (nullTimeVector != null && nullTimeVector[row]) {
        continue;
      }
      final int position = positions[i] + positionOffset;
      final long rowTime = timeVector[row];
      if (rowTime < buf.getLong(position)) {
        buf.putLong(position, rowTime);
        buf.put(position + NumericFirstVectorAggregator.NULL_OFFSET, NullHandling.IS_NOT_NULL_BYTE);
        buf.putInt(position + NumericFirstVectorAggregator.VALUE_OFFSET, values[row]);
      }
    }
  }

  /**
   * @return a {@link SerializablePairLongString} of the stored time and the string looked up for the stored
   *         id, chopped to {@code maxStringBytes}
   */
  @Nullable
  @Override
  public Object get(ByteBuffer buf, int position)
  {
    // NOTE(review): the null-marker byte is not consulted here, so a slot that never saw a row resolves
    // id 0 with time Long.MAX_VALUE -- confirm this is the intended result for empty groups.
    int index = buf.getInt(position + NumericFirstVectorAggregator.VALUE_OFFSET);
    long earliest = buf.getLong(position);
    String strValue = valueDimensionVectorSelector.lookupName(index);
    return new SerializablePairLongString(earliest, StringUtils.chop(strValue, maxStringBytes));
  }

  @Override
  public void close()
  {
    // nothing to close
  }
}

View File

@ -32,12 +32,21 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.SerializablePairLongString; import org.apache.druid.query.aggregation.SerializablePairLongString;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.segment.BaseObjectColumnValueSelector; import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.NilColumnValueSelector; import org.apache.druid.segment.NilColumnValueSelector;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.vector.BaseLongVectorValueSelector;
import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorObjectSelector;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -66,6 +75,7 @@ public class StringFirstAggregatorFactory extends AggregatorFactory
} }
}; };
private static final BufferAggregator NIL_BUFFER_AGGREGATOR = new StringFirstBufferAggregator( private static final BufferAggregator NIL_BUFFER_AGGREGATOR = new StringFirstBufferAggregator(
NilColumnValueSelector.instance(), NilColumnValueSelector.instance(),
NilColumnValueSelector.instance(), NilColumnValueSelector.instance(),
@ -80,6 +90,25 @@ public class StringFirstAggregatorFactory extends AggregatorFactory
} }
}; };
/**
 * Shared vector aggregator whose two {@code aggregate} overloads are overridden to do nothing.
 * It is constructed with {@code null} time and value selectors and a max string size of 0, and is what
 * {@code factorizeVector} returns when the selector factory reports no capabilities for the field.
 */
public static final VectorAggregator NIL_VECTOR_AGGREGATOR = new StringFirstVectorAggregator(
null,
null,
0
)
{
@Override
public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
{
// no-op
}
@Override
public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
{
// no-op
}
};
public static final int DEFAULT_MAX_STRING_SIZE = 1024; public static final int DEFAULT_MAX_STRING_SIZE = 1024;
public static final Comparator TIME_COMPARATOR = (o1, o2) -> Longs.compare( public static final Comparator TIME_COMPARATOR = (o1, o2) -> Longs.compare(
@ -121,6 +150,7 @@ public class StringFirstAggregatorFactory extends AggregatorFactory
: maxStringBytes; : maxStringBytes;
} }
@Override @Override
public Aggregator factorize(ColumnSelectorFactory metricFactory) public Aggregator factorize(ColumnSelectorFactory metricFactory)
{ {
@ -153,6 +183,40 @@ public class StringFirstAggregatorFactory extends AggregatorFactory
} }
} }
/**
 * Builds the vectorized "first" aggregator for {@code fieldName}.
 *
 * <ul>
 *   <li>No column capabilities reported for the field: returns {@code NIL_VECTOR_AGGREGATOR} without
 *       creating any selector.</li>
 *   <li>Dictionary-encoded STRING column that is not flagged as multi-valued: aggregates through a
 *       single-value dimension selector.</li>
 *   <li>Anything else: aggregates through a vector object selector.</li>
 * </ul>
 *
 * @param selectorFactory source of column capabilities and vector selectors
 * @return the aggregator matching the field's capabilities
 */
@Override
public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory)
{
  // Inspect capabilities before creating selectors, so that nothing is built (or cast) only to be discarded.
  final ColumnCapabilities capabilities = selectorFactory.getColumnCapabilities(fieldName);
  if (capabilities == null) {
    return NIL_VECTOR_AGGREGATOR;
  }

  final BaseLongVectorValueSelector timeSelector =
      (BaseLongVectorValueSelector) selectorFactory.makeValueSelector(timeColumn);

  // Case 1: Single value string with dimension selector.
  // Multi-value strings need an iteration over each row's values (similar to ARRAY typed columns), which the
  // single-value aggregator does not do, so they fall through to the object selector below.
  if (capabilities.is(ValueType.STRING)
      && capabilities.isDictionaryEncoded().isTrue()
      && !capabilities.hasMultipleValues().isTrue()) {
    final SingleValueDimensionVectorSelector dimensionSelector =
        selectorFactory.makeSingleValueDimensionSelector(DefaultDimensionSpec.of(fieldName));
    return new SingleStringFirstDimensionVectorAggregator(timeSelector, dimensionSelector, maxStringBytes);
  }

  // Case 2: vector object selector.
  final VectorObjectSelector objectSelector = selectorFactory.makeObjectSelector(fieldName);
  return new StringFirstVectorAggregator(timeSelector, objectSelector, maxStringBytes);
}
/**
 * Reports that this aggregator factory can always be vectorized.
 *
 * @param columnInspector not consulted; the answer does not depend on any column
 * @return always {@code true}
 */
@Override
public boolean canVectorize(ColumnInspector columnInspector)
{
return true;
}
@Override @Override
public Comparator getComparator() public Comparator getComparator()
{ {

View File

@ -0,0 +1,185 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.first;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.query.aggregation.SerializablePairLongString;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.vector.BaseLongVectorValueSelector;
import org.apache.druid.segment.vector.VectorObjectSelector;
import org.apache.druid.segment.vector.VectorValueSelector;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
/**
 * Vectorized aggregator that keeps, per buffer position, the (time, string) pair with the smallest
 * timestamp seen so far. The pair is serialized with {@link StringFirstLastUtils#writePair} and the
 * timestamp is read back from the first 8 bytes at the position.
 */
public class StringFirstVectorAggregator implements VectorAggregator
{
  /** Initial buffer state: the maximum timestamp with a null string, so any earlier row replaces it. */
  private static final SerializablePairLongString INIT = new SerializablePairLongString(
      DateTimes.MAX.getMillis(),
      null
  );

  private final VectorValueSelector timeSelector;
  private final VectorObjectSelector valueSelector;
  private final int maxStringBytes;

  /**
   * @param timeSelector   selector for the time column; may be null, in which case aggregation is a no-op
   * @param valueSelector  selector for the objects to aggregate (strings or already-aggregated pairs)
   * @param maxStringBytes size limit handed to {@link StringFirstLastUtils#writePair}
   */
  public StringFirstVectorAggregator(
      BaseLongVectorValueSelector timeSelector,
      VectorObjectSelector valueSelector,
      int maxStringBytes
  )
  {
    this.timeSelector = timeSelector;
    this.valueSelector = valueSelector;
    this.maxStringBytes = maxStringBytes;
  }

  /** Writes the {@link #INIT} pair at {@code position}. */
  @Override
  public void init(ByteBuffer buf, int position)
  {
    StringFirstLastUtils.writePair(buf, position, INIT, maxStringBytes);
  }

  /**
   * Folds rows {@code [startRow, endRow)} of the current vector into the single pair stored at
   * {@code position}. Rows whose time is flagged null are skipped.
   */
  @Override
  public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
  {
    if (timeSelector == null) {
      return;
    }
    final long[] times = timeSelector.getLongVector();
    final boolean[] nullTimeVector = timeSelector.getNullVector();
    final Object[] objectsWhichMightBeStrings = valueSelector.getObjectVector();
    long firstTime = buf.getLong(position);

    for (int i = startRow; i < endRow; i++) {
      // Cheap pre-filter on the row time; rows are not assumed to be sorted, so keep scanning.
      if (times[i] > firstTime) {
        continue;
      }
      if (nullTimeVector != null && nullTimeVector[i]) {
        continue;
      }

      // NOTE(review): objectNeedsFoldCheck presumably detects already-aggregated pair objects - confirm
      // against StringFirstLastUtils.
      if (StringFirstLastUtils.objectNeedsFoldCheck(objectsWhichMightBeStrings[i])) {
        final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromVectorSelectorsAtIndex(
            timeSelector,
            valueSelector,
            i
        );
        if (inPair != null) {
          // The stored time is re-read from the buffer here instead of being tracked after each write.
          firstTime = buf.getLong(position);
          if (inPair.lhs < firstTime) {
            StringFirstLastUtils.writePair(buf, position, inPair, maxStringBytes);
          }
        }
      } else {
        final long time = times[i];
        if (time < firstTime) {
          final String value = DimensionHandlerUtils.convertObjectToString(objectsWhichMightBeStrings[i]);
          firstTime = time;
          StringFirstLastUtils.writePair(
              buf,
              position,
              new SerializablePairLongString(time, value),
              maxStringBytes
          );
        }
      }
    }
  }

  /**
   * Batch form: entry {@code i} aggregates vector row {@code rows[i]} (or row {@code i} when
   * {@code rows} is null) into buffer position {@code positions[i] + positionOffset}.
   * Rows whose time is flagged null are skipped.
   */
  @Override
  public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
  {
    // Same guard as the range form, so a null time selector is a no-op rather than an NPE.
    if (timeSelector == null) {
      return;
    }
    final long[] timeVector = timeSelector.getLongVector();
    final boolean[] nullTimeVector = timeSelector.getNullVector();
    final Object[] objectsWhichMightBeStrings = valueSelector.getObjectVector();

    // iterate once over the object vector to find first non null element and
    // determine if the type is Pair or not
    boolean foldNeeded = false;
    for (Object obj : objectsWhichMightBeStrings) {
      if (obj != null) {
        foldNeeded = StringFirstLastUtils.objectNeedsFoldCheck(obj);
        break;
      }
    }

    for (int i = 0; i < numRows; i++) {
      final int row = rows == null ? i : rows[i];
      // The null vector is parallel to the time vector, so it must be indexed by row, not by i.
      if (nullTimeVector != null && nullTimeVector[row]) {
        continue;
      }
      final int position = positions[i] + positionOffset;
      final long firstTime = buf.getLong(position);
      if (timeVector[row] >= firstTime) {
        continue;
      }

      if (foldNeeded) {
        final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromVectorSelectorsAtIndex(
            timeSelector,
            valueSelector,
            row
        );
        if (inPair != null && inPair.lhs < firstTime) {
          StringFirstLastUtils.writePair(buf, position, inPair, maxStringBytes);
        }
      } else {
        final String value = DimensionHandlerUtils.convertObjectToString(objectsWhichMightBeStrings[row]);
        StringFirstLastUtils.writePair(
            buf,
            position,
            new SerializablePairLongString(timeVector[row], value),
            maxStringBytes
        );
      }
    }
  }

  /** Reads back the pair stored at {@code position} via {@link StringFirstLastUtils#readPair}. */
  @Nullable
  @Override
  public Object get(ByteBuffer buf, int position)
  {
    return StringFirstLastUtils.readPair(buf, position);
  }

  @Override
  public void close()
  {
    // nothing to close
  }
}

View File

@ -73,8 +73,8 @@ public class StringLastVectorAggregator implements VectorAggregator
if (objectsWhichMightBeStrings[i] == null) { if (objectsWhichMightBeStrings[i] == null) {
continue; continue;
} }
if (times[i] < lastTime) { if (times[i] <= lastTime) {
break; continue;
} }
index = i; index = i;
final boolean foldNeeded = StringFirstLastUtils.objectNeedsFoldCheck(objectsWhichMightBeStrings[index]); final boolean foldNeeded = StringFirstLastUtils.objectNeedsFoldCheck(objectsWhichMightBeStrings[index]);

View File

@ -0,0 +1,246 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.first;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.vector.BaseDoubleVectorValueSelector;
import org.apache.druid.segment.vector.BaseLongVectorValueSelector;
import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
import org.apache.druid.segment.vector.NoFilterVectorOffset;
import org.apache.druid.segment.vector.ReadableVectorInspector;
import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorObjectSelector;
import org.apache.druid.segment.vector.VectorValueSelector;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.concurrent.ThreadLocalRandom;
/**
 * Tests for {@link DoubleFirstVectorAggregator} and its creation through
 * {@link DoubleFirstAggregatorFactory#factorizeVector}, using hand-written selectors backed by
 * the fixed {@code times}, {@code VALUES} and {@code NULLS} arrays.
 */
public class DoubleFirstVectorAggregationTest extends InitializedNullHandlingTest
{
  private static final double EPSILON = 1e-5;
  private static final double[] VALUES = new double[]{7.8d, 11, 23.67, 60};
  private static final boolean[] NULLS = new boolean[]{false, false, true, false};
  private long[] times = {2436, 6879, 7888, 8224};
  private static final String NAME = "NAME";
  private static final String FIELD_NAME = "FIELD_NAME";
  private static final String TIME_COL = "__time";
  private VectorValueSelector selector;
  private BaseLongVectorValueSelector timeSelector;
  private ByteBuffer buf;
  private DoubleFirstVectorAggregator target;
  private DoubleFirstAggregatorFactory doubleFirstAggregatorFactory;
  private VectorColumnSelectorFactory selectorFactory;

  @Before
  public void setup()
  {
    // Start from a buffer of random bytes; position 0 is initialized below once the target exists.
    final byte[] backing = new byte[1024];
    ThreadLocalRandom.current().nextBytes(backing);
    buf = ByteBuffer.wrap(backing);

    timeSelector = newTimeSelector();
    selector = newValueSelector();
    target = new DoubleFirstVectorAggregator(timeSelector, selector);
    clearBufferForPositions(0, 0);
    selectorFactory = newSelectorFactory();
    doubleFirstAggregatorFactory = new DoubleFirstAggregatorFactory(NAME, FIELD_NAME, TIME_COL);
  }

  @Test
  public void testFactory()
  {
    Assert.assertTrue(doubleFirstAggregatorFactory.canVectorize(selectorFactory));
    final VectorAggregator created = doubleFirstAggregatorFactory.factorizeVector(selectorFactory);
    Assert.assertNotNull(created);
    Assert.assertEquals(DoubleFirstVectorAggregator.class, created.getClass());
  }

  @Test
  public void initValueShouldInitZero()
  {
    target.initValue(buf, 0);
    Assert.assertEquals(0, buf.getDouble(0), EPSILON);
  }

  @Test
  public void aggregate()
  {
    assertRangeAggregationPicksFirstRow();
  }

  @Test
  public void aggregateWithNulls()
  {
    assertRangeAggregationPicksFirstRow();
  }

  @Test
  public void aggregateBatchWithoutRows()
  {
    assertBatchAggregation(new int[]{0, 43, 70}, null, 2);
  }

  @Test
  public void aggregateBatchWithRows()
  {
    assertBatchAggregation(new int[]{0, 43, 70}, new int[]{3, 2, 0}, 2);
  }

  /** Aggregates the whole vector into position 0 and expects the pair of row 0. */
  private void assertRangeAggregationPicksFirstRow()
  {
    target.aggregate(buf, 0, 0, VALUES.length);
    Pair<Long, Double> result = (Pair<Long, Double>) target.get(buf, 0);
    Assert.assertEquals(times[0], result.lhs.longValue());
    Assert.assertEquals(VALUES[0], result.rhs, EPSILON);
  }

  /**
   * Initializes the given positions, runs the batch aggregate over three entries, and checks that each
   * position holds the time and value of the row mapped to it ({@code rows[i]}, or {@code i} when
   * {@code rows} is null).
   */
  private void assertBatchAggregation(int[] positions, @Nullable int[] rows, int positionOffset)
  {
    clearBufferForPositions(positionOffset, positions);
    target.aggregate(buf, 3, positions, rows, positionOffset);
    for (int i = 0; i < positions.length; i++) {
      final int row = rows == null ? i : rows[i];
      Pair<Long, Double> result = (Pair<Long, Double>) target.get(buf, positions[i] + positionOffset);
      Assert.assertEquals(times[row], result.lhs.longValue());
      if (!NullHandling.replaceWithDefault() && NULLS[row]) {
        Assert.assertNull(result.rhs);
      } else {
        Assert.assertEquals(VALUES[row], result.rhs, EPSILON);
      }
    }
  }

  /** Time selector returning {@code times} and always reporting {@code NULLS} as its null vector. */
  private BaseLongVectorValueSelector newTimeSelector()
  {
    return new BaseLongVectorValueSelector(
        new NoFilterVectorOffset(times.length, 0, times.length)
        {
        }
    )
    {
      @Override
      public long[] getLongVector()
      {
        return times;
      }

      @Nullable
      @Override
      public boolean[] getNullVector()
      {
        return NULLS;
      }
    };
  }

  /** Value selector returning {@code VALUES}; it reports {@code NULLS} unless nulls are replaced with defaults. */
  private VectorValueSelector newValueSelector()
  {
    return new BaseDoubleVectorValueSelector(
        new NoFilterVectorOffset(VALUES.length, 0, VALUES.length)
        {
        }
    )
    {
      @Override
      public double[] getDoubleVector()
      {
        return VALUES;
      }

      @Nullable
      @Override
      public boolean[] getNullVector()
      {
        return NullHandling.replaceWithDefault() ? null : NULLS;
      }
    };
  }

  /** Selector factory that only knows the time column and {@code FIELD_NAME} (reported as DOUBLE). */
  private VectorColumnSelectorFactory newSelectorFactory()
  {
    return new VectorColumnSelectorFactory()
    {
      @Override
      public ReadableVectorInspector getReadableVectorInspector()
      {
        return null;
      }

      @Override
      public SingleValueDimensionVectorSelector makeSingleValueDimensionSelector(DimensionSpec dimensionSpec)
      {
        return null;
      }

      @Override
      public MultiValueDimensionVectorSelector makeMultiValueDimensionSelector(DimensionSpec dimensionSpec)
      {
        return null;
      }

      @Override
      public VectorValueSelector makeValueSelector(String column)
      {
        if (TIME_COL.equals(column)) {
          return timeSelector;
        }
        if (FIELD_NAME.equals(column)) {
          return selector;
        }
        return null;
      }

      @Override
      public VectorObjectSelector makeObjectSelector(String column)
      {
        return null;
      }

      @Nullable
      @Override
      public ColumnCapabilities getColumnCapabilities(String column)
      {
        return FIELD_NAME.equals(column)
               ? ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ColumnType.DOUBLE)
               : null;
      }
    };
  }

  /** Runs {@code target.init} at {@code offset + position} for every given position. */
  private void clearBufferForPositions(int offset, int... positions)
  {
    for (int idx = 0; idx < positions.length; idx++) {
      target.init(buf, offset + positions[idx]);
    }
  }
}

View File

@ -0,0 +1,250 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.first;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.vector.BaseFloatVectorValueSelector;
import org.apache.druid.segment.vector.BaseLongVectorValueSelector;
import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
import org.apache.druid.segment.vector.NoFilterVectorOffset;
import org.apache.druid.segment.vector.ReadableVectorInspector;
import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorObjectSelector;
import org.apache.druid.segment.vector.VectorValueSelector;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.concurrent.ThreadLocalRandom;
/**
 * Tests for {@link FloatFirstVectorAggregator} and its creation through
 * {@link FloatFirstAggregatorFactory#factorizeVector}, using hand-written selectors backed by
 * the fixed {@code times}, {@code VALUES} and {@code NULLS} arrays.
 */
public class FloatFirstVectorAggregationTest extends InitializedNullHandlingTest
{
  private static final double EPSILON = 1e-5;
  private static final float[] VALUES = new float[]{7.2f, 15.6f, 2.1f, 150.0f};
  private static final boolean[] NULLS = new boolean[]{false, false, true, false};
  private long[] times = {2436, 6879, 7888, 8224};
  private static final String NAME = "NAME";
  private static final String FIELD_NAME = "FIELD_NAME";
  private static final String TIME_COL = "__time";
  private VectorValueSelector selector;
  private BaseLongVectorValueSelector timeSelector;
  private ByteBuffer buf;
  private FloatFirstVectorAggregator target;
  private FloatFirstAggregatorFactory floatFirstAggregatorFactory;
  private VectorColumnSelectorFactory selectorFactory;

  @Before
  public void setup()
  {
    // Start from a buffer of random bytes; position 0 is initialized below once the target exists.
    final byte[] backing = new byte[1024];
    ThreadLocalRandom.current().nextBytes(backing);
    buf = ByteBuffer.wrap(backing);

    timeSelector = newTimeSelector();
    selector = newValueSelector();
    target = new FloatFirstVectorAggregator(timeSelector, selector);
    clearBufferForPositions(0, 0);
    selectorFactory = newSelectorFactory();
    floatFirstAggregatorFactory = new FloatFirstAggregatorFactory(NAME, FIELD_NAME, TIME_COL);
  }

  @Test
  public void testFactory()
  {
    Assert.assertTrue(floatFirstAggregatorFactory.canVectorize(selectorFactory));
    final VectorAggregator created = floatFirstAggregatorFactory.factorizeVector(selectorFactory);
    Assert.assertNotNull(created);
    Assert.assertEquals(FloatFirstVectorAggregator.class, created.getClass());
  }

  @Test
  public void initValueShouldBeZero()
  {
    target.initValue(buf, 0);
    Assert.assertEquals(0.0f, buf.getFloat(0), EPSILON);
  }

  @Test
  public void aggregate()
  {
    // This test re-initializes position 0 explicitly before aggregating.
    target.init(buf, 0);
    assertRangeAggregationPicksFirstRow();
  }

  @Test
  public void aggregateWithNulls()
  {
    assertRangeAggregationPicksFirstRow();
  }

  @Test
  public void aggregateBatchWithoutRows()
  {
    assertBatchAggregation(new int[]{0, 43, 70}, null, 2);
  }

  @Test
  public void aggregateBatchWithRows()
  {
    assertBatchAggregation(new int[]{0, 43, 70}, new int[]{3, 2, 0}, 2);
  }

  /** Aggregates the whole vector into position 0 and expects the pair of row 0. */
  private void assertRangeAggregationPicksFirstRow()
  {
    target.aggregate(buf, 0, 0, VALUES.length);
    Pair<Long, Float> result = (Pair<Long, Float>) target.get(buf, 0);
    Assert.assertEquals(times[0], result.lhs.longValue());
    Assert.assertEquals(VALUES[0], result.rhs, EPSILON);
  }

  /**
   * Initializes the given positions, runs the batch aggregate over three entries, and checks that each
   * position holds the time and value of the row mapped to it ({@code rows[i]}, or {@code i} when
   * {@code rows} is null).
   */
  private void assertBatchAggregation(int[] positions, @Nullable int[] rows, int positionOffset)
  {
    clearBufferForPositions(positionOffset, positions);
    target.aggregate(buf, 3, positions, rows, positionOffset);
    for (int i = 0; i < positions.length; i++) {
      final int row = rows == null ? i : rows[i];
      Pair<Long, Float> result = (Pair<Long, Float>) target.get(buf, positions[i] + positionOffset);
      Assert.assertEquals(times[row], result.lhs.longValue());
      if (!NullHandling.replaceWithDefault() && NULLS[row]) {
        Assert.assertNull(result.rhs);
      } else {
        Assert.assertEquals(VALUES[row], result.rhs, EPSILON);
      }
    }
  }

  /** Time selector returning {@code times} and always reporting {@code NULLS} as its null vector. */
  private BaseLongVectorValueSelector newTimeSelector()
  {
    return new BaseLongVectorValueSelector(
        new NoFilterVectorOffset(times.length, 0, times.length)
        {
        }
    )
    {
      @Override
      public long[] getLongVector()
      {
        return times;
      }

      @Nullable
      @Override
      public boolean[] getNullVector()
      {
        return NULLS;
      }
    };
  }

  /** Value selector returning {@code VALUES}; it reports {@code NULLS} unless nulls are replaced with defaults. */
  private VectorValueSelector newValueSelector()
  {
    return new BaseFloatVectorValueSelector(
        new NoFilterVectorOffset(VALUES.length, 0, VALUES.length)
        {
        }
    )
    {
      @Override
      public float[] getFloatVector()
      {
        return VALUES;
      }

      @Nullable
      @Override
      public boolean[] getNullVector()
      {
        return NullHandling.replaceWithDefault() ? null : NULLS;
      }
    };
  }

  /** Selector factory that only knows the time column and {@code FIELD_NAME} (reported as FLOAT). */
  private VectorColumnSelectorFactory newSelectorFactory()
  {
    return new VectorColumnSelectorFactory()
    {
      @Override
      public ReadableVectorInspector getReadableVectorInspector()
      {
        return null;
      }

      @Override
      public SingleValueDimensionVectorSelector makeSingleValueDimensionSelector(DimensionSpec dimensionSpec)
      {
        return null;
      }

      @Override
      public MultiValueDimensionVectorSelector makeMultiValueDimensionSelector(DimensionSpec dimensionSpec)
      {
        return null;
      }

      @Override
      public VectorValueSelector makeValueSelector(String column)
      {
        if (TIME_COL.equals(column)) {
          return timeSelector;
        }
        if (FIELD_NAME.equals(column)) {
          return selector;
        }
        return null;
      }

      @Override
      public VectorObjectSelector makeObjectSelector(String column)
      {
        return null;
      }

      @Nullable
      @Override
      public ColumnCapabilities getColumnCapabilities(String column)
      {
        return FIELD_NAME.equals(column)
               ? ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ColumnType.FLOAT)
               : null;
      }
    };
  }

  /** Runs {@code target.init} at {@code offset + position} for every given position. */
  private void clearBufferForPositions(int offset, int... positions)
  {
    for (int idx = 0; idx < positions.length; idx++) {
      target.init(buf, offset + positions[idx]);
    }
  }
}

View File

@ -0,0 +1,241 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.first;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.vector.BaseLongVectorValueSelector;
import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
import org.apache.druid.segment.vector.NoFilterVectorOffset;
import org.apache.druid.segment.vector.ReadableVectorInspector;
import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorObjectSelector;
import org.apache.druid.segment.vector.VectorValueSelector;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.concurrent.ThreadLocalRandom;
/**
 * Tests for {@link LongFirstVectorAggregator} and its creation through
 * {@link LongFirstAggregatorFactory#factorizeVector}, using hand-written selectors backed by
 * the fixed {@code times}, {@code VALUES} and {@code NULLS} arrays.
 */
public class LongFirstVectorAggregationTest extends InitializedNullHandlingTest
{
  private static final double EPSILON = 1e-5;
  private static final long[] VALUES = new long[]{7, 15, 2, 150};
  private static final boolean[] NULLS = new boolean[]{false, false, true, false};
  private static final String NAME = "NAME";
  private static final String FIELD_NAME = "FIELD_NAME";
  private static final String TIME_COL = "__time";
  private long[] times = {2436, 6879, 7888, 8224};
  private VectorValueSelector selector;
  private BaseLongVectorValueSelector timeSelector;
  private ByteBuffer buf;
  private LongFirstVectorAggregator target;
  private LongFirstAggregatorFactory longFirstAggregatorFactory;
  private VectorColumnSelectorFactory selectorFactory;

  @Before
  public void setup()
  {
    // Start from a buffer of random bytes; position 0 is initialized below once the target exists.
    final byte[] backing = new byte[1024];
    ThreadLocalRandom.current().nextBytes(backing);
    buf = ByteBuffer.wrap(backing);

    timeSelector = newTimeSelector();
    selector = newValueSelector();
    target = new LongFirstVectorAggregator(timeSelector, selector);
    clearBufferForPositions(0, 0);
    selectorFactory = newSelectorFactory();
    longFirstAggregatorFactory = new LongFirstAggregatorFactory(NAME, FIELD_NAME, TIME_COL);
  }

  @Test
  public void testFactory()
  {
    Assert.assertTrue(longFirstAggregatorFactory.canVectorize(selectorFactory));
    final VectorAggregator created = longFirstAggregatorFactory.factorizeVector(selectorFactory);
    Assert.assertNotNull(created);
    Assert.assertEquals(LongFirstVectorAggregator.class, created.getClass());
  }

  @Test
  public void initValueShouldInitZero()
  {
    target.initValue(buf, 0);
    Assert.assertEquals(0, buf.getLong(0));
  }

  @Test
  public void aggregate()
  {
    assertRangeAggregationPicksFirstRow();
  }

  @Test
  public void aggregateWithNulls()
  {
    assertRangeAggregationPicksFirstRow();
  }

  @Test
  public void aggregateBatchWithoutRows()
  {
    assertBatchAggregation(new int[]{0, 43, 70}, null, 2);
  }

  @Test
  public void aggregateBatchWithRows()
  {
    assertBatchAggregation(new int[]{0, 43, 70}, new int[]{3, 2, 0}, 2);
  }

  /** Aggregates the whole vector into position 0 and expects the pair of row 0. */
  private void assertRangeAggregationPicksFirstRow()
  {
    target.aggregate(buf, 0, 0, VALUES.length);
    Pair<Long, Long> result = (Pair<Long, Long>) target.get(buf, 0);
    Assert.assertEquals(times[0], result.lhs.longValue());
    Assert.assertEquals(VALUES[0], result.rhs, EPSILON);
  }

  /**
   * Initializes the given positions, runs the batch aggregate over three entries, and checks that each
   * position holds the time and value of the row mapped to it ({@code rows[i]}, or {@code i} when
   * {@code rows} is null).
   */
  private void assertBatchAggregation(int[] positions, @Nullable int[] rows, int positionOffset)
  {
    clearBufferForPositions(positionOffset, positions);
    target.aggregate(buf, 3, positions, rows, positionOffset);
    for (int i = 0; i < positions.length; i++) {
      final int row = rows == null ? i : rows[i];
      Pair<Long, Long> result = (Pair<Long, Long>) target.get(buf, positions[i] + positionOffset);
      Assert.assertEquals(times[row], result.lhs.longValue());
      if (!NullHandling.replaceWithDefault() && NULLS[row]) {
        Assert.assertNull(result.rhs);
      } else {
        Assert.assertEquals(VALUES[row], result.rhs, EPSILON);
      }
    }
  }

  /** Time selector returning {@code times} and always reporting {@code NULLS} as its null vector. */
  private BaseLongVectorValueSelector newTimeSelector()
  {
    return new BaseLongVectorValueSelector(
        new NoFilterVectorOffset(times.length, 0, times.length)
        {
        }
    )
    {
      @Override
      public long[] getLongVector()
      {
        return times;
      }

      @Nullable
      @Override
      public boolean[] getNullVector()
      {
        return NULLS;
      }
    };
  }

  /** Value selector returning {@code VALUES}; it reports {@code NULLS} unless nulls are replaced with defaults. */
  private VectorValueSelector newValueSelector()
  {
    return new BaseLongVectorValueSelector(
        new NoFilterVectorOffset(VALUES.length, 0, VALUES.length)
        {
        }
    )
    {
      @Override
      public long[] getLongVector()
      {
        return VALUES;
      }

      @Nullable
      @Override
      public boolean[] getNullVector()
      {
        return NullHandling.replaceWithDefault() ? null : NULLS;
      }
    };
  }

  /** Selector factory that only knows the time column and {@code FIELD_NAME} (reported as LONG). */
  private VectorColumnSelectorFactory newSelectorFactory()
  {
    return new VectorColumnSelectorFactory()
    {
      @Override
      public ReadableVectorInspector getReadableVectorInspector()
      {
        return null;
      }

      @Override
      public SingleValueDimensionVectorSelector makeSingleValueDimensionSelector(DimensionSpec dimensionSpec)
      {
        return null;
      }

      @Override
      public MultiValueDimensionVectorSelector makeMultiValueDimensionSelector(DimensionSpec dimensionSpec)
      {
        return null;
      }

      @Override
      public VectorValueSelector makeValueSelector(String column)
      {
        if (TIME_COL.equals(column)) {
          return timeSelector;
        }
        if (FIELD_NAME.equals(column)) {
          return selector;
        }
        return null;
      }

      @Override
      public VectorObjectSelector makeObjectSelector(String column)
      {
        return null;
      }

      @Nullable
      @Override
      public ColumnCapabilities getColumnCapabilities(String column)
      {
        return FIELD_NAME.equals(column)
               ? ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ColumnType.LONG)
               : null;
      }
    };
  }

  /** Runs {@code target.init} at {@code offset + position} for every given position. */
  private void clearBufferForPositions(int offset, int... positions)
  {
    for (int idx = 0; idx < positions.length; idx++) {
      target.init(buf, offset + positions[idx]);
    }
  }
}

View File

@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.first;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.query.aggregation.SerializablePairLongString;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.segment.vector.BaseLongVectorValueSelector;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorObjectSelector;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import java.nio.ByteBuffer;
import java.util.concurrent.ThreadLocalRandom;
@RunWith(MockitoJUnitRunner.class)
public class StringFirstVectorAggregatorTest extends InitializedNullHandlingTest
{
  /*
   * Unit tests for StringFirstVectorAggregator and for the vectorized path of StringFirstAggregatorFactory.
   *
   * Two aggregators are exercised:
   *   - target:          fed plain strings (VALUES) paired with strictly increasing timestamps (times);
   *   - targetWithPairs: fed pre-built SerializablePairLongString objects (pairs) while the time selector
   *                      reports the same timestamp for every row (timesSame).
   *
   * The backing buffer is filled with random bytes in setup(), so a test only passes if every position it
   * reads has been explicitly initialized first.
   */

  // Row values handed out by the mocked object selector; index 2 is deliberately null.
  private static final String[] VALUES = new String[]{"a", "b", null, "c"};
  private static final String NAME = "NAME";
  private static final String FIELD_NAME = "FIELD_NAME";
  private static final String TIME_COL = "__time";

  // One timestamp per entry of VALUES, in increasing order, so row 0 is the earliest.
  private final long[] times = {2436, 6879, 7888, 8224};
  // Identical timestamps, one per entry of pairs.
  private final long[] timesSame = {2436, 2436};
  // pairs[0] carries the smaller left-hand (timestamp) value of the two.
  private final SerializablePairLongString[] pairs = {
      new SerializablePairLongString(2345001L, "first"),
      new SerializablePairLongString(2345100L, "notFirst")
  };

  @Mock
  private VectorObjectSelector selector;
  @Mock
  private VectorObjectSelector selectorForPairs;
  @Mock
  private BaseLongVectorValueSelector timeSelector;
  @Mock
  private BaseLongVectorValueSelector timeSelectorForPairs;
  private ByteBuffer buf;
  private StringFirstVectorAggregator target;
  private StringFirstVectorAggregator targetWithPairs;
  private StringFirstAggregatorFactory stringFirstAggregatorFactory;
  @Mock(answer = Answers.RETURNS_DEEP_STUBS)
  private VectorColumnSelectorFactory selectorFactory;

  /**
   * Builds a fresh random-filled buffer, stubs the mocked selectors with the fixture arrays, creates both
   * aggregators and the factory under test, and initializes buffer position 0.
   */
  @Before
  public void setup()
  {
    byte[] randomBytes = new byte[1024];
    ThreadLocalRandom.current().nextBytes(randomBytes);
    buf = ByteBuffer.wrap(randomBytes);

    Mockito.doReturn(VALUES).when(selector).getObjectVector();
    Mockito.doReturn(times).when(timeSelector).getLongVector();
    Mockito.doReturn(timesSame).when(timeSelectorForPairs).getLongVector();
    Mockito.doReturn(pairs).when(selectorForPairs).getObjectVector();

    target = new StringFirstVectorAggregator(timeSelector, selector, 10);
    targetWithPairs = new StringFirstVectorAggregator(timeSelectorForPairs, selectorForPairs, 10);
    // Position 0 is the one read by the non-batch tests; overwrite its random bytes with the initial state.
    clearBufferForPositions(0, 0);

    Mockito.doReturn(selector).when(selectorFactory).makeObjectSelector(FIELD_NAME);
    Mockito.doReturn(timeSelector).when(selectorFactory).makeValueSelector(TIME_COL);
    stringFirstAggregatorFactory = new StringFirstAggregatorFactory(NAME, FIELD_NAME, TIME_COL, 10);
  }

  /**
   * Aggregates the pre-built pairs over rows [0, pairs.length) and expects the result to equal pairs[0].
   */
  @Test
  public void testAggregateWithPairs()
  {
    targetWithPairs.aggregate(buf, 0, 0, pairs.length);
    Pair<Long, String> result = resultAt(targetWithPairs, 0);
    // The time selector reports the same timestamp for both rows, so the expected "first" value is
    // pairs[0]: its own left-hand timestamp is the smaller of the two.
    Assert.assertEquals(pairs[0].lhs.longValue(), result.lhs.longValue());
    Assert.assertEquals(pairs[0].rhs, result.rhs);
  }

  /**
   * Checks that the factory reports it can vectorize against the mocked selector factory and that it
   * produces a {@link StringFirstVectorAggregator}.
   */
  @Test
  public void testFactory()
  {
    Assert.assertTrue(stringFirstAggregatorFactory.canVectorize(selectorFactory));
    VectorAggregator vectorAggregator = stringFirstAggregatorFactory.factorizeVector(selectorFactory);
    Assert.assertNotNull(vectorAggregator);
    Assert.assertEquals(StringFirstVectorAggregator.class, vectorAggregator.getClass());
  }

  /**
   * Checks that {@code init} stores {@code DateTimes.MAX} (in millis) as the long at the start of the position.
   */
  @Test
  public void initValueShouldBeMaxDate()
  {
    target.init(buf, 0);
    long initVal = buf.getLong(0);
    Assert.assertEquals(DateTimes.MAX.getMillis(), initVal);
  }

  /**
   * Aggregates all rows into position 0 and expects the row with the smallest timestamp (row 0).
   */
  @Test
  public void aggregate()
  {
    target.aggregate(buf, 0, 0, VALUES.length);
    Pair<Long, String> result = resultAt(target, 0);
    Assert.assertEquals(times[0], result.lhs.longValue());
    Assert.assertEquals(VALUES[0], result.rhs);
  }

  /**
   * Batch aggregation with a null {@code rows} array: the test expects position i to receive row i.
   */
  @Test
  public void aggregateBatchWithoutRows()
  {
    int[] positions = new int[]{0, 43, 70};
    int positionOffset = 2;
    clearBufferForPositions(positionOffset, positions);
    target.aggregate(buf, positions.length, positions, null, positionOffset);
    for (int i = 0; i < positions.length; i++) {
      Pair<Long, String> result = resultAt(target, positions[i] + positionOffset);
      Assert.assertEquals(times[i], result.lhs.longValue());
      Assert.assertEquals(VALUES[i], result.rhs);
    }
  }

  /**
   * Batch aggregation with an explicit {@code rows} array: the test expects position i to receive row
   * {@code rows[i]}.
   */
  @Test
  public void aggregateBatchWithRows()
  {
    int[] positions = new int[]{0, 43, 70};
    int[] rows = new int[]{3, 2, 0};
    int positionOffset = 2;
    clearBufferForPositions(positionOffset, positions);
    target.aggregate(buf, positions.length, positions, rows, positionOffset);
    for (int i = 0; i < positions.length; i++) {
      Pair<Long, String> result = resultAt(target, positions[i] + positionOffset);
      Assert.assertEquals(times[rows[i]], result.lhs.longValue());
      Assert.assertEquals(VALUES[rows[i]], result.rhs);
    }
  }

  /**
   * Reads the aggregated value stored at {@code position} in the shared buffer.
   *
   * @param aggregator aggregator whose {@code get} is used to decode the buffer
   * @param position   buffer position to read
   * @return the decoded result, cast to the pair type the tests assert on
   */
  @SuppressWarnings("unchecked") // get() is declared to return Object; these tests only ever store pairs
  private Pair<Long, String> resultAt(StringFirstVectorAggregator aggregator, int position)
  {
    return (Pair<Long, String>) aggregator.get(buf, position);
  }

  /**
   * Initializes the buffer at {@code offset + position} for every given position, replacing the random
   * bytes written in {@link #setup()}.
   *
   * @param offset    value added to each position
   * @param positions positions to initialize
   */
  private void clearBufferForPositions(int offset, int... positions)
  {
    for (int position : positions) {
      target.init(buf, offset + position);
    }
  }
}

View File

@ -3280,9 +3280,6 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
@Test @Test
public void testGroupByWithFirstLast() public void testGroupByWithFirstLast()
{ {
// Cannot vectorize due to "first", "last" aggregators.
cannotVectorize();
GroupByQuery query = makeQueryBuilder() GroupByQuery query = makeQueryBuilder()
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE) .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
.setQuerySegmentSpec(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) .setQuerySegmentSpec(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC)
@ -3370,9 +3367,6 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
@Test @Test
public void testGroupByWithNoResult() public void testGroupByWithNoResult()
{ {
// Cannot vectorize due to first, last aggregators.
cannotVectorize();
GroupByQuery query = makeQueryBuilder() GroupByQuery query = makeQueryBuilder()
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE) .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
.setQuerySegmentSpec(QueryRunnerTestHelper.EMPTY_INTERVAL) .setQuerySegmentSpec(QueryRunnerTestHelper.EMPTY_INTERVAL)
@ -7198,9 +7192,6 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
@Test @Test
public void testSubqueryWithFirstLast() public void testSubqueryWithFirstLast()
{ {
// Cannot vectorize due to "first", "last" aggregators.
cannotVectorize();
GroupByQuery subquery = makeQueryBuilder() GroupByQuery subquery = makeQueryBuilder()
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE) .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
.setQuerySegmentSpec(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) .setQuerySegmentSpec(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC)

View File

@ -169,9 +169,6 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
@Test @Test
public void testEmptyTimeseries() public void testEmptyTimeseries()
{ {
// Cannot vectorize due to "doubleFirst" aggregator.
cannotVectorize();
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource(QueryRunnerTestHelper.DATA_SOURCE) .dataSource(QueryRunnerTestHelper.DATA_SOURCE)
.granularity(QueryRunnerTestHelper.ALL_GRAN) .granularity(QueryRunnerTestHelper.ALL_GRAN)
@ -1960,9 +1957,6 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
@Test @Test
public void testTimeseriesWithFirstLastAggregator() public void testTimeseriesWithFirstLastAggregator()
{ {
// Cannot vectorize due to "doubleFirst", "doubleLast" aggregators.
cannotVectorize();
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource(QueryRunnerTestHelper.DATA_SOURCE) .dataSource(QueryRunnerTestHelper.DATA_SOURCE)
.granularity(QueryRunnerTestHelper.MONTH_GRAN) .granularity(QueryRunnerTestHelper.MONTH_GRAN)

View File

@ -630,8 +630,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
public void testEarliestAggregators() public void testEarliestAggregators()
{ {
notMsqCompatible(); notMsqCompatible();
// Cannot vectorize EARLIEST aggregator.
skipVectorize();
testQuery( testQuery(
"SELECT " "SELECT "
@ -1096,8 +1094,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
public void testPrimitiveEarliestInSubquery() public void testPrimitiveEarliestInSubquery()
{ {
notMsqCompatible(); notMsqCompatible();
// Cannot vectorize EARLIEST aggregator.
skipVectorize();
testQuery( testQuery(
"SELECT SUM(val1), SUM(val2), SUM(val3) FROM (SELECT dim2, EARLIEST(m1) AS val1, EARLIEST(cnt) AS val2, EARLIEST(m2) AS val3 FROM foo GROUP BY dim2)", "SELECT SUM(val1), SUM(val2), SUM(val3) FROM (SELECT dim2, EARLIEST(m1) AS val1, EARLIEST(cnt) AS val2, EARLIEST(m2) AS val3 FROM foo GROUP BY dim2)",
@ -1195,11 +1191,8 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
@Test @Test
public void testStringEarliestInSubquery() public void testStringEarliestInSubquery()
{ {
// Cannot vectorize EARLIEST aggregator.
skipVectorize();
testQuery( testQuery(
"SELECT SUM(val) FROM (SELECT dim2, EARLIEST(dim1, 10) AS val FROM foo GROUP BY dim2)", "SELECT SUM(val) FROM (SELECT dim2, EARLIEST(dim1,10) AS val FROM foo GROUP BY dim2)",
ImmutableList.of( ImmutableList.of(
GroupByQuery.builder() GroupByQuery.builder()
.setDataSource( .setDataSource(
@ -1305,6 +1298,75 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
); );
} }
@Test
public void testStringEarliestSingleStringDim()
{
  notMsqCompatible();

  // EARLIEST over the string column dim1, grouped by dim2.
  final String sql = "SELECT dim2, EARLIEST(dim1,10) AS val FROM foo GROUP BY dim2";

  // Native query the SQL is expected to plan to: a groupBy on dim2 with one string-first aggregator on dim1.
  final GroupByQuery expectedQuery =
      GroupByQuery.builder()
                  .setDataSource(CalciteTests.DATASOURCE1)
                  .setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0")))
                  .setAggregatorSpecs(aggregators(new StringFirstAggregatorFactory("a0", "dim1", null, 10)))
                  .setInterval(querySegmentSpec(Filtration.eternity()))
                  .setGranularity(Granularities.ALL)
                  .setContext(QUERY_CONTEXT_DEFAULT)
                  .build();

  // In SQL-compatible mode the expected output keeps separate groups for null and "" dim2 values;
  // otherwise only a single "" group is expected.
  final ImmutableList<Object[]> expectedRows;
  if (NullHandling.sqlCompatible()) {
    expectedRows = ImmutableList.of(
        new Object[]{null, "10.1"},
        new Object[]{"", "2"},
        new Object[]{"a", ""},
        new Object[]{"abc", "def"}
    );
  } else {
    expectedRows = ImmutableList.of(
        new Object[]{"", "10.1"},
        new Object[]{"a", ""},
        new Object[]{"abc", "def"}
    );
  }

  testQuery(sql, ImmutableList.of(expectedQuery), expectedRows);
}
@Test
public void testStringEarliestMultiStringDim()
{
  // EARLIEST over dim3, grouped by dim2; the expected rows below include bracketed list strings
  // such as "[b, c]" for some groups.
  final String sql = "SELECT dim2, EARLIEST(dim3,10) AS val FROM foo GROUP BY dim2";

  // Native query the SQL is expected to plan to: a groupBy on dim2 with one string-first aggregator on dim3.
  final GroupByQuery expectedQuery =
      GroupByQuery.builder()
                  .setDataSource(CalciteTests.DATASOURCE1)
                  .setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0")))
                  .setAggregatorSpecs(aggregators(new StringFirstAggregatorFactory("a0", "dim3", null, 10)))
                  .setInterval(querySegmentSpec(Filtration.eternity()))
                  .setGranularity(Granularities.ALL)
                  .setContext(QUERY_CONTEXT_DEFAULT)
                  .build();

  // In SQL-compatible mode the expected output keeps separate groups for null and "" dim2 values and a
  // null aggregate for "abc"; otherwise a single "" group and an empty-string aggregate are expected.
  final ImmutableList<Object[]> expectedRows;
  if (NullHandling.sqlCompatible()) {
    expectedRows = ImmutableList.of(
        new Object[]{null, "[b, c]"},
        new Object[]{"", "d"},
        new Object[]{"a", "[a, b]"},
        new Object[]{"abc", null}
    );
  } else {
    expectedRows = ImmutableList.of(
        new Object[]{"", "[b, c]"},
        new Object[]{"a", "[a, b]"},
        new Object[]{"abc", ""}
    );
  }

  testQuery(sql, ImmutableList.of(expectedQuery), expectedRows);
}
// This test the off-heap (buffer) version of the AnyAggregator (String) // This test the off-heap (buffer) version of the AnyAggregator (String)
@Test @Test
public void testStringAnyInSubquery() public void testStringAnyInSubquery()
@ -1356,8 +1418,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
public void testEarliestAggregatorsNumericNulls() public void testEarliestAggregatorsNumericNulls()
{ {
notMsqCompatible(); notMsqCompatible();
// Cannot vectorize EARLIEST aggregator.
skipVectorize();
testQuery( testQuery(
"SELECT EARLIEST(l1), EARLIEST(d1), EARLIEST(f1) FROM druid.numfoo", "SELECT EARLIEST(l1), EARLIEST(d1), EARLIEST(f1) FROM druid.numfoo",
@ -1417,8 +1477,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
public void testFirstLatestAggregatorsSkipNulls() public void testFirstLatestAggregatorsSkipNulls()
{ {
notMsqCompatible(); notMsqCompatible();
// Cannot vectorize EARLIEST aggregator.
skipVectorize();
final DimFilter filter; final DimFilter filter;
if (useDefault) { if (useDefault) {
@ -1533,8 +1591,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
public void testOrderByEarliestFloat() public void testOrderByEarliestFloat()
{ {
notMsqCompatible(); notMsqCompatible();
// Cannot vectorize EARLIEST aggregator.
skipVectorize();
List<Object[]> expected; List<Object[]> expected;
if (NullHandling.replaceWithDefault()) { if (NullHandling.replaceWithDefault()) {
expected = ImmutableList.of( expected = ImmutableList.of(
@ -1581,8 +1638,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
public void testOrderByEarliestDouble() public void testOrderByEarliestDouble()
{ {
notMsqCompatible(); notMsqCompatible();
// Cannot vectorize EARLIEST aggregator.
skipVectorize();
List<Object[]> expected; List<Object[]> expected;
if (NullHandling.replaceWithDefault()) { if (NullHandling.replaceWithDefault()) {
expected = ImmutableList.of( expected = ImmutableList.of(
@ -1629,8 +1685,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
public void testOrderByEarliestLong() public void testOrderByEarliestLong()
{ {
notMsqCompatible(); notMsqCompatible();
// Cannot vectorize EARLIEST aggregator.
skipVectorize();
List<Object[]> expected; List<Object[]> expected;
if (NullHandling.replaceWithDefault()) { if (NullHandling.replaceWithDefault()) {
expected = ImmutableList.of( expected = ImmutableList.of(