mirror of https://github.com/apache/druid.git
Vectorize the cardinality aggregator. (#11182)
* Vectorize the cardinality aggregator. Does not include a byRow implementation, so if byRow is true then the aggregator still goes through the non-vectorized path. Testing strategy: - New tests that exercise both styles of "aggregate" for supported types. - Some existing tests have also become active (note the deleted "cannotVectorize" lines). * Adjust whitespace.
This commit is contained in:
parent
ca1412d574
commit
bef7cc911f
|
@ -35,15 +35,21 @@ import org.apache.druid.query.aggregation.AggregatorUtil;
|
|||
import org.apache.druid.query.aggregation.BufferAggregator;
|
||||
import org.apache.druid.query.aggregation.NoopAggregator;
|
||||
import org.apache.druid.query.aggregation.NoopBufferAggregator;
|
||||
import org.apache.druid.query.aggregation.NoopVectorAggregator;
|
||||
import org.apache.druid.query.aggregation.VectorAggregator;
|
||||
import org.apache.druid.query.aggregation.cardinality.types.CardinalityAggregatorColumnSelectorStrategy;
|
||||
import org.apache.druid.query.aggregation.cardinality.types.CardinalityAggregatorColumnSelectorStrategyFactory;
|
||||
import org.apache.druid.query.aggregation.cardinality.vector.CardinalityVectorProcessorFactory;
|
||||
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
|
||||
import org.apache.druid.query.cache.CacheKeyBuilder;
|
||||
import org.apache.druid.query.dimension.DefaultDimensionSpec;
|
||||
import org.apache.druid.query.dimension.DimensionSpec;
|
||||
import org.apache.druid.segment.ColumnInspector;
|
||||
import org.apache.druid.segment.ColumnProcessors;
|
||||
import org.apache.druid.segment.ColumnSelectorFactory;
|
||||
import org.apache.druid.segment.DimensionHandlerUtils;
|
||||
import org.apache.druid.segment.column.ValueType;
|
||||
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.nio.ByteBuffer;
|
||||
|
@ -146,7 +152,6 @@ public class CardinalityAggregatorFactory extends AggregatorFactory
|
|||
return new CardinalityAggregator(selectorPluses, byRow);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnFactory)
|
||||
{
|
||||
|
@ -163,6 +168,32 @@ public class CardinalityAggregatorFactory extends AggregatorFactory
|
|||
return new CardinalityBufferAggregator(selectorPluses, byRow);
|
||||
}
|
||||
|
||||
/**
 * Creates the vectorized aggregator for this factory.
 *
 * When there are no fields, the shared no-op vector aggregator is returned. Otherwise one
 * CardinalityVectorProcessor is created per field through ColumnProcessors.makeVectorProcessor, using
 * CardinalityVectorProcessorFactory.INSTANCE, and the resulting list is wrapped in a CardinalityVectorAggregator.
 */
@Override
|
||||
public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory)
|
||||
{
|
||||
// Nothing to aggregate over: use the no-op aggregator.
if (fields.isEmpty()) {
|
||||
return NoopVectorAggregator.instance();
|
||||
}
|
||||
|
||||
// One processor per field, in the same order as "fields".
return new CardinalityVectorAggregator(
|
||||
fields.stream().map(
|
||||
field ->
|
||||
ColumnProcessors.makeVectorProcessor(
|
||||
field,
|
||||
CardinalityVectorProcessorFactory.INSTANCE,
|
||||
selectorFactory
|
||||
)
|
||||
).collect(Collectors.toList())
|
||||
);
|
||||
}
|
||||
|
||||
/**
 * Reports whether this aggregator can run vectorized: only when byRow is false and every field's
 * DimensionSpec reports that it can vectorize. The columnInspector argument is not consulted here.
 */
