mirror of https://github.com/apache/druid.git
Vectorized ANY aggregators (#10338)
* WIP vectorized ANY aggregators * tests * fix aggs * cleanup * code review + tests * docs * use NilVectorSelector when needed * fix spellcheck * dont instantiate vectors * cleanup
This commit is contained in:
parent
e012d5c41b
commit
f71ba6f2c2
|
@ -90,8 +90,8 @@ requirements:
|
|||
include "selector", "bound", "in", "like", "regex", "search", "and", "or", and "not".
|
||||
- All filters in filtered aggregators must offer vectorized row-matchers.
|
||||
- All aggregators must offer vectorized implementations. These include "count", "doubleSum", "floatSum", "longSum", "longMin",
|
||||
"longMax", "doubleMin", "doubleMax", "floatMin", "floatMax", "hyperUnique", "filtered", "approxHistogram",
|
||||
"approxHistogramFold", and "fixedBucketsHistogram" (with numerical input).
|
||||
"longMax", "doubleMin", "doubleMax", "floatMin", "floatMax", "longAny", "doubleAny", "floatAny", "stringAny",
|
||||
"hyperUnique", "filtered", "approxHistogram", "approxHistogramFold", and "fixedBucketsHistogram" (with numerical input).
|
||||
- No virtual columns.
|
||||
- For GroupBy: All dimension specs must be "default" (no extraction functions or filtered dimension specs).
|
||||
- For GroupBy: No multi-value dimensions.
|
||||
|
|
|
@ -28,12 +28,16 @@ import org.apache.druid.query.aggregation.Aggregator;
|
|||
import org.apache.druid.query.aggregation.AggregatorFactory;
|
||||
import org.apache.druid.query.aggregation.AggregatorUtil;
|
||||
import org.apache.druid.query.aggregation.BufferAggregator;
|
||||
import org.apache.druid.query.aggregation.VectorAggregator;
|
||||
import org.apache.druid.query.cache.CacheKeyBuilder;
|
||||
import org.apache.druid.segment.BaseDoubleColumnValueSelector;
|
||||
import org.apache.druid.segment.ColumnInspector;
|
||||
import org.apache.druid.segment.ColumnSelectorFactory;
|
||||
import org.apache.druid.segment.NilColumnValueSelector;
|
||||
import org.apache.druid.segment.column.ColumnCapabilities;
|
||||
import org.apache.druid.segment.column.ColumnHolder;
|
||||
import org.apache.druid.segment.column.ValueType;
|
||||
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.nio.ByteBuffer;
|
||||
|
@ -112,6 +116,23 @@ public class DoubleAnyAggregatorFactory extends AggregatorFactory
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory)
|
||||
{
|
||||
ColumnCapabilities capabilities = selectorFactory.getColumnCapabilities(fieldName);
|
||||
if (capabilities == null || capabilities.getType().isNumeric()) {
|
||||
return new DoubleAnyVectorAggregator(selectorFactory.makeValueSelector(fieldName));
|
||||
} else {
|
||||
return NumericNilVectorAggregator.doubleNilVectorAggregator();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canVectorize(ColumnInspector columnInspector)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Comparator getComparator()
|
||||
{
|
||||
|
|
|
@ -0,0 +1,58 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.aggregation.any;
|
||||
|
||||
import org.apache.druid.segment.vector.VectorValueSelector;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
/**
|
||||
* Vectorized implementation of the {@link DoubleAnyBufferAggregator}
|
||||
*/
|
||||
public class DoubleAnyVectorAggregator extends NumericAnyVectorAggregator
|
||||
{
|
||||
public DoubleAnyVectorAggregator(VectorValueSelector selector)
|
||||
{
|
||||
super(selector);
|
||||
}
|
||||
|
||||
@Override
|
||||
void initValue(ByteBuffer buf, int position)
|
||||
{
|
||||
buf.putDouble(position, 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean putAnyValueFromRow(ByteBuffer buf, int position, int startRow, int endRow)
|
||||
{
|
||||
double[] values = vectorValueSelector.getDoubleVector();
|
||||
boolean isRowsWithinIndex = startRow < endRow && startRow < values.length;
|
||||
if (isRowsWithinIndex) {
|
||||
buf.putDouble(position, values[startRow]);
|
||||
}
|
||||
return isRowsWithinIndex;
|
||||
}
|
||||
|
||||
@Override
|
||||
Object getNonNullObject(ByteBuffer buf, int position)
|
||||
{
|
||||
return buf.getDouble(position);
|
||||
}
|
||||
}
|
|
@ -28,11 +28,15 @@ import org.apache.druid.query.aggregation.Aggregator;
|
|||
import org.apache.druid.query.aggregation.AggregatorFactory;
|
||||
import org.apache.druid.query.aggregation.AggregatorUtil;
|
||||
import org.apache.druid.query.aggregation.BufferAggregator;
|
||||
import org.apache.druid.query.aggregation.VectorAggregator;
|
||||
import org.apache.druid.query.cache.CacheKeyBuilder;
|
||||
import org.apache.druid.segment.BaseFloatColumnValueSelector;
|
||||
import org.apache.druid.segment.ColumnInspector;
|
||||
import org.apache.druid.segment.ColumnSelectorFactory;
|
||||
import org.apache.druid.segment.NilColumnValueSelector;
|
||||
import org.apache.druid.segment.column.ColumnCapabilities;
|
||||
import org.apache.druid.segment.column.ValueType;
|
||||
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.nio.ByteBuffer;
|
||||
|
@ -109,6 +113,23 @@ public class FloatAnyAggregatorFactory extends AggregatorFactory
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory)
|
||||
{
|
||||
ColumnCapabilities capabilities = selectorFactory.getColumnCapabilities(fieldName);
|
||||
if (capabilities == null || capabilities.getType().isNumeric()) {
|
||||
return new FloatAnyVectorAggregator(selectorFactory.makeValueSelector(fieldName));
|
||||
} else {
|
||||
return NumericNilVectorAggregator.floatNilVectorAggregator();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canVectorize(ColumnInspector columnInspector)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Comparator getComparator()
|
||||
{
|
||||
|
|
|
@ -0,0 +1,58 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.aggregation.any;
|
||||
|
||||
import org.apache.druid.segment.vector.VectorValueSelector;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
/**
|
||||
* Vectorized implementation of the {@link FloatAnyBufferAggregator}
|
||||
*/
|
||||
public class FloatAnyVectorAggregator extends NumericAnyVectorAggregator
|
||||
{
|
||||
public FloatAnyVectorAggregator(VectorValueSelector vectorValueSelector)
|
||||
{
|
||||
super(vectorValueSelector);
|
||||
}
|
||||
|
||||
@Override
|
||||
void initValue(ByteBuffer buf, int position)
|
||||
{
|
||||
buf.putFloat(position, 0F);
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean putAnyValueFromRow(ByteBuffer buf, int position, int startRow, int endRow)
|
||||
{
|
||||
float[] values = vectorValueSelector.getFloatVector();
|
||||
boolean isRowsWithinIndex = startRow < endRow && startRow < values.length;
|
||||
if (isRowsWithinIndex) {
|
||||
buf.putFloat(position, values[startRow]);
|
||||
}
|
||||
return isRowsWithinIndex;
|
||||
}
|
||||
|
||||
@Override
|
||||
Object getNonNullObject(ByteBuffer buf, int position)
|
||||
{
|
||||
return buf.getFloat(position);
|
||||
}
|
||||
}
|
|
@ -28,11 +28,15 @@ import org.apache.druid.query.aggregation.Aggregator;
|
|||
import org.apache.druid.query.aggregation.AggregatorFactory;
|
||||
import org.apache.druid.query.aggregation.AggregatorUtil;
|
||||
import org.apache.druid.query.aggregation.BufferAggregator;
|
||||
import org.apache.druid.query.aggregation.VectorAggregator;
|
||||
import org.apache.druid.query.cache.CacheKeyBuilder;
|
||||
import org.apache.druid.segment.BaseLongColumnValueSelector;
|
||||
import org.apache.druid.segment.ColumnInspector;
|
||||
import org.apache.druid.segment.ColumnSelectorFactory;
|
||||
import org.apache.druid.segment.NilColumnValueSelector;
|
||||
import org.apache.druid.segment.column.ColumnCapabilities;
|
||||
import org.apache.druid.segment.column.ValueType;
|
||||
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.nio.ByteBuffer;
|
||||
|
@ -108,6 +112,23 @@ public class LongAnyAggregatorFactory extends AggregatorFactory
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory)
|
||||
{
|
||||
ColumnCapabilities capabilities = selectorFactory.getColumnCapabilities(fieldName);
|
||||
if (capabilities == null || capabilities.getType().isNumeric()) {
|
||||
return new LongAnyVectorAggregator(selectorFactory.makeValueSelector(fieldName));
|
||||
} else {
|
||||
return NumericNilVectorAggregator.longNilVectorAggregator();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canVectorize(ColumnInspector columnInspector)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Comparator getComparator()
|
||||
{
|
||||
|
|
|
@ -0,0 +1,59 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.aggregation.any;
|
||||
|
||||
import org.apache.druid.segment.vector.VectorValueSelector;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
/**
|
||||
* Vectorized implementation of the {@link LongAnyBufferAggregator}
|
||||
*/
|
||||
public class LongAnyVectorAggregator extends NumericAnyVectorAggregator
|
||||
{
|
||||
|
||||
public LongAnyVectorAggregator(VectorValueSelector vectorValueSelector)
|
||||
{
|
||||
super(vectorValueSelector);
|
||||
}
|
||||
|
||||
@Override
|
||||
void initValue(ByteBuffer buf, int position)
|
||||
{
|
||||
buf.putLong(position, 0L);
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean putAnyValueFromRow(ByteBuffer buf, int position, int startRow, int endRow)
|
||||
{
|
||||
long[] values = vectorValueSelector.getLongVector();
|
||||
boolean isRowsWithinIndex = startRow < endRow && startRow < values.length;
|
||||
if (isRowsWithinIndex) {
|
||||
buf.putLong(position, values[startRow]);
|
||||
}
|
||||
return isRowsWithinIndex;
|
||||
}
|
||||
|
||||
@Override
|
||||
Object getNonNullObject(ByteBuffer buf, int position)
|
||||
{
|
||||
return buf.getLong(position);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,139 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.aggregation.any;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.druid.common.config.NullHandling;
|
||||
import org.apache.druid.query.aggregation.VectorAggregator;
|
||||
import org.apache.druid.segment.vector.VectorValueSelector;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
public abstract class NumericAnyVectorAggregator implements VectorAggregator
|
||||
{
|
||||
// Rightmost bit for is null check (0 for is null and 1 for not null)
|
||||
// Second rightmost bit for is found check (0 for not found and 1 for found)
|
||||
@VisibleForTesting
|
||||
static final byte BYTE_FLAG_FOUND_MASK = 0x02;
|
||||
private static final byte BYTE_FLAG_NULL_MASK = 0x01;
|
||||
private static final int FOUND_VALUE_OFFSET = Byte.BYTES;
|
||||
|
||||
private final boolean replaceWithDefault = NullHandling.replaceWithDefault();
|
||||
protected final VectorValueSelector vectorValueSelector;
|
||||
|
||||
public NumericAnyVectorAggregator(VectorValueSelector vectorValueSelector)
|
||||
{
|
||||
this.vectorValueSelector = vectorValueSelector;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize the buffer value given the initial offset position within the byte buffer for initialization
|
||||
*/
|
||||
abstract void initValue(ByteBuffer buf, int position);
|
||||
|
||||
/**
|
||||
* Place any primitive value from the rows, starting at {@param startRow}(inclusive) up to {@param endRow}(exclusive),
|
||||
* in the buffer given the initial offset position within the byte buffer at which the current aggregate value
|
||||
* is stored.
|
||||
* @return true if a value was added, false otherwise
|
||||
*/
|
||||
abstract boolean putAnyValueFromRow(ByteBuffer buf, int position, int startRow, int endRow);
|
||||
|
||||
/**
|
||||
* @return The primitive object stored at the position in the buffer.
|
||||
*/
|
||||
abstract Object getNonNullObject(ByteBuffer buf, int position);
|
||||
|
||||
@Override
|
||||
public void init(ByteBuffer buf, int position)
|
||||
{
|
||||
buf.put(position, replaceWithDefault ? NullHandling.IS_NOT_NULL_BYTE : NullHandling.IS_NULL_BYTE);
|
||||
initValue(buf, position + FOUND_VALUE_OFFSET);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
|
||||
{
|
||||
if ((buf.get(position) & BYTE_FLAG_FOUND_MASK) != BYTE_FLAG_FOUND_MASK) {
|
||||
boolean[] nulls = vectorValueSelector.getNullVector();
|
||||
// check if there are any nulls
|
||||
if (nulls != null) {
|
||||
for (int i = startRow; i < endRow && i < nulls.length; i++) {
|
||||
// And there is actually a null
|
||||
if (nulls[i]) {
|
||||
putNull(buf, position);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
// There are no nulls, so try to put a value from the value selector
|
||||
if (putAnyValueFromRow(buf, position + FOUND_VALUE_OFFSET, startRow, endRow)) {
|
||||
buf.put(position, (byte) (BYTE_FLAG_FOUND_MASK | NullHandling.IS_NOT_NULL_BYTE));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void aggregate(
|
||||
ByteBuffer buf,
|
||||
int numRows,
|
||||
int[] positions,
|
||||
@Nullable int[] rows,
|
||||
int positionOffset
|
||||
)
|
||||
{
|
||||
for (int i = 0; i < numRows; i++) {
|
||||
int position = positions[i] + positionOffset;
|
||||
int row = rows == null ? i : rows[i];
|
||||
aggregate(buf, position, row, row + 1);
|
||||
}
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public Object get(ByteBuffer buf, int position)
|
||||
{
|
||||
final boolean isNull = isValueNull(buf, position);
|
||||
return isNull ? null : getNonNullObject(buf, position + FOUND_VALUE_OFFSET);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
// No resources to cleanup.
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
boolean isValueNull(ByteBuffer buf, int position)
|
||||
{
|
||||
return (buf.get(position) & BYTE_FLAG_NULL_MASK) == NullHandling.IS_NULL_BYTE;
|
||||
}
|
||||
|
||||
private void putNull(ByteBuffer buf, int position)
|
||||
{
|
||||
if (!replaceWithDefault) {
|
||||
buf.put(position, (byte) (BYTE_FLAG_FOUND_MASK | NullHandling.IS_NULL_BYTE));
|
||||
} else {
|
||||
initValue(buf, position + FOUND_VALUE_OFFSET);
|
||||
buf.put(position, (byte) (BYTE_FLAG_FOUND_MASK | NullHandling.IS_NOT_NULL_BYTE));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,113 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.aggregation.any;
|
||||
|
||||
import org.apache.druid.common.config.NullHandling;
|
||||
import org.apache.druid.query.aggregation.VectorAggregator;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
/**
|
||||
* A vector aggregator that returns the default numeric value.
|
||||
*/
|
||||
public class NumericNilVectorAggregator implements VectorAggregator
|
||||
{
|
||||
private static final NumericNilVectorAggregator DOUBLE_NIL_VECTOR_AGGREGATOR = new NumericNilVectorAggregator(
|
||||
NullHandling.defaultDoubleValue()
|
||||
);
|
||||
|
||||
private static final NumericNilVectorAggregator FLOAT_NIL_VECTOR_AGGREGATOR = new NumericNilVectorAggregator(
|
||||
NullHandling.defaultFloatValue()
|
||||
);
|
||||
|
||||
private static final NumericNilVectorAggregator LONG_NIL_VECTOR_AGGREGATOR = new NumericNilVectorAggregator(
|
||||
NullHandling.defaultLongValue()
|
||||
);
|
||||
|
||||
/**
|
||||
* @return A vectorized aggregator that returns the default double value.
|
||||
*/
|
||||
public static NumericNilVectorAggregator doubleNilVectorAggregator()
|
||||
{
|
||||
return DOUBLE_NIL_VECTOR_AGGREGATOR;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return A vectorized aggregator that returns the default float value.
|
||||
*/
|
||||
public static NumericNilVectorAggregator floatNilVectorAggregator()
|
||||
{
|
||||
return FLOAT_NIL_VECTOR_AGGREGATOR;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return A vectorized aggregator that returns the default long value.
|
||||
*/
|
||||
public static NumericNilVectorAggregator longNilVectorAggregator()
|
||||
{
|
||||
return LONG_NIL_VECTOR_AGGREGATOR;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
private final Object returnValue;
|
||||
|
||||
private NumericNilVectorAggregator(@Nullable Object returnValue)
|
||||
{
|
||||
this.returnValue = returnValue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(ByteBuffer buf, int position)
|
||||
{
|
||||
// Do nothing
|
||||
}
|
||||
|
||||
@Override
|
||||
public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
|
||||
{
|
||||
// Do nothing.
|
||||
}
|
||||
|
||||
@Override
|
||||
public void aggregate(
|
||||
ByteBuffer buf,
|
||||
int numRows,
|
||||
int[] positions,
|
||||
@Nullable int[] rows,
|
||||
int positionOffset
|
||||
)
|
||||
{
|
||||
// Do nothing.
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public Object get(ByteBuffer buf, int position)
|
||||
{
|
||||
return returnValue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
// Do nothing.
|
||||
}
|
||||
}
|
|
@ -29,8 +29,12 @@ import org.apache.druid.query.aggregation.AggregatorUtil;
|
|||
import org.apache.druid.query.aggregation.BufferAggregator;
|
||||
import org.apache.druid.query.aggregation.first.StringFirstAggregatorFactory;
|
||||
import org.apache.druid.query.cache.CacheKeyBuilder;
|
||||
import org.apache.druid.query.dimension.DefaultDimensionSpec;
|
||||
import org.apache.druid.segment.ColumnInspector;
|
||||
import org.apache.druid.segment.ColumnSelectorFactory;
|
||||
import org.apache.druid.segment.column.ColumnCapabilities;
|
||||
import org.apache.druid.segment.column.ValueType;
|
||||
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.Collections;
|
||||
|
@ -77,6 +81,32 @@ public class StringAnyAggregatorFactory extends AggregatorFactory
|
|||
return new StringAnyBufferAggregator(metricFactory.makeColumnValueSelector(fieldName), maxStringBytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public StringAnyVectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory)
|
||||
{
|
||||
|
||||
ColumnCapabilities capabilities = selectorFactory.getColumnCapabilities(fieldName);
|
||||
if (capabilities == null || capabilities.hasMultipleValues().isMaybeTrue()) {
|
||||
return new StringAnyVectorAggregator(
|
||||
null,
|
||||
selectorFactory.makeMultiValueDimensionSelector(DefaultDimensionSpec.of(fieldName)),
|
||||
maxStringBytes
|
||||
);
|
||||
} else {
|
||||
return new StringAnyVectorAggregator(
|
||||
selectorFactory.makeSingleValueDimensionSelector(DefaultDimensionSpec.of(fieldName)),
|
||||
null,
|
||||
maxStringBytes
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canVectorize(ColumnInspector columnInspector)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Comparator getComparator()
|
||||
{
|
||||
|
|
|
@ -0,0 +1,145 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.aggregation.any;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.query.aggregation.VectorAggregator;
|
||||
import org.apache.druid.segment.data.IndexedInts;
|
||||
import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
|
||||
import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
public class StringAnyVectorAggregator implements VectorAggregator
|
||||
{
|
||||
private static final int FOUND_AND_NULL_FLAG_VALUE = -1;
|
||||
@VisibleForTesting
|
||||
static final int NOT_FOUND_FLAG_VALUE = -2;
|
||||
@VisibleForTesting
|
||||
static final int FOUND_VALUE_OFFSET = Integer.BYTES;
|
||||
|
||||
@Nullable
|
||||
private final SingleValueDimensionVectorSelector singleValueSelector;
|
||||
@Nullable
|
||||
private final MultiValueDimensionVectorSelector multiValueSelector;
|
||||
private final int maxStringBytes;
|
||||
|
||||
public StringAnyVectorAggregator(
|
||||
SingleValueDimensionVectorSelector singleValueSelector,
|
||||
MultiValueDimensionVectorSelector multiValueSelector,
|
||||
int maxStringBytes
|
||||
)
|
||||
{
|
||||
Preconditions.checkState(
|
||||
singleValueSelector != null || multiValueSelector != null,
|
||||
"At least one selector must be non null"
|
||||
);
|
||||
Preconditions.checkState(
|
||||
singleValueSelector == null || multiValueSelector == null,
|
||||
"Only one selector must be non null"
|
||||
);
|
||||
this.multiValueSelector = multiValueSelector;
|
||||
this.singleValueSelector = singleValueSelector;
|
||||
this.maxStringBytes = maxStringBytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(ByteBuffer buf, int position)
|
||||
{
|
||||
buf.putInt(position, NOT_FOUND_FLAG_VALUE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
|
||||
{
|
||||
if (buf.getInt(position) == NOT_FOUND_FLAG_VALUE && startRow < endRow) {
|
||||
if (multiValueSelector != null) {
|
||||
final IndexedInts[] rows = multiValueSelector.getRowVector();
|
||||
if (startRow < rows.length) {
|
||||
IndexedInts row = rows[startRow];
|
||||
@Nullable
|
||||
String foundValue = row.size() == 0 ? null : multiValueSelector.lookupName(row.get(0));
|
||||
putValue(buf, position, foundValue);
|
||||
}
|
||||
} else if (singleValueSelector != null) {
|
||||
final int[] rows = singleValueSelector.getRowVector();
|
||||
if (startRow < rows.length) {
|
||||
int row = rows[startRow];
|
||||
@Nullable
|
||||
String foundValue = singleValueSelector.lookupName(row);
|
||||
putValue(buf, position, foundValue);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void aggregate(
|
||||
ByteBuffer buf,
|
||||
int numRows,
|
||||
int[] positions, @Nullable int[] rows,
|
||||
int positionOffset
|
||||
)
|
||||
{
|
||||
for (int i = 0; i < numRows; i++) {
|
||||
int position = positions[i] + positionOffset;
|
||||
int row = rows == null ? i : rows[i];
|
||||
aggregate(buf, position, row, row + 1);
|
||||
}
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public String get(ByteBuffer buf, int position)
|
||||
{
|
||||
ByteBuffer copyBuffer = buf.duplicate();
|
||||
copyBuffer.position(position);
|
||||
int stringSizeBytes = copyBuffer.getInt();
|
||||
if (stringSizeBytes >= 0) {
|
||||
byte[] valueBytes = new byte[stringSizeBytes];
|
||||
copyBuffer.get(valueBytes, 0, stringSizeBytes);
|
||||
return StringUtils.fromUtf8(valueBytes);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
// Nothing to close.
|
||||
}
|
||||
|
||||
private void putValue(ByteBuffer buf, int position, @Nullable String foundValue)
|
||||
{
|
||||
if (foundValue != null) {
|
||||
ByteBuffer mutationBuffer = buf.duplicate();
|
||||
mutationBuffer.position(position + FOUND_VALUE_OFFSET);
|
||||
mutationBuffer.limit(position + FOUND_VALUE_OFFSET + maxStringBytes);
|
||||
final int len = StringUtils.toUtf8WithLimit(foundValue, mutationBuffer);
|
||||
mutationBuffer.putInt(position, len);
|
||||
} else {
|
||||
buf.putInt(position, FOUND_AND_NULL_FLAG_VALUE);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,101 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.aggregation.any;
|
||||
|
||||
import org.apache.druid.common.config.NullHandling;
|
||||
import org.apache.druid.query.aggregation.VectorAggregator;
|
||||
import org.apache.druid.segment.ColumnInspector;
|
||||
import org.apache.druid.segment.column.ColumnCapabilities;
|
||||
import org.apache.druid.segment.column.ValueType;
|
||||
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
|
||||
import org.apache.druid.segment.vector.VectorValueSelector;
|
||||
import org.apache.druid.testing.InitializedNullHandlingTest;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Answers;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.junit.MockitoJUnitRunner;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class DoubleAnyAggregatorFactoryTest extends InitializedNullHandlingTest
|
||||
{
|
||||
private static final String NAME = "NAME";
|
||||
private static final String FIELD_NAME = "FIELD_NAME";
|
||||
private static final int POSITION = 2;
|
||||
private static final ByteBuffer BUFFER = ByteBuffer.allocate(128);
|
||||
|
||||
@Mock
|
||||
private ColumnCapabilities capabilities;
|
||||
@Mock
|
||||
private ColumnInspector columnInspector;
|
||||
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
|
||||
private VectorColumnSelectorFactory selectorFactory;
|
||||
@Mock
|
||||
private VectorValueSelector valueSelector;
|
||||
|
||||
private DoubleAnyAggregatorFactory target;
|
||||
|
||||
@Before
|
||||
public void setUp()
|
||||
{
|
||||
Mockito.doReturn(null).when(selectorFactory).getColumnCapabilities(FIELD_NAME);
|
||||
Mockito.doReturn(valueSelector).when(selectorFactory).makeValueSelector(FIELD_NAME);
|
||||
target = new DoubleAnyAggregatorFactory(NAME, FIELD_NAME);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void canVectorizeShouldReturnTrue()
|
||||
{
|
||||
Assert.assertTrue(target.canVectorize(columnInspector));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void factorizeVectorShouldReturnDoubleVectorAggregator()
|
||||
{
|
||||
VectorAggregator aggregator = target.factorizeVector(selectorFactory);
|
||||
Assert.assertNotNull(aggregator);
|
||||
Assert.assertEquals(DoubleAnyVectorAggregator.class, aggregator.getClass());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void factorizeVectorForNumericTypeShouldReturnDoubleVectorAggregator()
|
||||
{
|
||||
Mockito.doReturn(capabilities).when(selectorFactory).getColumnCapabilities(FIELD_NAME);
|
||||
Mockito.doReturn(ValueType.DOUBLE).when(capabilities).getType();
|
||||
VectorAggregator aggregator = target.factorizeVector(selectorFactory);
|
||||
Assert.assertNotNull(aggregator);
|
||||
Assert.assertEquals(DoubleAnyVectorAggregator.class, aggregator.getClass());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void factorizeVectorForStringTypeShouldReturnDoubleVectorAggregatorWithNilSelector()
|
||||
{
|
||||
Mockito.doReturn(capabilities).when(selectorFactory).getColumnCapabilities(FIELD_NAME);
|
||||
Mockito.doReturn(ValueType.STRING).when(capabilities).getType();
|
||||
VectorAggregator aggregator = target.factorizeVector(selectorFactory);
|
||||
Assert.assertNotNull(aggregator);
|
||||
Assert.assertEquals(NullHandling.defaultDoubleValue(), aggregator.get(BUFFER, POSITION));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,104 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.aggregation.any;
|
||||
|
||||
import org.apache.druid.segment.vector.VectorValueSelector;
|
||||
import org.apache.druid.testing.InitializedNullHandlingTest;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.junit.MockitoJUnitRunner;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
import static org.mockito.Mockito.spy;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class DoubleAnyVectorAggregatorTest extends InitializedNullHandlingTest
|
||||
{
|
||||
private static final int NULL_POSITION = 32;
|
||||
private static final int POSITION = 2;
|
||||
private static final double EPSILON = 1e-15;
|
||||
private static final double[] VALUES = new double[]{7.8d, 11, 23.67, 60, 123};
|
||||
|
||||
private ByteBuffer buf;
|
||||
@Mock
|
||||
private VectorValueSelector selector;
|
||||
|
||||
private DoubleAnyVectorAggregator target;
|
||||
|
||||
@Before
|
||||
public void setUp()
|
||||
{
|
||||
byte[] randomBytes = new byte[128];
|
||||
ThreadLocalRandom.current().nextBytes(randomBytes);
|
||||
buf = ByteBuffer.wrap(randomBytes);
|
||||
Mockito.doReturn(VALUES).when(selector).getDoubleVector();
|
||||
|
||||
target = spy(new DoubleAnyVectorAggregator(selector));
|
||||
Mockito.when(target.isValueNull(buf, NULL_POSITION)).thenReturn(true);
|
||||
Mockito.when(target.isValueNull(buf, POSITION)).thenReturn(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void initValueShouldInitZero()
|
||||
{
|
||||
target.initValue(buf, POSITION);
|
||||
Assert.assertEquals(0, buf.getDouble(POSITION), EPSILON);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getAtPositionIsNullShouldReturnNull()
|
||||
{
|
||||
Assert.assertNull(target.get(buf, NULL_POSITION));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getAtPositionShouldReturnValue()
|
||||
{
|
||||
buf.putDouble(POSITION + 1, VALUES[3]);
|
||||
Assert.assertEquals(VALUES[3], (double) target.get(buf, POSITION), EPSILON);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void putValueShouldAddToBuffer()
|
||||
{
|
||||
Assert.assertTrue(target.putAnyValueFromRow(buf, POSITION, 2, 3));
|
||||
Assert.assertEquals(VALUES[2], buf.getDouble(POSITION), EPSILON);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void putValueStartAfterEndShouldNotAddToBuffer()
|
||||
{
|
||||
Assert.assertFalse(target.putAnyValueFromRow(buf, POSITION, 2, 2));
|
||||
Assert.assertNotEquals(VALUES[2], buf.getDouble(POSITION), EPSILON);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void putValueStartOutsideRangeShouldNotAddToBuffer()
|
||||
{
|
||||
Assert.assertFalse(target.putAnyValueFromRow(buf, POSITION, 5, 6));
|
||||
Assert.assertNotEquals(VALUES[2], buf.getDouble(POSITION), EPSILON);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,101 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.aggregation.any;
|
||||
|
||||
import org.apache.druid.common.config.NullHandling;
|
||||
import org.apache.druid.query.aggregation.VectorAggregator;
|
||||
import org.apache.druid.segment.ColumnInspector;
|
||||
import org.apache.druid.segment.column.ColumnCapabilities;
|
||||
import org.apache.druid.segment.column.ValueType;
|
||||
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
|
||||
import org.apache.druid.segment.vector.VectorValueSelector;
|
||||
import org.apache.druid.testing.InitializedNullHandlingTest;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Answers;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.junit.MockitoJUnitRunner;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class FloatAnyAggregatorFactoryTest extends InitializedNullHandlingTest
|
||||
{
|
||||
private static final String NAME = "NAME";
|
||||
private static final String FIELD_NAME = "FIELD_NAME";
|
||||
private static final int POSITION = 2;
|
||||
private static final ByteBuffer BUFFER = ByteBuffer.allocate(128);
|
||||
|
||||
@Mock
|
||||
private ColumnCapabilities capabilities;
|
||||
@Mock
|
||||
private ColumnInspector columnInspector;
|
||||
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
|
||||
private VectorColumnSelectorFactory selectorFactory;
|
||||
@Mock
|
||||
private VectorValueSelector valueSelector;
|
||||
|
||||
private FloatAnyAggregatorFactory target;
|
||||
|
||||
@Before
|
||||
public void setUp()
|
||||
{
|
||||
Mockito.doReturn(null).when(selectorFactory).getColumnCapabilities(FIELD_NAME);
|
||||
Mockito.doReturn(valueSelector).when(selectorFactory).makeValueSelector(FIELD_NAME);
|
||||
target = new FloatAnyAggregatorFactory(NAME, FIELD_NAME);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void canVectorizeShouldReturnTrue()
|
||||
{
|
||||
Assert.assertTrue(target.canVectorize(columnInspector));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void factorizeVectorShouldReturnFloatVectorAggregator()
|
||||
{
|
||||
VectorAggregator aggregator = target.factorizeVector(selectorFactory);
|
||||
Assert.assertNotNull(aggregator);
|
||||
Assert.assertEquals(FloatAnyVectorAggregator.class, aggregator.getClass());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void factorizeVectorForNumericTypeShouldReturnFloatVectorAggregator()
|
||||
{
|
||||
Mockito.doReturn(capabilities).when(selectorFactory).getColumnCapabilities(FIELD_NAME);
|
||||
Mockito.doReturn(ValueType.FLOAT).when(capabilities).getType();
|
||||
VectorAggregator aggregator = target.factorizeVector(selectorFactory);
|
||||
Assert.assertNotNull(aggregator);
|
||||
Assert.assertEquals(FloatAnyVectorAggregator.class, aggregator.getClass());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void factorizeVectorForStringTypeShouldReturnFloatVectorAggregatorWithNilSelector()
|
||||
{
|
||||
Mockito.doReturn(capabilities).when(selectorFactory).getColumnCapabilities(FIELD_NAME);
|
||||
Mockito.doReturn(ValueType.STRING).when(capabilities).getType();
|
||||
VectorAggregator aggregator = target.factorizeVector(selectorFactory);
|
||||
Assert.assertNotNull(aggregator);
|
||||
Assert.assertEquals(NullHandling.defaultFloatValue(), aggregator.get(BUFFER, POSITION));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,104 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.aggregation.any;
|
||||
|
||||
import org.apache.druid.segment.vector.VectorValueSelector;
|
||||
import org.apache.druid.testing.InitializedNullHandlingTest;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.junit.MockitoJUnitRunner;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
import static org.mockito.Mockito.spy;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class FloatAnyVectorAggregatorTest extends InitializedNullHandlingTest
|
||||
{
|
||||
private static final int NULL_POSITION = 32;
|
||||
private static final int POSITION = 2;
|
||||
private static final double EPSILON = 1e-15;
|
||||
private static final float[] VALUES = new float[]{7.8f, 11, 23.67f, 60, 123};
|
||||
|
||||
private ByteBuffer buf;
|
||||
@Mock
|
||||
private VectorValueSelector selector;
|
||||
|
||||
private FloatAnyVectorAggregator target;
|
||||
|
||||
@Before
|
||||
public void setUp()
|
||||
{
|
||||
byte[] randomBytes = new byte[128];
|
||||
ThreadLocalRandom.current().nextBytes(randomBytes);
|
||||
buf = ByteBuffer.wrap(randomBytes);
|
||||
Mockito.doReturn(VALUES).when(selector).getFloatVector();
|
||||
|
||||
target = spy(new FloatAnyVectorAggregator(selector));
|
||||
Mockito.when(target.isValueNull(buf, NULL_POSITION)).thenReturn(true);
|
||||
Mockito.when(target.isValueNull(buf, POSITION)).thenReturn(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void initValueShouldInitZero()
|
||||
{
|
||||
target.initValue(buf, POSITION);
|
||||
Assert.assertEquals(0, buf.getFloat(POSITION), EPSILON);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getAtPositionIsNullShouldReturnNull()
|
||||
{
|
||||
Assert.assertNull(target.get(buf, NULL_POSITION));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getAtPositionShouldReturnValue()
|
||||
{
|
||||
buf.putFloat(POSITION + 1, VALUES[3]);
|
||||
Assert.assertEquals(VALUES[3], (float) target.get(buf, POSITION), EPSILON);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void putValueShouldAddToBuffer()
|
||||
{
|
||||
Assert.assertTrue(target.putAnyValueFromRow(buf, POSITION, 2, 3));
|
||||
Assert.assertEquals(VALUES[2], buf.getFloat(POSITION), EPSILON);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void putValueStartAfterEndShouldNotAddToBuffer()
|
||||
{
|
||||
Assert.assertFalse(target.putAnyValueFromRow(buf, POSITION, 2, 2));
|
||||
Assert.assertNotEquals(VALUES[2], buf.getFloat(POSITION), EPSILON);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void putValueStartOutsideRangeShouldNotAddToBuffer()
|
||||
{
|
||||
Assert.assertFalse(target.putAnyValueFromRow(buf, POSITION, 5, 6));
|
||||
Assert.assertNotEquals(VALUES[2], buf.getFloat(POSITION), EPSILON);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,101 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.aggregation.any;
|
||||
|
||||
import org.apache.druid.common.config.NullHandling;
|
||||
import org.apache.druid.query.aggregation.VectorAggregator;
|
||||
import org.apache.druid.segment.ColumnInspector;
|
||||
import org.apache.druid.segment.column.ColumnCapabilities;
|
||||
import org.apache.druid.segment.column.ValueType;
|
||||
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
|
||||
import org.apache.druid.segment.vector.VectorValueSelector;
|
||||
import org.apache.druid.testing.InitializedNullHandlingTest;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Answers;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.junit.MockitoJUnitRunner;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class LongAnyAggregatorFactoryTest extends InitializedNullHandlingTest
|
||||
{
|
||||
private static final String NAME = "NAME";
|
||||
private static final String FIELD_NAME = "FIELD_NAME";
|
||||
private static final int POSITION = 2;
|
||||
private static final ByteBuffer BUFFER = ByteBuffer.allocate(128);
|
||||
|
||||
@Mock
|
||||
private ColumnCapabilities capabilities;
|
||||
@Mock
|
||||
private ColumnInspector columnInspector;
|
||||
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
|
||||
private VectorColumnSelectorFactory selectorFactory;
|
||||
@Mock
|
||||
private VectorValueSelector valueSelector;
|
||||
|
||||
private LongAnyAggregatorFactory target;
|
||||
|
||||
@Before
|
||||
public void setUp()
|
||||
{
|
||||
Mockito.doReturn(null).when(selectorFactory).getColumnCapabilities(FIELD_NAME);
|
||||
Mockito.doReturn(valueSelector).when(selectorFactory).makeValueSelector(FIELD_NAME);
|
||||
target = new LongAnyAggregatorFactory(NAME, FIELD_NAME);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void canVectorizeShouldReturnTrue()
|
||||
{
|
||||
Assert.assertTrue(target.canVectorize(columnInspector));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void factorizeVectorShouldReturnLongVectorAggregator()
|
||||
{
|
||||
VectorAggregator aggregator = target.factorizeVector(selectorFactory);
|
||||
Assert.assertNotNull(aggregator);
|
||||
Assert.assertEquals(LongAnyVectorAggregator.class, aggregator.getClass());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void factorizeVectorWithNumericColumnShouldReturnLongVectorAggregator()
|
||||
{
|
||||
Mockito.doReturn(capabilities).when(selectorFactory).getColumnCapabilities(FIELD_NAME);
|
||||
Mockito.doReturn(ValueType.LONG).when(capabilities).getType();
|
||||
VectorAggregator aggregator = target.factorizeVector(selectorFactory);
|
||||
Assert.assertNotNull(aggregator);
|
||||
Assert.assertEquals(LongAnyVectorAggregator.class, aggregator.getClass());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void factorizeVectorForStringTypeShouldReturnLongVectorAggregatorWithNilSelector()
|
||||
{
|
||||
Mockito.doReturn(capabilities).when(selectorFactory).getColumnCapabilities(FIELD_NAME);
|
||||
Mockito.doReturn(ValueType.STRING).when(capabilities).getType();
|
||||
VectorAggregator aggregator = target.factorizeVector(selectorFactory);
|
||||
Assert.assertNotNull(aggregator);
|
||||
Assert.assertEquals(NullHandling.defaultLongValue(), aggregator.get(BUFFER, POSITION));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,103 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.aggregation.any;
|
||||
|
||||
import org.apache.druid.segment.vector.VectorValueSelector;
|
||||
import org.apache.druid.testing.InitializedNullHandlingTest;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.junit.MockitoJUnitRunner;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
import static org.mockito.Mockito.spy;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class LongAnyVectorAggregatorTest extends InitializedNullHandlingTest
|
||||
{
|
||||
private static final int NULL_POSITION = 32;
|
||||
private static final int POSITION = 2;
|
||||
private static final long[] VALUES = new long[]{7L, 11, -892587293, 60, 123};
|
||||
|
||||
private ByteBuffer buf;
|
||||
@Mock
|
||||
private VectorValueSelector selector;
|
||||
|
||||
private LongAnyVectorAggregator target;
|
||||
|
||||
@Before
|
||||
public void setUp()
|
||||
{
|
||||
byte[] randomBytes = new byte[128];
|
||||
ThreadLocalRandom.current().nextBytes(randomBytes);
|
||||
buf = ByteBuffer.wrap(randomBytes);
|
||||
Mockito.doReturn(VALUES).when(selector).getLongVector();
|
||||
|
||||
target = spy(new LongAnyVectorAggregator(selector));
|
||||
Mockito.when(target.isValueNull(buf, NULL_POSITION)).thenReturn(true);
|
||||
Mockito.when(target.isValueNull(buf, POSITION)).thenReturn(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void initValueShouldInitZero()
|
||||
{
|
||||
target.initValue(buf, POSITION);
|
||||
Assert.assertEquals(0, buf.getLong(POSITION));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getAtPositionIsNullShouldReturnNull()
|
||||
{
|
||||
Assert.assertNull(target.get(buf, NULL_POSITION));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getAtPositionShouldReturnValue()
|
||||
{
|
||||
buf.putLong(POSITION + 1, VALUES[3]);
|
||||
Assert.assertEquals(VALUES[3], (long) target.get(buf, POSITION));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void putValueShouldAddToBuffer()
|
||||
{
|
||||
Assert.assertTrue(target.putAnyValueFromRow(buf, POSITION, 2, 3));
|
||||
Assert.assertEquals(VALUES[2], buf.getLong(POSITION));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void putValueStartAfterEndShouldNotAddToBuffer()
|
||||
{
|
||||
Assert.assertFalse(target.putAnyValueFromRow(buf, POSITION, 2, 2));
|
||||
Assert.assertNotEquals(VALUES[2], buf.getLong(POSITION));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void putValueStartOutsideRangeShouldNotAddToBuffer()
|
||||
{
|
||||
Assert.assertFalse(target.putAnyValueFromRow(buf, POSITION, 5, 6));
|
||||
Assert.assertNotEquals(VALUES[2], buf.getLong(POSITION));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,210 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.aggregation.any;
|
||||
|
||||
import org.apache.druid.common.config.NullHandling;
|
||||
import org.apache.druid.segment.vector.VectorValueSelector;
|
||||
import org.apache.druid.testing.InitializedNullHandlingTest;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.junit.MockitoJUnitRunner;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
import static org.apache.druid.query.aggregation.any.NumericAnyVectorAggregator.BYTE_FLAG_FOUND_MASK;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class NumericAnyVectorAggregatorTest extends InitializedNullHandlingTest
|
||||
{
|
||||
private static final int NULL_POSITION = 10;
|
||||
private static final int BUFFER_SIZE = 128;
|
||||
private static final long FOUND_OBJECT = 23;
|
||||
private static final int POSITION = 2;
|
||||
private static final boolean[] NULLS = new boolean[] {false, false, true, false};
|
||||
|
||||
private ByteBuffer buf;
|
||||
@Mock
|
||||
private VectorValueSelector selector;
|
||||
|
||||
private NumericAnyVectorAggregator target;
|
||||
|
||||
@Before
|
||||
public void setUp()
|
||||
{
|
||||
Mockito.doReturn(NULLS).when(selector).getNullVector();
|
||||
target = Mockito.spy(new NumericAnyVectorAggregator(selector)
|
||||
{
|
||||
@Override
|
||||
void initValue(ByteBuffer buf, int position)
|
||||
{
|
||||
/* Do nothing. */
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean putAnyValueFromRow(ByteBuffer buf, int position, int startRow, int endRow)
|
||||
{
|
||||
boolean isRowsWithinIndex = startRow < endRow && startRow < NULLS.length;
|
||||
if (isRowsWithinIndex) {
|
||||
buf.putLong(position, startRow);
|
||||
}
|
||||
return isRowsWithinIndex;
|
||||
}
|
||||
|
||||
@Override
|
||||
Object getNonNullObject(ByteBuffer buf, int position)
|
||||
{
|
||||
if (position == POSITION + 1) {
|
||||
return FOUND_OBJECT;
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
});
|
||||
byte[] randomBuffer = new byte[BUFFER_SIZE];
|
||||
ThreadLocalRandom.current().nextBytes(randomBuffer);
|
||||
buf = ByteBuffer.wrap(randomBuffer);
|
||||
clearBufferForPositions(0, POSITION);
|
||||
buf.put(NULL_POSITION, (byte) 0x01);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void initShouldSetDoubleAfterPositionToZero()
|
||||
{
|
||||
target.init(buf, POSITION);
|
||||
Assert.assertEquals(0, buf.get(POSITION) & BYTE_FLAG_FOUND_MASK);
|
||||
Assert.assertEquals(
|
||||
NullHandling.sqlCompatible() ? NullHandling.IS_NULL_BYTE : NullHandling.IS_NOT_NULL_BYTE,
|
||||
buf.get(POSITION)
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void aggregateNotFoundAndHasNullsShouldPutNull()
|
||||
{
|
||||
target.aggregate(buf, POSITION, 0, 3);
|
||||
if (NullHandling.sqlCompatible()) {
|
||||
Assert.assertEquals(BYTE_FLAG_FOUND_MASK | NullHandling.IS_NULL_BYTE, buf.get(POSITION));
|
||||
} else {
|
||||
Assert.assertEquals(BYTE_FLAG_FOUND_MASK | NullHandling.IS_NOT_NULL_BYTE, buf.get(POSITION));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void aggregateNotFoundAndHasNullsOutsideRangeShouldPutValue()
|
||||
{
|
||||
target.aggregate(buf, POSITION, 0, 1);
|
||||
Assert.assertEquals(BYTE_FLAG_FOUND_MASK | NullHandling.IS_NOT_NULL_BYTE, buf.get(POSITION));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void aggregateNotFoundAndNoNullsShouldPutValue()
|
||||
{
|
||||
Mockito.doReturn(null).when(selector).getNullVector();
|
||||
target.aggregate(buf, POSITION, 0, 3);
|
||||
Assert.assertEquals(BYTE_FLAG_FOUND_MASK | NullHandling.IS_NOT_NULL_BYTE, buf.get(POSITION));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void aggregateNotFoundNoNullsAndValuesOutsideRangeShouldNotPutValue()
|
||||
{
|
||||
Mockito.doReturn(null).when(selector).getNullVector();
|
||||
target.aggregate(buf, POSITION, NULLS.length, NULLS.length + 1);
|
||||
Assert.assertNotEquals(BYTE_FLAG_FOUND_MASK, (buf.get(POSITION) & BYTE_FLAG_FOUND_MASK));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void aggregateBatchNoRowsShouldAggregateAllRows()
|
||||
{
|
||||
Mockito.doReturn(null).when(selector).getNullVector();
|
||||
int[] positions = new int[] {0, 43, 70};
|
||||
int positionOffset = 2;
|
||||
clearBufferForPositions(positionOffset, positions);
|
||||
target.aggregate(buf, 3, positions, null, positionOffset);
|
||||
for (int i = 0; i < positions.length; i++) {
|
||||
int position = positions[i] + positionOffset;
|
||||
Assert.assertEquals(
|
||||
BYTE_FLAG_FOUND_MASK | NullHandling.IS_NOT_NULL_BYTE,
|
||||
buf.get(position) & (byte) (BYTE_FLAG_FOUND_MASK | NullHandling.IS_NOT_NULL_BYTE)
|
||||
);
|
||||
Assert.assertEquals(i, buf.getLong(position + 1));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void aggregateBatchWithRowsShouldAggregateAllRows()
|
||||
{
|
||||
Mockito.doReturn(null).when(selector).getNullVector();
|
||||
int[] positions = new int[] {0, 43, 70};
|
||||
int positionOffset = 2;
|
||||
clearBufferForPositions(positionOffset, positions);
|
||||
int[] rows = new int[] {3, 2, 0};
|
||||
target.aggregate(buf, 3, positions, rows, positionOffset);
|
||||
for (int i = 0; i < positions.length; i++) {
|
||||
int position = positions[i] + positionOffset;
|
||||
Assert.assertEquals(
|
||||
BYTE_FLAG_FOUND_MASK | NullHandling.IS_NOT_NULL_BYTE,
|
||||
buf.get(position) & (byte) (BYTE_FLAG_FOUND_MASK | NullHandling.IS_NOT_NULL_BYTE)
|
||||
);
|
||||
Assert.assertEquals(rows[i], buf.getLong(position + 1));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void aggregateFoundShouldDoNothing()
|
||||
{
|
||||
long previous = buf.getLong(POSITION + 1);
|
||||
buf.put(POSITION, BYTE_FLAG_FOUND_MASK);
|
||||
target.aggregate(buf, POSITION, 0, 3);
|
||||
Assert.assertEquals(previous, buf.getLong(POSITION + 1));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getNullShouldReturnNull()
|
||||
{
|
||||
Assert.assertNull(target.get(buf, NULL_POSITION));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getNotNullShouldReturnValue()
|
||||
{
|
||||
Assert.assertEquals(FOUND_OBJECT, target.get(buf, POSITION));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void isValueNull()
|
||||
{
|
||||
buf.put(POSITION, (byte) 4);
|
||||
Assert.assertFalse(target.isValueNull(buf, POSITION));
|
||||
buf.put(POSITION, (byte) 3);
|
||||
Assert.assertTrue(target.isValueNull(buf, POSITION));
|
||||
}
|
||||
|
||||
private void clearBufferForPositions(int offset, int... positions)
|
||||
{
|
||||
for (int position : positions) {
|
||||
buf.put(position + offset, (byte) 0);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,110 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.aggregation.any;
|
||||
|
||||
import org.apache.druid.segment.ColumnInspector;
|
||||
import org.apache.druid.segment.column.ColumnCapabilities;
|
||||
import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
|
||||
import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
|
||||
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
|
||||
import org.apache.druid.testing.InitializedNullHandlingTest;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.junit.MockitoJUnitRunner;
|
||||
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class StringAnyAggregatorFactoryTest extends InitializedNullHandlingTest
|
||||
{
|
||||
private static final String NAME = "NAME";
|
||||
private static final String FIELD_NAME = "FIELD_NAME";
|
||||
private static final int MAX_STRING_BYTES = 10;
|
||||
|
||||
@Mock
|
||||
private ColumnInspector columnInspector;
|
||||
@Mock
|
||||
private ColumnCapabilities capabilities;
|
||||
@Mock
|
||||
private VectorColumnSelectorFactory vectorSelectorFactory;
|
||||
@Mock
|
||||
private SingleValueDimensionVectorSelector singleValueDimensionVectorSelector;
|
||||
@Mock
|
||||
private MultiValueDimensionVectorSelector multiValueDimensionVectorSelector;
|
||||
|
||||
private StringAnyAggregatorFactory target;
|
||||
|
||||
@Before
|
||||
public void setUp()
|
||||
{
|
||||
Mockito.doReturn(capabilities).when(vectorSelectorFactory).getColumnCapabilities(FIELD_NAME);
|
||||
Mockito.doReturn(ColumnCapabilities.Capable.UNKNOWN).when(capabilities).hasMultipleValues();
|
||||
target = new StringAnyAggregatorFactory(NAME, FIELD_NAME, MAX_STRING_BYTES);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void canVectorizeWithoutCapabilitiesShouldReturnTrue()
|
||||
{
|
||||
Assert.assertTrue(target.canVectorize(columnInspector));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void factorizeVectorWithoutCapabilitiesShouldReturnAggregatorWithMultiDimensionSelector()
|
||||
{
|
||||
Mockito.doReturn(null).when(vectorSelectorFactory).getColumnCapabilities(FIELD_NAME);
|
||||
Mockito.doReturn(multiValueDimensionVectorSelector)
|
||||
.when(vectorSelectorFactory).makeMultiValueDimensionSelector(any());
|
||||
StringAnyVectorAggregator aggregator = target.factorizeVector(vectorSelectorFactory);
|
||||
Assert.assertNotNull(aggregator);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void factorizeVectorWithUnknownCapabilitiesShouldReturnAggregatorWithMultiDimensionSelector()
|
||||
{
|
||||
Mockito.doReturn(multiValueDimensionVectorSelector)
|
||||
.when(vectorSelectorFactory).makeMultiValueDimensionSelector(any());
|
||||
StringAnyVectorAggregator aggregator = target.factorizeVector(vectorSelectorFactory);
|
||||
Assert.assertNotNull(aggregator);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void factorizeVectorWithMultipleValuesCapabilitiesShouldReturnAggregatorWithMultiDimensionSelector()
|
||||
{
|
||||
Mockito.doReturn(ColumnCapabilities.Capable.TRUE).when(capabilities).hasMultipleValues();
|
||||
Mockito.doReturn(multiValueDimensionVectorSelector)
|
||||
.when(vectorSelectorFactory).makeMultiValueDimensionSelector(any());
|
||||
StringAnyVectorAggregator aggregator = target.factorizeVector(vectorSelectorFactory);
|
||||
Assert.assertNotNull(aggregator);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void factorizeVectorWithoutMultipleValuesCapabilitiesShouldReturnAggregatorWithSingleDimensionSelector()
|
||||
{
|
||||
Mockito.doReturn(ColumnCapabilities.Capable.FALSE).when(capabilities).hasMultipleValues();
|
||||
Mockito.doReturn(singleValueDimensionVectorSelector)
|
||||
.when(vectorSelectorFactory).makeSingleValueDimensionSelector(any());
|
||||
StringAnyVectorAggregator aggregator = target.factorizeVector(vectorSelectorFactory);
|
||||
Assert.assertNotNull(aggregator);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,189 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.aggregation.any;
|
||||
|
||||
import org.apache.druid.segment.data.ArrayBasedIndexedInts;
|
||||
import org.apache.druid.segment.data.IndexedInts;
|
||||
import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
|
||||
import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
|
||||
import org.apache.druid.testing.InitializedNullHandlingTest;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.junit.MockitoJUnitRunner;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
import static org.apache.druid.query.aggregation.any.StringAnyVectorAggregator.NOT_FOUND_FLAG_VALUE;
|
||||
import static org.mockito.ArgumentMatchers.anyInt;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class StringAnyVectorAggregatorTest extends InitializedNullHandlingTest
|
||||
{
|
||||
private static final int MAX_STRING_BYTES = 32;
|
||||
private static final int BUFFER_SIZE = 1024;
|
||||
private static final int POSITION = 2;
|
||||
private static final IndexedInts[] MULTI_VALUE_ROWS = new IndexedInts[]{
|
||||
new ArrayBasedIndexedInts(new int[]{1, 0}),
|
||||
new ArrayBasedIndexedInts(new int[]{1}),
|
||||
new ArrayBasedIndexedInts(),
|
||||
new ArrayBasedIndexedInts(new int[]{2})
|
||||
};
|
||||
private static final int[] SINGLE_VALUE_ROWS = new int[]{1, 1, 3, 2};
|
||||
private static final String[] DICTIONARY = new String[]{"Zero", "One", "TwoThisStringIsLongerThanThirtyTwoBytes"};
|
||||
|
||||
private ByteBuffer buf;
|
||||
@Mock
|
||||
private SingleValueDimensionVectorSelector singleValueSelector;
|
||||
@Mock
|
||||
private MultiValueDimensionVectorSelector multiValueSelector;
|
||||
|
||||
private StringAnyVectorAggregator singleValueTarget;
|
||||
private StringAnyVectorAggregator multiValueTarget;
|
||||
|
||||
@Before
|
||||
public void setUp()
|
||||
{
|
||||
Mockito.doReturn(MULTI_VALUE_ROWS).when(multiValueSelector).getRowVector();
|
||||
Mockito.doAnswer(invocation -> DICTIONARY[(int) invocation.getArgument(0)])
|
||||
.when(multiValueSelector).lookupName(anyInt());
|
||||
Mockito.doReturn(SINGLE_VALUE_ROWS).when(singleValueSelector).getRowVector();
|
||||
Mockito.doAnswer(invocation -> {
|
||||
int index = invocation.getArgument(0);
|
||||
return index >= DICTIONARY.length ? null : DICTIONARY[index];
|
||||
}).when(singleValueSelector).lookupName(anyInt());
|
||||
initializeRandomBuffer();
|
||||
singleValueTarget = new StringAnyVectorAggregator(singleValueSelector, null, MAX_STRING_BYTES);
|
||||
multiValueTarget = new StringAnyVectorAggregator(null, multiValueSelector, MAX_STRING_BYTES);
|
||||
}
|
||||
|
||||
@Test(expected = IllegalStateException.class)
|
||||
public void initWithBothSingleAndMultiValueSelectorShouldThrowException()
|
||||
{
|
||||
new StringAnyVectorAggregator(singleValueSelector, multiValueSelector, MAX_STRING_BYTES);
|
||||
}
|
||||
|
||||
@Test(expected = IllegalStateException.class)
|
||||
public void initWithNeitherSingleNorMultiValueSelectorShouldThrowException()
|
||||
{
|
||||
new StringAnyVectorAggregator(null, null, MAX_STRING_BYTES);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void initSingleValueTargetShouldMarkPositionAsNotFound()
|
||||
{
|
||||
singleValueTarget.init(buf, POSITION + 1);
|
||||
Assert.assertEquals(NOT_FOUND_FLAG_VALUE, buf.getInt(POSITION + 1));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void initMultiValueTargetShouldMarkPositionAsNotFound()
|
||||
{
|
||||
multiValueTarget.init(buf, POSITION + 1);
|
||||
Assert.assertEquals(NOT_FOUND_FLAG_VALUE, buf.getInt(POSITION + 1));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void aggregatePositionNotFoundShouldPutFirstValue()
|
||||
{
|
||||
singleValueTarget.aggregate(buf, POSITION, 0, 2);
|
||||
Assert.assertEquals(DICTIONARY[1], singleValueTarget.get(buf, POSITION));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void aggregateEmptyShouldPutNull()
|
||||
{
|
||||
singleValueTarget.aggregate(buf, POSITION, 2, 3);
|
||||
Assert.assertNull(singleValueTarget.get(buf, POSITION));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void aggregateMultiValuePositionNotFoundShouldPutFirstValue()
|
||||
{
|
||||
multiValueTarget.aggregate(buf, POSITION, 0, 2);
|
||||
Assert.assertEquals(DICTIONARY[1], multiValueTarget.get(buf, POSITION));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void aggregateMultiValueEmptyShouldPutNull()
|
||||
{
|
||||
multiValueTarget.aggregate(buf, POSITION, 2, 3);
|
||||
Assert.assertNull(multiValueTarget.get(buf, POSITION));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void aggregateValueLongerThanLimitShouldPutTruncatedValue()
|
||||
{
|
||||
singleValueTarget.aggregate(buf, POSITION, 3, 4);
|
||||
Assert.assertEquals(DICTIONARY[2].substring(0, 32), singleValueTarget.get(buf, POSITION));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void aggregateBatchNoRowsShouldAggregateAllRows()
|
||||
{
|
||||
int[] positions = new int[] {0, 43, 100};
|
||||
int positionOffset = 2;
|
||||
clearBufferForPositions(positionOffset, positions);
|
||||
singleValueTarget.aggregate(buf, 3, positions, null, positionOffset);
|
||||
for (int i = 0; i < positions.length; i++) {
|
||||
int position = positions[i] + positionOffset;
|
||||
Assert.assertEquals(singleValueSelector.lookupName(SINGLE_VALUE_ROWS[i]), singleValueTarget.get(buf, position));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void aggregateBatchWithRowsShouldAggregateAllRows()
|
||||
{
|
||||
int[] positions = new int[] {0, 43, 100};
|
||||
int positionOffset = 2;
|
||||
int[] rows = new int[] {2, 1, 0};
|
||||
clearBufferForPositions(positionOffset, positions);
|
||||
multiValueTarget.aggregate(buf, 3, positions, rows, positionOffset);
|
||||
for (int i = 0; i < positions.length; i++) {
|
||||
int position = positions[i] + positionOffset;
|
||||
int row = rows[i];
|
||||
IndexedInts rowIndex = MULTI_VALUE_ROWS[row];
|
||||
if (rowIndex.size() == 0) {
|
||||
Assert.assertNull(multiValueTarget.get(buf, position));
|
||||
} else {
|
||||
Assert.assertEquals(multiValueSelector.lookupName(rowIndex.get(0)), multiValueTarget.get(buf, position));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void initializeRandomBuffer()
|
||||
{
|
||||
byte[] randomBuffer = new byte[BUFFER_SIZE];
|
||||
ThreadLocalRandom.current().nextBytes(randomBuffer);
|
||||
buf = ByteBuffer.wrap(randomBuffer);
|
||||
clearBufferForPositions(0, POSITION);
|
||||
}
|
||||
|
||||
private void clearBufferForPositions(int offset, int... positions)
|
||||
{
|
||||
for (int position : positions) {
|
||||
buf.putInt(position + offset, NOT_FOUND_FLAG_VALUE);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1910,7 +1910,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
|
|||
@Test
|
||||
public void testAnyAggregator() throws Exception
|
||||
{
|
||||
// Cannot vectorize ANY aggregator.
|
||||
// Cannot vectorize virtual expressions.
|
||||
skipVectorize();
|
||||
|
||||
testQuery(
|
||||
|
@ -1951,9 +1951,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
|
|||
@Test
|
||||
public void testAnyAggregatorsOnHeapNumericNulls() throws Exception
|
||||
{
|
||||
// Cannot vectorize ANY aggregator.
|
||||
skipVectorize();
|
||||
|
||||
testQuery(
|
||||
"SELECT ANY_VALUE(l1), ANY_VALUE(d1), ANY_VALUE(f1) FROM druid.numfoo",
|
||||
ImmutableList.of(
|
||||
|
@ -1982,8 +1979,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
|
|||
@Test
|
||||
public void testAnyAggregatorsOffHeapNumericNulls() throws Exception
|
||||
{
|
||||
// Cannot vectorize ANY aggregator.
|
||||
skipVectorize();
|
||||
testQuery(
|
||||
"SELECT ANY_VALUE(l1), ANY_VALUE(d1), ANY_VALUE(f1) FROM druid.numfoo GROUP BY dim2",
|
||||
ImmutableList.of(
|
||||
|
@ -2203,9 +2198,12 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
|
|||
@Test
|
||||
public void testPrimitiveAnyInSubquery() throws Exception
|
||||
{
|
||||
// Cannot vectorize ANY aggregator.
|
||||
skipVectorize();
|
||||
|
||||
// The grouping works like this
|
||||
// dim2 -> m1 | m2
|
||||
// a -> [1,4] | [1,4]
|
||||
// null -> [2,3,6] | [2,3,6]
|
||||
// abc -> [5] | [5]
|
||||
// So the acceptable response can be any combination of these values
|
||||
testQuery(
|
||||
"SELECT SUM(val1), SUM(val2), SUM(val3) FROM (SELECT dim2, ANY_VALUE(m1) AS val1, ANY_VALUE(cnt) AS val2, ANY_VALUE(m2) AS val3 FROM foo GROUP BY dim2)",
|
||||
ImmutableList.of(
|
||||
|
@ -2251,9 +2249,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
|
|||
@Test
|
||||
public void testStringAnyInSubquery() throws Exception
|
||||
{
|
||||
// Cannot vectorize ANY aggregator.
|
||||
skipVectorize();
|
||||
|
||||
testQuery(
|
||||
"SELECT SUM(val) FROM (SELECT dim2, ANY_VALUE(dim1, 10) AS val FROM foo GROUP BY dim2)",
|
||||
ImmutableList.of(
|
||||
|
@ -2408,9 +2403,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
|
|||
@Test
|
||||
public void testAnyAggregatorsDoesNotSkipNulls() throws Exception
|
||||
{
|
||||
// Cannot vectorize ANY aggregator.
|
||||
skipVectorize();
|
||||
|
||||
testQuery(
|
||||
"SELECT ANY_VALUE(dim1, 32), ANY_VALUE(l2), ANY_VALUE(d2), ANY_VALUE(f2) FROM druid.numfoo",
|
||||
ImmutableList.of(
|
||||
|
@ -2439,9 +2431,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
|
|||
@Test
|
||||
public void testAnyAggregatorsSkipNullsWithFilter() throws Exception
|
||||
{
|
||||
// Cannot vectorize ANY aggregator.
|
||||
skipVectorize();
|
||||
|
||||
final DimFilter filter;
|
||||
if (useDefault) {
|
||||
filter = not(selector("dim1", null, null));
|
||||
|
@ -2767,8 +2756,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
|
|||
@Test
|
||||
public void testOrderByAnyFloat() throws Exception
|
||||
{
|
||||
// Cannot vectorize ANY aggregator.
|
||||
skipVectorize();
|
||||
List<Object[]> expected;
|
||||
if (NullHandling.replaceWithDefault()) {
|
||||
expected = ImmutableList.of(
|
||||
|
@ -2817,8 +2804,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
|
|||
@Test
|
||||
public void testOrderByAnyDouble() throws Exception
|
||||
{
|
||||
// Cannot vectorize ANY aggregator.
|
||||
skipVectorize();
|
||||
List<Object[]> expected;
|
||||
if (NullHandling.replaceWithDefault()) {
|
||||
expected = ImmutableList.of(
|
||||
|
@ -2866,8 +2851,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
|
|||
@Test
|
||||
public void testOrderByAnyLong() throws Exception
|
||||
{
|
||||
// Cannot vectorize ANY aggregator.
|
||||
skipVectorize();
|
||||
List<Object[]> expected;
|
||||
if (NullHandling.replaceWithDefault()) {
|
||||
expected = ImmutableList.of(
|
||||
|
|
|
@ -538,6 +538,7 @@ MeanNoNulls
|
|||
P1D
|
||||
cycleSize
|
||||
doubleMax
|
||||
doubleAny
|
||||
doubleMean
|
||||
doubleMeanNoNulls
|
||||
doubleMin
|
||||
|
@ -545,6 +546,7 @@ doubleSum
|
|||
druid.generic.useDefaultValueForNull
|
||||
limitSpec
|
||||
longMax
|
||||
longAny
|
||||
longMean
|
||||
longMeanNoNulls
|
||||
longMin
|
||||
|
@ -1107,6 +1109,7 @@ signum
|
|||
str1
|
||||
str2
|
||||
string_to_array
|
||||
stringAny
|
||||
strlen
|
||||
strpos
|
||||
timestamp_ceil
|
||||
|
@ -1672,6 +1675,7 @@ file.encoding
|
|||
fillCapacity
|
||||
first_location
|
||||
floatMax
|
||||
floatAny
|
||||
floatMin
|
||||
floatSum
|
||||
freeSpacePercent
|
||||
|
|
Loading…
Reference in New Issue