@Override
|
||||
public boolean canVectorize(ColumnInspector columnInspector)
|
||||
{
|
||||
// byRow mode is excluded because it has no vectorized implementation yet.
|
||||
return !byRow && fields.stream().allMatch(DimensionSpec::canVectorize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Comparator getComparator()
|
||||
{
|
||||
|
|
|
@ -55,10 +55,11 @@ public class CardinalityBufferAggregator implements BufferAggregator
|
|||
// Save position, limit and restore later instead of allocating a new ByteBuffer object
|
||||
final int oldPosition = buf.position();
|
||||
final int oldLimit = buf.limit();
|
||||
buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
|
||||
buf.position(position);
|
||||
|
||||
try {
|
||||
buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
|
||||
buf.position(position);
|
||||
|
||||
final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf);
|
||||
if (byRow) {
|
||||
CardinalityAggregator.hashRow(selectorPluses, collector);
|
||||
|
|
|
@ -0,0 +1,73 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.aggregation.cardinality;
|
||||
|
||||
import org.apache.druid.query.aggregation.VectorAggregator;
|
||||
import org.apache.druid.query.aggregation.cardinality.vector.CardinalityVectorProcessor;
|
||||
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesBufferAggregator;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.List;
|
||||
|
||||
/**
 * Vectorized cardinality aggregator. Every aggregate call is forwarded, unchanged, to each
 * {@link CardinalityVectorProcessor} in the list, in list order. Buffer initialization and result retrieval are
 * delegated to the static helpers {@link HyperUniquesBufferAggregator#doInit} and
 * {@link HyperUniquesBufferAggregator#doGet}.
 */
public class CardinalityVectorAggregator implements VectorAggregator
|
||||
{
|
||||
// Processors that all write into the same aggregation slot(s) of the buffer.
private final List<CardinalityVectorProcessor> processors;
|
||||
|
||||
// Package-private constructor; the list is stored as-is (no defensive copy).
CardinalityVectorAggregator(List<CardinalityVectorProcessor> processors)
|
||||
{
|
||||
this.processors = processors;
|
||||
}
|
||||
|
||||
// Initializes the aggregation slot at "position" using the HyperUniques buffer aggregator's init routine.
@Override
|
||||
public void init(ByteBuffer buf, int position)
|
||||
{
|
||||
HyperUniquesBufferAggregator.doInit(buf, position);
|
||||
}
|
||||
|
||||
// Single-slot aggregation: each processor handles the same row range and the same buffer position.
@Override
|
||||
public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
|
||||
{
|
||||
for (final CardinalityVectorProcessor processor : processors) {
|
||||
processor.aggregate(buf, position, startRow, endRow);
|
||||
}
|
||||
}
|
||||
|
||||
// Multi-slot aggregation: each processor receives the same positions/rows arrays and offset.
@Override
|
||||
public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
|
||||
{
|
||||
for (final CardinalityVectorProcessor processor : processors) {
|
||||
processor.aggregate(buf, numRows, positions, rows, positionOffset);
|
||||
}
|
||||
}
|
||||
|
||||
// Reads the result for the slot at "position" using the HyperUniques buffer aggregator's get routine.
@Nullable
|
||||
@Override
|
||||
public Object get(ByteBuffer buf, int position)
|
||||
{
|
||||
return HyperUniquesBufferAggregator.doGet(buf, position);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
// Nothing to close.
|
||||
}
|
||||
}
|
|
@ -34,6 +34,11 @@ import org.apache.druid.segment.BaseDoubleColumnValueSelector;
|
|||
public class DoubleCardinalityAggregatorColumnSelectorStrategy
|
||||
implements CardinalityAggregatorColumnSelectorStrategy<BaseDoubleColumnValueSelector>
|
||||
{
|
||||
/**
 * Hashes the long bit pattern of {@code n} (as returned by {@link Double#doubleToLongBits}) with
 * CardinalityAggregator.HASH_FUNCTION and adds the hash bytes to {@code collector}.
 *
 * No null check is done here; callers decide whether the value should be counted.
 */
public static void addDoubleToCollector(final HyperLogLogCollector collector, final double n)
|
||||
{
|
||||
collector.add(CardinalityAggregator.HASH_FUNCTION.hashLong(Double.doubleToLongBits(n)).asBytes());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void hashRow(BaseDoubleColumnValueSelector selector, Hasher hasher)
|
||||
{
|
||||
|
@ -46,7 +51,7 @@ public class DoubleCardinalityAggregatorColumnSelectorStrategy
|
|||
public void hashValues(BaseDoubleColumnValueSelector selector, HyperLogLogCollector collector)
|
||||
{
|
||||
if (NullHandling.replaceWithDefault() || !selector.isNull()) {
|
||||
collector.add(CardinalityAggregator.HASH_FUNCTION.hashLong(Double.doubleToLongBits(selector.getDouble())).asBytes());
|
||||
addDoubleToCollector(collector, selector.getDouble());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,6 +34,11 @@ import org.apache.druid.segment.BaseFloatColumnValueSelector;
|
|||
public class FloatCardinalityAggregatorColumnSelectorStrategy
|
||||
implements CardinalityAggregatorColumnSelectorStrategy<BaseFloatColumnValueSelector>
|
||||
{
|
||||
/**
 * Hashes the int bit pattern of {@code n} (as returned by {@link Float#floatToIntBits}) with
 * CardinalityAggregator.HASH_FUNCTION and adds the hash bytes to {@code collector}.
 *
 * No null check is done here; callers decide whether the value should be counted.
 */
public static void addFloatToCollector(final HyperLogLogCollector collector, final float n)
|
||||
{
|
||||
collector.add(CardinalityAggregator.HASH_FUNCTION.hashInt(Float.floatToIntBits(n)).asBytes());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void hashRow(BaseFloatColumnValueSelector selector, Hasher hasher)
|
||||
{
|
||||
|
@ -46,7 +51,7 @@ public class FloatCardinalityAggregatorColumnSelectorStrategy
|
|||
public void hashValues(BaseFloatColumnValueSelector selector, HyperLogLogCollector collector)
|
||||
{
|
||||
if (NullHandling.replaceWithDefault() || !selector.isNull()) {
|
||||
collector.add(CardinalityAggregator.HASH_FUNCTION.hashInt(Float.floatToIntBits(selector.getFloat())).asBytes());
|
||||
addFloatToCollector(collector, selector.getFloat());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,6 +34,11 @@ import org.apache.druid.segment.BaseLongColumnValueSelector;
|
|||
public class LongCardinalityAggregatorColumnSelectorStrategy
|
||||
implements CardinalityAggregatorColumnSelectorStrategy<BaseLongColumnValueSelector>
|
||||
{
|
||||
/**
 * Hashes {@code n} with CardinalityAggregator.HASH_FUNCTION and adds the hash bytes to {@code collector}.
 *
 * No null check is done here; callers decide whether the value should be counted.
 */
public static void addLongToCollector(final HyperLogLogCollector collector, final long n)
|
||||
{
|
||||
collector.add(CardinalityAggregator.HASH_FUNCTION.hashLong(n).asBytes());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void hashRow(BaseLongColumnValueSelector selector, Hasher hasher)
|
||||
{
|
||||
|
@ -46,7 +51,7 @@ public class LongCardinalityAggregatorColumnSelectorStrategy
|
|||
public void hashValues(BaseLongColumnValueSelector selector, HyperLogLogCollector collector)
|
||||
{
|
||||
if (NullHandling.replaceWithDefault() || !selector.isNull()) {
|
||||
collector.add(CardinalityAggregator.HASH_FUNCTION.hashLong(selector.getLong()).asBytes());
|
||||
addLongToCollector(collector, selector.getLong());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.druid.query.aggregation.cardinality.CardinalityAggregator;
|
|||
import org.apache.druid.segment.DimensionSelector;
|
||||
import org.apache.druid.segment.data.IndexedInts;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.Arrays;
|
||||
|
||||
public class StringCardinalityAggregatorColumnSelectorStrategy implements CardinalityAggregatorColumnSelectorStrategy<DimensionSelector>
|
||||
|
@ -33,6 +34,16 @@ public class StringCardinalityAggregatorColumnSelectorStrategy implements Cardin
|
|||
public static final String CARDINALITY_AGG_NULL_STRING = "\u0000";
|
||||
public static final char CARDINALITY_AGG_SEPARATOR = '\u0001';
|
||||
|
||||
/**
 * Adds the hash of {@code s} to {@code collector}.
 *
 * A null {@code s} is skipped unless {@link NullHandling#replaceWithDefault()} is true; in that case it is
 * hashed as the placeholder string produced by nullToSpecial.
 */
public static void addStringToCollector(final HyperLogLogCollector collector, @Nullable final String s)
|
||||
{
|
||||
// The SQL standard does not count null values,
|
||||
// so nulls are skipped when null is not being replaced with a default value.
|
||||
// When it is being replaced (replaceWithDefault), null is hashed as a special placeholder string instead.
|
||||
if (NullHandling.replaceWithDefault() || s != null) {
|
||||
collector.add(CardinalityAggregator.HASH_FUNCTION.hashUnencodedChars(nullToSpecial(s)).asBytes());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void hashRow(DimensionSelector dimSelector, Hasher hasher)
|
||||
{
|
||||
|
@ -80,16 +91,11 @@ public class StringCardinalityAggregatorColumnSelectorStrategy implements Cardin
|
|||
for (int i = 0, rowSize = row.size(); i < rowSize; i++) {
|
||||
int index = row.get(i);
|
||||
final String value = dimSelector.lookupName(index);
|
||||
// SQL standard spec does not count null values,
|
||||
// Skip counting null values when we are not replacing null with default value.
|
||||
// A special value for null in case null handling is configured to use empty string for null.
|
||||
if (NullHandling.replaceWithDefault() || value != null) {
|
||||
collector.add(CardinalityAggregator.HASH_FUNCTION.hashUnencodedChars(nullToSpecial(value)).asBytes());
|
||||
}
|
||||
addStringToCollector(collector, value);
|
||||
}
|
||||
}
|
||||
|
||||
private String nullToSpecial(String value)
|
||||
private static String nullToSpecial(String value)
|
||||
{
|
||||
return value == null ? CARDINALITY_AGG_NULL_STRING : value;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.aggregation.cardinality.vector;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
/**
|
||||
* Processor for {@link org.apache.druid.query.aggregation.cardinality.CardinalityVectorAggregator}.
* One implementation exists per supported column kind; the aggregator forwards both of its aggregate calls here.
|
||||
*/
|
||||
public interface CardinalityVectorProcessor
|
||||
{
|
||||
/**
|
||||
* Processor for {@link org.apache.druid.query.aggregation.VectorAggregator#aggregate(ByteBuffer, int, int, int)}
|
||||
* in byRow = false mode.
*
* @param buf      aggregation buffer
* @param position offset in {@code buf} of the single aggregation slot that receives every row
* @param startRow first vector row to process (inclusive)
* @param endRow   last vector row to process (exclusive)
|
||||
*/
|
||||
void aggregate(ByteBuffer buf, int position, int startRow, int endRow);
|
||||
|
||||
/**
|
||||
* Processor for {@link org.apache.druid.query.aggregation.VectorAggregator#aggregate(ByteBuffer, int, int[], int[], int)}
|
||||
* in byRow = false mode.
*
* @param buf            aggregation buffer
* @param numRows        number of entries to process
* @param positions      per-entry slot offsets; entry i is aggregated at {@code positions[i] + positionOffset}
* @param rows           optional mapping from entry i to a vector row; when null, entry i is vector row i
* @param positionOffset offset added to every element of {@code positions}
|
||||
*/
|
||||
void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset);
|
||||
}
|
|
@ -0,0 +1,74 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.aggregation.cardinality.vector;
|
||||
|
||||
import org.apache.druid.segment.VectorColumnProcessorFactory;
|
||||
import org.apache.druid.segment.column.ColumnCapabilities;
|
||||
import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
|
||||
import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
|
||||
import org.apache.druid.segment.vector.VectorObjectSelector;
|
||||
import org.apache.druid.segment.vector.VectorValueSelector;
|
||||
|
||||
/**
 * Factory that picks the {@link CardinalityVectorProcessor} implementation matching the kind of vector selector
 * it is handed. The {@code capabilities} argument is not used by any of the methods in this class.
 */
public class CardinalityVectorProcessorFactory implements VectorColumnProcessorFactory<CardinalityVectorProcessor>
|
||||
{
|
||||
// Shared instance; the class declares no fields other than this one.
public static final CardinalityVectorProcessorFactory INSTANCE = new CardinalityVectorProcessorFactory();
|
||||
|
||||
// Single-valued string dimension.
@Override
|
||||
public CardinalityVectorProcessor makeSingleValueDimensionProcessor(
|
||||
ColumnCapabilities capabilities,
|
||||
SingleValueDimensionVectorSelector selector
|
||||
)
|
||||
{
|
||||
return new SingleValueStringCardinalityVectorProcessor(selector);
|
||||
}
|
||||
|
||||
// Multi-valued string dimension.
@Override
|
||||
public CardinalityVectorProcessor makeMultiValueDimensionProcessor(
|
||||
ColumnCapabilities capabilities,
|
||||
MultiValueDimensionVectorSelector selector
|
||||
)
|
||||
{
|
||||
return new MultiValueStringCardinalityVectorProcessor(selector);
|
||||
}
|
||||
|
||||
// Float column.
@Override
|
||||
public CardinalityVectorProcessor makeFloatProcessor(ColumnCapabilities capabilities, VectorValueSelector selector)
|
||||
{
|
||||
return new FloatCardinalityVectorProcessor(selector);
|
||||
}
|
||||
|
||||
// Double column.
@Override
|
||||
public CardinalityVectorProcessor makeDoubleProcessor(ColumnCapabilities capabilities, VectorValueSelector selector)
|
||||
{
|
||||
return new DoubleCardinalityVectorProcessor(selector);
|
||||
}
|
||||
|
||||
// Long column.
@Override
|
||||
public CardinalityVectorProcessor makeLongProcessor(ColumnCapabilities capabilities, VectorValueSelector selector)
|
||||
{
|
||||
return new LongCardinalityVectorProcessor(selector);
|
||||
}
|
||||
|
||||
// Object selectors are not read at all: the selector is ignored and a shared processor instance is returned.
// NOTE(review): NilCardinalityVectorProcessor is presumably a no-op; its body is not visible here, confirm.
@Override
|
||||
public CardinalityVectorProcessor makeObjectProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector)
|
||||
{
|
||||
return NilCardinalityVectorProcessor.INSTANCE;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,94 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.aggregation.cardinality.vector;
|
||||
|
||||
import org.apache.druid.common.config.NullHandling;
|
||||
import org.apache.druid.hll.HyperLogLogCollector;
|
||||
import org.apache.druid.query.aggregation.cardinality.types.DoubleCardinalityAggregatorColumnSelectorStrategy;
|
||||
import org.apache.druid.segment.vector.VectorValueSelector;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
/**
 * {@link CardinalityVectorProcessor} for double columns. Reads the selector's double vector and adds each
 * value to a {@link HyperLogLogCollector} created over the aggregation buffer.
 *
 * Rows flagged in the selector's null vector are skipped unless {@link NullHandling#replaceWithDefault()} is
 * true. Both methods restore the buffer's original position and limit before returning.
 */
public class DoubleCardinalityVectorProcessor implements CardinalityVectorProcessor
|
||||
{
|
||||
private final VectorValueSelector selector;
|
||||
|
||||
public DoubleCardinalityVectorProcessor(final VectorValueSelector selector)
|
||||
{
|
||||
this.selector = selector;
|
||||
}
|
||||
|
||||
// Adds vector rows startRow (inclusive) through endRow (exclusive) to the one collector located at "position".
@Override
|
||||
public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
|
||||
{
|
||||
// Save position, limit and restore later instead of allocating a new ByteBuffer object
|
||||
final int oldPosition = buf.position();
|
||||
final int oldLimit = buf.limit();
|
||||
|
||||
try {
|
||||
final double[] vector = selector.getDoubleVector();
|
||||
// May be null; that case is treated below as "no row is null".
final boolean[] nullVector = selector.getNullVector();
|
||||
|
||||
// Narrow the buffer to this aggregator's slot before creating the collector from it.
buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
|
||||
buf.position(position);
|
||||
|
||||
final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf);
|
||||
|
||||
for (int i = startRow; i < endRow; i++) {
|
||||
if (NullHandling.replaceWithDefault() || nullVector == null || !nullVector[i]) {
|
||||
DoubleCardinalityAggregatorColumnSelectorStrategy.addDoubleToCollector(collector, vector[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
finally {
|
||||
buf.limit(oldLimit);
|
||||
buf.position(oldPosition);
|
||||
}
|
||||
}
|
||||
|
||||
// Adds numRows entries, each to its own collector located at positions[i] + positionOffset.
@Override
|
||||
public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
|
||||
{
|
||||
// Save position, limit and restore later instead of allocating a new ByteBuffer object
|
||||
final int oldPosition = buf.position();
|
||||
final int oldLimit = buf.limit();
|
||||
|
||||
try {
|
||||
final double[] vector = selector.getDoubleVector();
|
||||
final boolean[] nullVector = selector.getNullVector();
|
||||
|
||||
for (int i = 0; i < numRows; i++) {
|
||||
// Index into the value vector: rows[i] when a row mapping is supplied, otherwise i itself.
final int idx = rows != null ? rows[i] : i;
|
||||
if (NullHandling.replaceWithDefault() || nullVector == null || !nullVector[idx]) {
|
||||
// Each entry may target a different slot, so the buffer is re-aimed and a collector created per entry.
final int position = positions[i] + positionOffset;
|
||||
buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
|
||||
buf.position(position);
|
||||
final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf);
|
||||
DoubleCardinalityAggregatorColumnSelectorStrategy.addDoubleToCollector(collector, vector[idx]);
|
||||
}
|
||||
}
|
||||
}
|
||||
finally {
|
||||
buf.limit(oldLimit);
|
||||
buf.position(oldPosition);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,94 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.aggregation.cardinality.vector;
|
||||
|
||||
import org.apache.druid.common.config.NullHandling;
|
||||
import org.apache.druid.hll.HyperLogLogCollector;
|
||||
import org.apache.druid.query.aggregation.cardinality.types.FloatCardinalityAggregatorColumnSelectorStrategy;
|
||||
import org.apache.druid.segment.vector.VectorValueSelector;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
/**
 * {@link CardinalityVectorProcessor} for float columns. Reads the selector's float vector and adds each
 * value to a {@link HyperLogLogCollector} created over the aggregation buffer.
 *
 * Rows flagged in the selector's null vector are skipped unless {@link NullHandling#replaceWithDefault()} is
 * true. Both methods restore the buffer's original position and limit before returning.
 */
public class FloatCardinalityVectorProcessor implements CardinalityVectorProcessor
|
||||
{
|
||||
private final VectorValueSelector selector;
|
||||
|
||||
public FloatCardinalityVectorProcessor(final VectorValueSelector selector)
|
||||
{
|
||||
this.selector = selector;
|
||||
}
|
||||
|
||||
// Adds vector rows startRow (inclusive) through endRow (exclusive) to the one collector located at "position".
@Override
|
||||
public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
|
||||
{
|
||||
// Save position, limit and restore later instead of allocating a new ByteBuffer object
|
||||
final int oldPosition = buf.position();
|
||||
final int oldLimit = buf.limit();
|
||||
|
||||
try {
|
||||
final float[] vector = selector.getFloatVector();
|
||||
// May be null; that case is treated below as "no row is null".
final boolean[] nullVector = selector.getNullVector();
|
||||
|
||||
// Narrow the buffer to this aggregator's slot before creating the collector from it.
buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
|
||||
buf.position(position);
|
||||
|
||||
final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf);
|
||||
|
||||
for (int i = startRow; i < endRow; i++) {
|
||||
if (NullHandling.replaceWithDefault() || nullVector == null || !nullVector[i]) {
|
||||
FloatCardinalityAggregatorColumnSelectorStrategy.addFloatToCollector(collector, vector[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
finally {
|
||||
buf.limit(oldLimit);
|
||||
buf.position(oldPosition);
|
||||
}
|
||||
}
|
||||
|
||||
// Adds numRows entries, each to its own collector located at positions[i] + positionOffset.
@Override
|
||||
public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
|
||||
{
|
||||
// Save position, limit and restore later instead of allocating a new ByteBuffer object
|
||||
final int oldPosition = buf.position();
|
||||
final int oldLimit = buf.limit();
|
||||
|
||||
try {
|
||||
final float[] vector = selector.getFloatVector();
|
||||
final boolean[] nullVector = selector.getNullVector();
|
||||
|
||||
for (int i = 0; i < numRows; i++) {
|
||||
// Index into the value vector: rows[i] when a row mapping is supplied, otherwise i itself.
final int idx = rows != null ? rows[i] : i;
|
||||
if (NullHandling.replaceWithDefault() || nullVector == null || !nullVector[idx]) {
|
||||
// Each entry may target a different slot, so the buffer is re-aimed and a collector created per entry.
final int position = positions[i] + positionOffset;
|
||||
buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
|
||||
buf.position(position);
|
||||
final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf);
|
||||
FloatCardinalityAggregatorColumnSelectorStrategy.addFloatToCollector(collector, vector[idx]);
|
||||
}
|
||||
}
|
||||
}
|
||||
finally {
|
||||
buf.limit(oldLimit);
|
||||
buf.position(oldPosition);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,94 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.aggregation.cardinality.vector;
|
||||
|
||||
import org.apache.druid.common.config.NullHandling;
|
||||
import org.apache.druid.hll.HyperLogLogCollector;
|
||||
import org.apache.druid.query.aggregation.cardinality.types.LongCardinalityAggregatorColumnSelectorStrategy;
|
||||
import org.apache.druid.segment.vector.VectorValueSelector;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
/**
 * {@link CardinalityVectorProcessor} for long columns. Reads the selector's long vector and adds each
 * value to a {@link HyperLogLogCollector} created over the aggregation buffer.
 *
 * Rows flagged in the selector's null vector are skipped unless {@link NullHandling#replaceWithDefault()} is
 * true. Both methods restore the buffer's original position and limit before returning.
 */
public class LongCardinalityVectorProcessor implements CardinalityVectorProcessor
|
||||
{
|
||||
private final VectorValueSelector selector;
|
||||
|
||||
public LongCardinalityVectorProcessor(final VectorValueSelector selector)
|
||||
{
|
||||
this.selector = selector;
|
||||
}
|
||||
|
||||
// Adds vector rows startRow (inclusive) through endRow (exclusive) to the one collector located at "position".
@Override
|
||||
public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
|
||||
{
|
||||
// Save position, limit and restore later instead of allocating a new ByteBuffer object
|
||||
final int oldPosition = buf.position();
|
||||
final int oldLimit = buf.limit();
|
||||
|
||||
try {
|
||||
final long[] vector = selector.getLongVector();
|
||||
// May be null; that case is treated below as "no row is null".
final boolean[] nullVector = selector.getNullVector();
|
||||
|
||||
// Narrow the buffer to this aggregator's slot before creating the collector from it.
buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
|
||||
buf.position(position);
|
||||
|
||||
final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf);
|
||||
|
||||
for (int i = startRow; i < endRow; i++) {
|
||||
if (NullHandling.replaceWithDefault() || nullVector == null || !nullVector[i]) {
|
||||
LongCardinalityAggregatorColumnSelectorStrategy.addLongToCollector(collector, vector[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
finally {
|
||||
buf.limit(oldLimit);
|
||||
buf.position(oldPosition);
|
||||
}
|
||||
}
|
||||
|
||||
// Adds numRows entries, each to its own collector located at positions[i] + positionOffset.
@Override
|
||||
public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
|
||||
{
|
||||
// Save position, limit and restore later instead of allocating a new ByteBuffer object
|
||||
final int oldPosition = buf.position();
|
||||
final int oldLimit = buf.limit();
|
||||
|
||||
try {
|
||||
final long[] vector = selector.getLongVector();
|
||||
final boolean[] nullVector = selector.getNullVector();
|
||||
|
||||
for (int i = 0; i < numRows; i++) {
|
||||
// Index into the value vector: rows[i] when a row mapping is supplied, otherwise i itself.
final int idx = rows != null ? rows[i] : i;
|
||||
if (NullHandling.replaceWithDefault() || nullVector == null || !nullVector[idx]) {
|
||||
// Each entry may target a different slot, so the buffer is re-aimed and a collector created per entry.
final int position = positions[i] + positionOffset;
|
||||
buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
|
||||
buf.position(position);
|
||||
final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf);
|
||||
LongCardinalityAggregatorColumnSelectorStrategy.addLongToCollector(collector, vector[idx]);
|
||||
}
|
||||
}
|
||||
}
|
||||
finally {
|
||||
buf.limit(oldLimit);
|
||||
buf.position(oldPosition);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,102 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.aggregation.cardinality.vector;
|
||||
|
||||
import org.apache.druid.common.config.NullHandling;
|
||||
import org.apache.druid.hll.HyperLogLogCollector;
|
||||
import org.apache.druid.query.aggregation.cardinality.types.StringCardinalityAggregatorColumnSelectorStrategy;
|
||||
import org.apache.druid.segment.data.IndexedInts;
|
||||
import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
/**
 * {@link CardinalityVectorProcessor} for multi-valued string dimensions. For every row it walks the row's
 * dictionary ids, resolves each id to a string with {@code selector.lookupName}, and adds the string to a
 * {@link HyperLogLogCollector} created over the aggregation buffer.
 *
 * Both methods restore the buffer's original position and limit before returning.
 */
public class MultiValueStringCardinalityVectorProcessor implements CardinalityVectorProcessor
|
||||
{
|
||||
private final MultiValueDimensionVectorSelector selector;
|
||||
|
||||
public MultiValueStringCardinalityVectorProcessor(final MultiValueDimensionVectorSelector selector)
|
||||
{
|
||||
this.selector = selector;
|
||||
}
|
||||
|
||||
// Adds every value of vector rows startRow (inclusive) through endRow (exclusive) to the one collector at "position".
@Override
|
||||
public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
|
||||
{
|
||||
// Save position, limit and restore later instead of allocating a new ByteBuffer object
|
||||
final int oldPosition = buf.position();
|
||||
final int oldLimit = buf.limit();
|
||||
|
||||
try {
|
||||
final IndexedInts[] vector = selector.getRowVector();
|
||||
|
||||
// Narrow the buffer to this aggregator's slot before creating the collector from it.
buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
|
||||
buf.position(position);
|
||||
|
||||
final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf);
|
||||
|
||||
for (int i = startRow; i < endRow; i++) {
|
||||
final IndexedInts ids = vector[i];
|
||||
final int sz = ids.size();
|
||||
|
||||
for (int j = 0; j < sz; j++) {
|
||||
final String value = selector.lookupName(ids.get(j));
|
||||
// Null filtering happens inside addStringToCollector.
StringCardinalityAggregatorColumnSelectorStrategy.addStringToCollector(collector, value);
|
||||
}
|
||||
}
|
||||
}
|
||||
finally {
|
||||
buf.limit(oldLimit);
|
||||
buf.position(oldPosition);
|
||||
}
|
||||
}
|
||||
|
||||
// Adds the values of numRows entries, each entry to its own collector located at positions[i] + positionOffset.
@Override
|
||||
public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
|
||||
{
|
||||
// Save position, limit and restore later instead of allocating a new ByteBuffer object
|
||||
final int oldPosition = buf.position();
|
||||
final int oldLimit = buf.limit();
|
||||
|
||||
try {
|
||||
final IndexedInts[] vector = selector.getRowVector();
|
||||
|
||||
for (int i = 0; i < numRows; i++) {
|
||||
// Row lookup: rows[i] when a row mapping is supplied, otherwise i itself.
final IndexedInts ids = vector[rows != null ? rows[i] : i];
|
||||
final int sz = ids.size();
|
||||
|
||||
for (int j = 0; j < sz; j++) {
|
||||
final String s = selector.lookupName(ids.get(j));
|
||||
// Checked here (in addition to the same check inside addStringToCollector) so that no collector
// is created for a value that would be skipped anyway.
if (NullHandling.replaceWithDefault() || s != null) {
|
||||
final int position = positions[i] + positionOffset;
|
||||
buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
|
||||
buf.position(position);
|
||||
final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf);
|
||||
StringCardinalityAggregatorColumnSelectorStrategy.addStringToCollector(collector, s);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
finally {
|
||||
buf.limit(oldLimit);
|
||||
buf.position(oldPosition);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,46 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.aggregation.cardinality.vector;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
public class NilCardinalityVectorProcessor implements CardinalityVectorProcessor
|
||||
{
|
||||
public static final NilCardinalityVectorProcessor INSTANCE = new NilCardinalityVectorProcessor();
|
||||
|
||||
@Override
|
||||
public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
|
||||
{
|
||||
// Do nothing.
|
||||
}
|
||||
|
||||
@Override
|
||||
public void aggregate(
|
||||
ByteBuffer buf,
|
||||
int numRows,
|
||||
int[] positions,
|
||||
@Nullable int[] rows,
|
||||
int positionOffset
|
||||
)
|
||||
{
|
||||
// Do nothing.
|
||||
}
|
||||
}
|
|
@ -0,0 +1,92 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.aggregation.cardinality.vector;
|
||||
|
||||
import org.apache.druid.common.config.NullHandling;
|
||||
import org.apache.druid.hll.HyperLogLogCollector;
|
||||
import org.apache.druid.query.aggregation.cardinality.types.StringCardinalityAggregatorColumnSelectorStrategy;
|
||||
import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
public class SingleValueStringCardinalityVectorProcessor implements CardinalityVectorProcessor
|
||||
{
|
||||
private final SingleValueDimensionVectorSelector selector;
|
||||
|
||||
public SingleValueStringCardinalityVectorProcessor(final SingleValueDimensionVectorSelector selector)
|
||||
{
|
||||
this.selector = selector;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
|
||||
{
|
||||
// Save position, limit and restore later instead of allocating a new ByteBuffer object
|
||||
final int oldPosition = buf.position();
|
||||
final int oldLimit = buf.limit();
|
||||
|
||||
try {
|
||||
final int[] vector = selector.getRowVector();
|
||||
|
||||
buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
|
||||
buf.position(position);
|
||||
|
||||
final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf);
|
||||
|
||||
for (int i = startRow; i < endRow; i++) {
|
||||
final String value = selector.lookupName(vector[i]);
|
||||
StringCardinalityAggregatorColumnSelectorStrategy.addStringToCollector(collector, value);
|
||||
}
|
||||
}
|
||||
finally {
|
||||
buf.limit(oldLimit);
|
||||
buf.position(oldPosition);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
|
||||
{
|
||||
// Save position, limit and restore later instead of allocating a new ByteBuffer object
|
||||
final int oldPosition = buf.position();
|
||||
final int oldLimit = buf.limit();
|
||||
|
||||
try {
|
||||
final int[] vector = selector.getRowVector();
|
||||
|
||||
for (int i = 0; i < numRows; i++) {
|
||||
final String s = selector.lookupName(vector[rows != null ? rows[i] : i]);
|
||||
|
||||
if (NullHandling.replaceWithDefault() || s != null) {
|
||||
final int position = positions[i] + positionOffset;
|
||||
buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
|
||||
buf.position(position);
|
||||
final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf);
|
||||
StringCardinalityAggregatorColumnSelectorStrategy.addStringToCollector(collector, s);
|
||||
}
|
||||
}
|
||||
}
|
||||
finally {
|
||||
buf.limit(oldLimit);
|
||||
buf.position(oldPosition);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,338 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.aggregation.cardinality;
|
||||
|
||||
import org.apache.druid.common.config.NullHandling;
|
||||
import org.apache.druid.hll.HyperLogLogCollector;
|
||||
import org.apache.druid.query.aggregation.cardinality.vector.DoubleCardinalityVectorProcessor;
|
||||
import org.apache.druid.query.aggregation.cardinality.vector.FloatCardinalityVectorProcessor;
|
||||
import org.apache.druid.query.aggregation.cardinality.vector.LongCardinalityVectorProcessor;
|
||||
import org.apache.druid.query.aggregation.cardinality.vector.MultiValueStringCardinalityVectorProcessor;
|
||||
import org.apache.druid.query.aggregation.cardinality.vector.SingleValueStringCardinalityVectorProcessor;
|
||||
import org.apache.druid.segment.IdLookup;
|
||||
import org.apache.druid.segment.data.ArrayBasedIndexedInts;
|
||||
import org.apache.druid.segment.data.IndexedInts;
|
||||
import org.apache.druid.segment.vector.BaseDoubleVectorValueSelector;
|
||||
import org.apache.druid.segment.vector.BaseFloatVectorValueSelector;
|
||||
import org.apache.druid.segment.vector.BaseLongVectorValueSelector;
|
||||
import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
|
||||
import org.apache.druid.segment.vector.NoFilterVectorOffset;
|
||||
import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
|
||||
import org.apache.druid.testing.InitializedNullHandlingTest;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Collections;
|
||||
|
||||
public class CardinalityVectorAggregatorTest extends InitializedNullHandlingTest
|
||||
{
|
||||
@Test
|
||||
public void testAggregateLong()
|
||||
{
|
||||
final long[] values = {1, 2, 2, 3, 3, 3, 0};
|
||||
final boolean[] nulls = NullHandling.replaceWithDefault()
|
||||
? null
|
||||
: new boolean[]{false, false, false, false, false, false, true};
|
||||
|
||||
final CardinalityVectorAggregator aggregator = new CardinalityVectorAggregator(
|
||||
Collections.singletonList(
|
||||
new LongCardinalityVectorProcessor(
|
||||
new BaseLongVectorValueSelector(new NoFilterVectorOffset(values.length, 0, values.length))
|
||||
{
|
||||
@Override
|
||||
public long[] getLongVector()
|
||||
{
|
||||
return values;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public boolean[] getNullVector()
|
||||
{
|
||||
return nulls;
|
||||
}
|
||||
}
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
testAggregate(aggregator, values.length, NullHandling.replaceWithDefault() ? 4 : 3);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAggregateDouble()
|
||||
{
|
||||
final double[] values = {1, 2, 2, 3, 3, 3, 0};
|
||||
final boolean[] nulls = NullHandling.replaceWithDefault()
|
||||
? null
|
||||
: new boolean[]{false, false, false, false, false, false, true};
|
||||
|
||||
final CardinalityVectorAggregator aggregator = new CardinalityVectorAggregator(
|
||||
Collections.singletonList(
|
||||
new DoubleCardinalityVectorProcessor(
|
||||
new BaseDoubleVectorValueSelector(new NoFilterVectorOffset(values.length, 0, values.length))
|
||||
{
|
||||
@Override
|
||||
public double[] getDoubleVector()
|
||||
{
|
||||
return values;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public boolean[] getNullVector()
|
||||
{
|
||||
return nulls;
|
||||
}
|
||||
}
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
testAggregate(aggregator, values.length, NullHandling.replaceWithDefault() ? 4 : 3);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAggregateFloat()
|
||||
{
|
||||
final float[] values = {1, 2, 2, 3, 3, 3, 0};
|
||||
final boolean[] nulls = NullHandling.replaceWithDefault()
|
||||
? null
|
||||
: new boolean[]{false, false, false, false, false, false, true};
|
||||
|
||||
final CardinalityVectorAggregator aggregator = new CardinalityVectorAggregator(
|
||||
Collections.singletonList(
|
||||
new FloatCardinalityVectorProcessor(
|
||||
new BaseFloatVectorValueSelector(new NoFilterVectorOffset(values.length, 0, values.length))
|
||||
{
|
||||
@Override
|
||||
public float[] getFloatVector()
|
||||
{
|
||||
return values;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public boolean[] getNullVector()
|
||||
{
|
||||
return nulls;
|
||||
}
|
||||
}
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
testAggregate(aggregator, values.length, NullHandling.replaceWithDefault() ? 4 : 3);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAggregateSingleValueString()
|
||||
{
|
||||
final int[] ids = {1, 2, 2, 3, 3, 3, 0};
|
||||
final String[] dict = {null, "abc", "def", "foo"};
|
||||
|
||||
final CardinalityVectorAggregator aggregator = new CardinalityVectorAggregator(
|
||||
Collections.singletonList(
|
||||
new SingleValueStringCardinalityVectorProcessor(
|
||||
new SingleValueDimensionVectorSelector()
|
||||
{
|
||||
@Override
|
||||
public int[] getRowVector()
|
||||
{
|
||||
return ids;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getValueCardinality()
|
||||
{
|
||||
return dict.length;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public String lookupName(int id)
|
||||
{
|
||||
return dict[id];
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean nameLookupPossibleInAdvance()
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public IdLookup idLookup()
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMaxVectorSize()
|
||||
{
|
||||
return ids.length;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getCurrentVectorSize()
|
||||
{
|
||||
return ids.length;
|
||||
}
|
||||
}
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
testAggregate(aggregator, ids.length, NullHandling.replaceWithDefault() ? 4 : 3);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAggregateMultiValueString()
|
||||
{
|
||||
final IndexedInts[] ids = {
|
||||
new ArrayBasedIndexedInts(new int[]{1, 2}),
|
||||
new ArrayBasedIndexedInts(new int[]{2, 3}),
|
||||
new ArrayBasedIndexedInts(new int[]{3, 3}),
|
||||
new ArrayBasedIndexedInts(new int[]{0})
|
||||
};
|
||||
|
||||
final String[] dict = {null, "abc", "def", "foo"};
|
||||
|
||||
final CardinalityVectorAggregator aggregator = new CardinalityVectorAggregator(
|
||||
Collections.singletonList(
|
||||
new MultiValueStringCardinalityVectorProcessor(
|
||||
new MultiValueDimensionVectorSelector()
|
||||
{
|
||||
@Override
|
||||
public IndexedInts[] getRowVector()
|
||||
{
|
||||
return ids;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getValueCardinality()
|
||||
{
|
||||
return dict.length;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public String lookupName(int id)
|
||||
{
|
||||
return dict[id];
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean nameLookupPossibleInAdvance()
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public IdLookup idLookup()
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMaxVectorSize()
|
||||
{
|
||||
return ids.length;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getCurrentVectorSize()
|
||||
{
|
||||
return ids.length;
|
||||
}
|
||||
}
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
testAggregate(aggregator, ids.length, NullHandling.replaceWithDefault() ? 4 : 3);
|
||||
}
|
||||
|
||||
private static void testAggregate(
|
||||
final CardinalityVectorAggregator aggregator,
|
||||
final int numRows,
|
||||
final double expectedResult
|
||||
)
|
||||
{
|
||||
testAggregateStyle1(aggregator, numRows, expectedResult);
|
||||
testAggregateStyle2(aggregator, numRows, expectedResult);
|
||||
}
|
||||
|
||||
private static void testAggregateStyle1(
|
||||
final CardinalityVectorAggregator aggregator,
|
||||
final int numRows,
|
||||
final double expectedResult
|
||||
)
|
||||
{
|
||||
final int position = 1;
|
||||
final ByteBuffer buf = ByteBuffer.allocate(HyperLogLogCollector.getLatestNumBytesForDenseStorage() + position);
|
||||
aggregator.init(buf, position);
|
||||
aggregator.aggregate(buf, position, 0, numRows);
|
||||
|
||||
Assert.assertEquals(
|
||||
"style1",
|
||||
expectedResult,
|
||||
((HyperLogLogCollector) aggregator.get(buf, position)).estimateCardinality(),
|
||||
0.01
|
||||
);
|
||||
}
|
||||
|
||||
private static void testAggregateStyle2(
|
||||
final CardinalityVectorAggregator aggregator,
|
||||
final int numRows,
|
||||
final double expectedResult
|
||||
)
|
||||
{
|
||||
final int positionOffset = 1;
|
||||
|
||||
final int aggregatorSize = HyperLogLogCollector.getLatestNumBytesForDenseStorage();
|
||||
final ByteBuffer buf = ByteBuffer.allocate(positionOffset + 2 * aggregatorSize);
|
||||
aggregator.init(buf, positionOffset);
|
||||
aggregator.init(buf, positionOffset + aggregatorSize);
|
||||
|
||||
final int[] positions = new int[numRows];
|
||||
final int[] rows = new int[numRows];
|
||||
|
||||
for (int i = 0; i < numRows; i++) {
|
||||
positions[i] = (i % 2) * aggregatorSize;
|
||||
rows[i] = (i + 1) % numRows;
|
||||
}
|
||||
|
||||
aggregator.aggregate(buf, numRows, positions, rows, positionOffset);
|
||||
|
||||
Assert.assertEquals(
|
||||
"style2",
|
||||
expectedResult,
|
||||
((HyperLogLogCollector) aggregator.get(buf, positionOffset))
|
||||
.fold((HyperLogLogCollector) aggregator.get(buf, positionOffset + aggregatorSize))
|
||||
.estimateCardinality(),
|
||||
0.01
|
||||
);
|
||||
}
|
||||
}
|
|
@ -2566,9 +2566,6 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
|
|||
@Test
|
||||
public void testGroupByWithCardinality()
|
||||
{
|
||||
// Cannot vectorize due to "cardinality" aggregator.
|
||||
cannotVectorize();
|
||||
|
||||
GroupByQuery query = makeQueryBuilder()
|
||||
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
|
||||
.setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
|
||||
|
@ -2684,7 +2681,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
|
|||
@Test
|
||||
public void testGroupByWithNoResult()
|
||||
{
|
||||
// Cannot vectorize due to "cardinality" aggregator.
|
||||
// Cannot vectorize due to first, last aggregators.
|
||||
cannotVectorize();
|
||||
|
||||
GroupByQuery query = makeQueryBuilder()
|
||||
|
@ -8684,7 +8681,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
|
|||
@Test
|
||||
public void testGroupByCardinalityAggWithExtractionFn()
|
||||
{
|
||||
// Cannot vectorize due to "cardinality" aggregator.
|
||||
// Cannot vectorize due to extraction dimension spec.
|
||||
cannotVectorize();
|
||||
|
||||
String helloJsFn = "function(str) { return 'hello' }";
|
||||
|
@ -8776,9 +8773,6 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
|
|||
@Test
|
||||
public void testGroupByCardinalityAggOnFloat()
|
||||
{
|
||||
// Cannot vectorize due to "cardinality" aggregator.
|
||||
cannotVectorize();
|
||||
|
||||
GroupByQuery query = makeQueryBuilder()
|
||||
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
|
||||
.setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
|
||||
|
|
|
@ -3459,9 +3459,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
|
|||
@Test
|
||||
public void testHavingOnApproximateCountDistinct() throws Exception
|
||||
{
|
||||
// Cannot vectorize due to "cardinality" aggregator.
|
||||
cannotVectorize();
|
||||
|
||||
testQuery(
|
||||
"SELECT dim2, COUNT(DISTINCT m1) FROM druid.foo GROUP BY dim2 HAVING COUNT(DISTINCT m1) > 1",
|
||||
ImmutableList.of(
|
||||
|
@ -6269,9 +6266,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
|
|||
@Test
|
||||
public void testFilteredAggregations() throws Exception
|
||||
{
|
||||
// Cannot vectorize due to "cardinality" aggregator.
|
||||
cannotVectorize();
|
||||
|
||||
testQuery(
|
||||
"SELECT "
|
||||
+ "SUM(case dim1 when 'abc' then cnt end), "
|
||||
|
@ -7657,9 +7651,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
|
|||
@Test
|
||||
public void testCountDistinct() throws Exception
|
||||
{
|
||||
// Cannot vectorize due to "cardinality" aggregator.
|
||||
cannotVectorize();
|
||||
|
||||
testQuery(
|
||||
"SELECT SUM(cnt), COUNT(distinct dim2), COUNT(distinct unique_dim1) FROM druid.foo",
|
||||
ImmutableList.of(
|
||||
|
@ -7692,9 +7683,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
|
|||
@Test
|
||||
public void testCountDistinctOfCaseWhen() throws Exception
|
||||
{
|
||||
// Cannot vectorize due to "cardinality" aggregator.
|
||||
cannotVectorize();
|
||||
|
||||
testQuery(
|
||||
"SELECT\n"
|
||||
+ "COUNT(DISTINCT CASE WHEN m1 >= 4 THEN m1 END),\n"
|
||||
|
@ -7787,9 +7775,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
|
|||
{
|
||||
// When HLL is disabled, APPROX_COUNT_DISTINCT is still approximate.
|
||||
|
||||
// Cannot vectorize due to "cardinality" aggregator.
|
||||
cannotVectorize();
|
||||
|
||||
testQuery(
|
||||
PLANNER_CONFIG_NO_HLL,
|
||||
"SELECT APPROX_COUNT_DISTINCT(dim2) FROM druid.foo",
|
||||
|
@ -8362,7 +8347,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
|
|||
@Test
|
||||
public void testAvgDailyCountDistinct() throws Exception
|
||||
{
|
||||
// Cannot vectorize due to virtual columns.
|
||||
// Cannot vectorize outer query due to inlined inner query.
|
||||
cannotVectorize();
|
||||
|
||||
testQuery(
|
||||
|
@ -9034,9 +9019,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
|
|||
@Test
|
||||
public void testCountDistinctArithmetic() throws Exception
|
||||
{
|
||||
// Cannot vectorize due to "cardinality" aggregator.
|
||||
cannotVectorize();
|
||||
|
||||
testQuery(
|
||||
"SELECT\n"
|
||||
+ " SUM(cnt),\n"
|
||||
|
@ -9081,7 +9063,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
|
|||
@Test
|
||||
public void testCountDistinctOfSubstring() throws Exception
|
||||
{
|
||||
// Cannot vectorize due to "cardinality" aggregator.
|
||||
// Cannot vectorize due to extraction dimension spec.
|
||||
cannotVectorize();
|
||||
|
||||
testQuery(
|
||||
|
@ -11808,7 +11790,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
|
|||
@Test
|
||||
public void testCountDistinctOfLookup() throws Exception
|
||||
{
|
||||
// Cannot vectorize due to "cardinality" aggregator.
|
||||
// Cannot vectorize due to extraction dimension spec.
|
||||
cannotVectorize();
|
||||
|
||||
final RegisteredLookupExtractionFn extractionFn = new RegisteredLookupExtractionFn(
|
||||
|
@ -15219,7 +15201,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
|
|||
)
|
||||
);
|
||||
|
||||
// Cannot vectorize next test due to "cardinality" aggregator.
|
||||
// Cannot vectorize next test due to extraction dimension spec.
|
||||
cannotVectorize();
|
||||
|
||||
// semi-join requires time condition on both left and right query
|
||||
|
|
Loading…
Reference in New Issue