Add vectorization for druid-histogram extension (#10304)

* First draft

* Remove redundant code from FixedBucketsHistogramAggregator classes

* Add test cases for new classes

* Fix tests in sql compatible mode

* Typo fix

* Fix comment

* Add spelling

* Vectorize only for supported types

* Rename internal aggregator files

* Fix tests
This commit is contained in:
Abhishek Agarwal 2020-09-10 02:26:33 +05:30 committed by GitHub
parent e5f0da30ae
commit a5c46dc84b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 1267 additions and 115 deletions

View File

@ -90,7 +90,8 @@ requirements:
include "selector", "bound", "in", "like", "regex", "search", "and", "or", and "not". include "selector", "bound", "in", "like", "regex", "search", "and", "or", and "not".
- All filters in filtered aggregators must offer vectorized row-matchers. - All filters in filtered aggregators must offer vectorized row-matchers.
- All aggregators must offer vectorized implementations. These include "count", "doubleSum", "floatSum", "longSum", "longMin", - All aggregators must offer vectorized implementations. These include "count", "doubleSum", "floatSum", "longSum", "longMin",
"longMax", "doubleMin", "doubleMax", "floatMin", "floatMax", "hyperUnique", and "filtered". "longMax", "doubleMin", "doubleMax", "floatMin", "floatMax", "hyperUnique", "filtered", "approxHistogram",
"approxHistogramFold", and "fixedBucketsHistogram" (with numerical input).
- No virtual columns. - No virtual columns.
- For GroupBy: All dimension specs must be "default" (no extraction functions or filtered dimension specs). - For GroupBy: All dimension specs must be "default" (no extraction functions or filtered dimension specs).
- For GroupBy: No multi-value dimensions. - For GroupBy: No multi-value dimensions.

View File

@ -32,10 +32,14 @@ import org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException
import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.ObjectAggregateCombiner; import org.apache.druid.query.aggregation.ObjectAggregateCombiner;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -103,6 +107,24 @@ public class ApproximateHistogramAggregatorFactory extends AggregatorFactory
); );
} }
/**
 * Creates the vectorized aggregator for this factory.
 *
 * The aggregator reads its input through the vector value selector that {@code metricVectorFactory}
 * provides for {@code fieldName}, and is configured with this factory's {@code resolution}.
 *
 * @param metricVectorFactory factory used to obtain the vector value selector for {@code fieldName}
 * @return a new {@link ApproximateHistogramVectorAggregator}
 */
@Override
public VectorAggregator factorizeVector(VectorColumnSelectorFactory metricVectorFactory)
{
return new ApproximateHistogramVectorAggregator(
metricVectorFactory.makeValueSelector(fieldName),
resolution
);
}
/**
 * Reports whether this aggregator can be vectorized for the column named by {@code fieldName}.
 *
 * @param columnInspector source of column capabilities
 * @return {@code true} only when capabilities are available for the column and its type is numeric
 */
@Override
public boolean canVectorize(ColumnInspector columnInspector)
{
  // String columns may contain values that parse as numbers, but there is no vector equivalent of the
  // string value selector, so vectorization is skipped for anything that is not natively numeric.
  final ColumnCapabilities columnCapabilities = columnInspector.getColumnCapabilities(fieldName);
  if (columnCapabilities == null) {
    return false;
  }
  return columnCapabilities.getType().isNumeric();
}
@Override @Override
public Comparator getComparator() public Comparator getComparator()
{ {

View File

@ -28,54 +28,30 @@ import java.nio.ByteBuffer;
public class ApproximateHistogramBufferAggregator implements BufferAggregator public class ApproximateHistogramBufferAggregator implements BufferAggregator
{ {
private final BaseFloatColumnValueSelector selector; private final BaseFloatColumnValueSelector selector;
private final int resolution; private final ApproximateHistogramBufferAggregatorHelper innerAggregator;
public ApproximateHistogramBufferAggregator(BaseFloatColumnValueSelector selector, int resolution) public ApproximateHistogramBufferAggregator(BaseFloatColumnValueSelector selector, int resolution)
{ {
this.selector = selector; this.selector = selector;
this.resolution = resolution; this.innerAggregator = new ApproximateHistogramBufferAggregatorHelper(resolution);
} }
@Override @Override
public void init(ByteBuffer buf, int position) public void init(ByteBuffer buf, int position)
{ {
ByteBuffer mutationBuffer = buf.duplicate(); innerAggregator.init(buf, position);
mutationBuffer.position(position);
mutationBuffer.putInt(resolution);
mutationBuffer.putInt(0); //initial binCount
for (int i = 0; i < resolution; ++i) {
mutationBuffer.putFloat(0f);
}
for (int i = 0; i < resolution; ++i) {
mutationBuffer.putLong(0L);
}
// min
mutationBuffer.putFloat(Float.POSITIVE_INFINITY);
// max
mutationBuffer.putFloat(Float.NEGATIVE_INFINITY);
} }
@Override @Override
public void aggregate(ByteBuffer buf, int position) public void aggregate(ByteBuffer buf, int position)
{ {
ByteBuffer mutationBuffer = buf.duplicate(); innerAggregator.aggregate(buf, position, selector.getFloat());
mutationBuffer.position(position);
ApproximateHistogram h0 = ApproximateHistogram.fromBytesDense(mutationBuffer);
h0.offer(selector.getFloat());
mutationBuffer.position(position);
h0.toBytesDense(mutationBuffer);
} }
@Override @Override
public Object get(ByteBuffer buf, int position) public Object get(ByteBuffer buf, int position)
{ {
ByteBuffer mutationBuffer = buf.duplicate(); return innerAggregator.get(buf, position);
mutationBuffer.position(position);
return ApproximateHistogram.fromBytes(mutationBuffer);
} }
@Override @Override

View File

@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.histogram;
import java.nio.ByteBuffer;
/**
 * Shared buffer-level logic for {@link ApproximateHistogramBufferAggregator} and
 * {@link ApproximateHistogramVectorAggregator}.
 *
 * The helper only knows how to initialize, read, write and update a histogram stored in its dense byte form at a
 * given buffer position. Fetching the input values from selectors is the responsibility of the callers.
 */
final class ApproximateHistogramBufferAggregatorHelper
{
  private final int resolution;

  public ApproximateHistogramBufferAggregatorHelper(int resolution)
  {
    this.resolution = resolution;
  }

  /**
   * Writes a fresh histogram of the configured resolution at {@code position}.
   */
  public void init(final ByteBuffer buf, final int position)
  {
    put(buf, position, new ApproximateHistogram(resolution));
  }

  /**
   * Reads the densely serialized histogram stored at {@code position}. A duplicate of {@code buf} is used so the
   * caller's buffer position is left untouched.
   */
  public ApproximateHistogram get(final ByteBuffer buf, final int position)
  {
    final ByteBuffer readView = buf.duplicate();
    readView.position(position);
    return ApproximateHistogram.fromBytesDense(readView);
  }

  /**
   * Serializes {@code histogram} in dense form at {@code position}. A duplicate of {@code buf} is used so the
   * caller's buffer position is left untouched.
   */
  public void put(final ByteBuffer buf, final int position, final ApproximateHistogram histogram)
  {
    final ByteBuffer writeView = buf.duplicate();
    writeView.position(position);
    histogram.toBytesDense(writeView);
  }

  /**
   * Offers a single {@code value} to the histogram stored at {@code position} and writes the result back.
   */
  public void aggregate(final ByteBuffer buf, final int position, final float value)
  {
    final ApproximateHistogram current = get(buf, position);
    current.offer(value);
    put(buf, position, current);
  }
}

View File

@ -27,9 +27,15 @@ import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorObjectSelector;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.Objects; import java.util.Objects;
@ -93,10 +99,33 @@ public class ApproximateHistogramFoldingAggregatorFactory extends ApproximateHis
); );
} }
/**
 * Creates the vectorized folding aggregator, reading histograms through the vector object selector for
 * {@code fieldName}.
 *
 * @param metricVectorFactory factory used to obtain the vector object selector
 * @return a new {@link ApproximateHistogramFoldingVectorAggregator}
 */
@Override
public VectorAggregator factorizeVector(VectorColumnSelectorFactory metricVectorFactory)
{
  return new ApproximateHistogramFoldingVectorAggregator(
      metricVectorFactory.makeObjectSelector(fieldName),
      resolution,
      lowerLimit,
      upperLimit
  );
}
/**
 * Reports whether this aggregator can be vectorized for the column named by {@code fieldName}.
 *
 * @param columnInspector source of column capabilities
 * @return {@code true} only when capabilities are available for the column and its type is
 *         {@link ValueType#COMPLEX}
 */
@Override
public boolean canVectorize(ColumnInspector columnInspector)
{
  final ColumnCapabilities columnCapabilities = columnInspector.getColumnCapabilities(fieldName);
  if (columnCapabilities == null) {
    return false;
  }
  return columnCapabilities.getType() == ValueType.COMPLEX;
}
@Override @Override
public AggregatorFactory getCombiningFactory() public AggregatorFactory getCombiningFactory()
{ {
return new ApproximateHistogramFoldingAggregatorFactory(name, name, resolution, numBuckets, lowerLimit, upperLimit, finalizeAsBase64Binary); return new ApproximateHistogramFoldingAggregatorFactory(
name,
name,
resolution,
numBuckets,
lowerLimit,
upperLimit,
finalizeAsBase64Binary
);
} }
@Override @Override

View File

@ -28,12 +28,7 @@ import java.nio.ByteBuffer;
public class ApproximateHistogramFoldingBufferAggregator implements BufferAggregator public class ApproximateHistogramFoldingBufferAggregator implements BufferAggregator
{ {
private final BaseObjectColumnValueSelector<ApproximateHistogram> selector; private final BaseObjectColumnValueSelector<ApproximateHistogram> selector;
private final int resolution; private final ApproximateHistogramFoldingBufferAggregatorHelper innerAggregator;
private final float upperLimit;
private final float lowerLimit;
private float[] tmpBufferP;
private long[] tmpBufferB;
public ApproximateHistogramFoldingBufferAggregator( public ApproximateHistogramFoldingBufferAggregator(
BaseObjectColumnValueSelector<ApproximateHistogram> selector, BaseObjectColumnValueSelector<ApproximateHistogram> selector,
@ -43,50 +38,26 @@ public class ApproximateHistogramFoldingBufferAggregator implements BufferAggreg
) )
{ {
this.selector = selector; this.selector = selector;
this.resolution = resolution; this.innerAggregator = new ApproximateHistogramFoldingBufferAggregatorHelper(resolution, lowerLimit, upperLimit);
this.lowerLimit = lowerLimit;
this.upperLimit = upperLimit;
tmpBufferP = new float[resolution];
tmpBufferB = new long[resolution];
} }
@Override @Override
public void init(ByteBuffer buf, int position) public void init(ByteBuffer buf, int position)
{ {
ApproximateHistogram h = new ApproximateHistogram(resolution, lowerLimit, upperLimit); innerAggregator.init(buf, position);
ByteBuffer mutationBuffer = buf.duplicate();
mutationBuffer.position(position);
// use dense storage for aggregation
h.toBytesDense(mutationBuffer);
} }
@Override @Override
public void aggregate(ByteBuffer buf, int position) public void aggregate(ByteBuffer buf, int position)
{ {
ApproximateHistogram hNext = selector.getObject(); ApproximateHistogram hNext = selector.getObject();
if (hNext == null) { innerAggregator.aggregate(buf, position, hNext);
return;
}
ByteBuffer mutationBuffer = buf.duplicate();
mutationBuffer.position(position);
ApproximateHistogram h0 = ApproximateHistogram.fromBytesDense(mutationBuffer);
h0.setLowerLimit(lowerLimit);
h0.setUpperLimit(upperLimit);
h0.foldFast(hNext, tmpBufferP, tmpBufferB);
mutationBuffer.position(position);
h0.toBytesDense(mutationBuffer);
} }
@Override @Override
public Object get(ByteBuffer buf, int position) public Object get(ByteBuffer buf, int position)
{ {
ByteBuffer mutationBuffer = buf.asReadOnlyBuffer(); return innerAggregator.get(buf, position);
mutationBuffer.position(position);
return ApproximateHistogram.fromBytesDense(mutationBuffer);
} }
@Override @Override
@ -106,6 +77,7 @@ public class ApproximateHistogramFoldingBufferAggregator implements BufferAggreg
{ {
throw new UnsupportedOperationException("ApproximateHistogramFoldingBufferAggregator does not support getDouble()"); throw new UnsupportedOperationException("ApproximateHistogramFoldingBufferAggregator does not support getDouble()");
} }
@Override @Override
public void close() public void close()
{ {

View File

@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.histogram;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
/**
 * A helper class used by {@link ApproximateHistogramFoldingBufferAggregator} and
 * {@link ApproximateHistogramFoldingVectorAggregator} for aggregation operations on byte buffers.
 * Getting the object from value selectors is outside this class.
 *
 * The instance owns two scratch arrays, sized by the resolution, that are passed to every fold call.
 */
final class ApproximateHistogramFoldingBufferAggregatorHelper
{
  private final int resolution;
  private final float upperLimit;
  private final float lowerLimit;

  // Scratch arrays handed to ApproximateHistogram#foldFast; allocated once and never reassigned.
  private final float[] tmpBufferA;
  private final long[] tmpBufferB;

  public ApproximateHistogramFoldingBufferAggregatorHelper(
      int resolution,
      float lowerLimit,
      float upperLimit
  )
  {
    this.resolution = resolution;
    this.lowerLimit = lowerLimit;
    this.upperLimit = upperLimit;

    tmpBufferA = new float[resolution];
    tmpBufferB = new long[resolution];
  }

  /**
   * Writes a fresh histogram with the configured resolution and limits at {@code position}.
   */
  public void init(ByteBuffer buf, int position)
  {
    ApproximateHistogram h = new ApproximateHistogram(resolution, lowerLimit, upperLimit);

    ByteBuffer mutationBuffer = buf.duplicate();
    mutationBuffer.position(position);
    // use dense storage for aggregation
    h.toBytesDense(mutationBuffer);
  }

  /**
   * Folds {@code hNext} into the histogram stored at {@code position} and writes the result back.
   * A null {@code hNext} is ignored and the buffer is left untouched.
   */
  public void aggregate(ByteBuffer buf, int position, @Nullable ApproximateHistogram hNext)
  {
    if (hNext == null) {
      return;
    }
    ByteBuffer mutationBuffer = buf.duplicate();
    mutationBuffer.position(position);

    ApproximateHistogram h0 = ApproximateHistogram.fromBytesDense(mutationBuffer);
    foldFast(h0, hNext);

    // rewind to the start of the slot before writing the updated histogram back
    mutationBuffer.position(position);
    h0.toBytesDense(mutationBuffer);
  }

  /**
   * Folds {@code right} into {@code left} in place, using this helper's scratch arrays.
   */
  public void foldFast(ApproximateHistogram left, ApproximateHistogram right)
  {
    // These have to be set in every call since limits are transient and lost during serialization-deserialization
    left.setLowerLimit(lowerLimit);
    left.setUpperLimit(upperLimit);
    left.foldFast(right, tmpBufferA, tmpBufferB);
  }

  /**
   * Reads the densely serialized histogram stored at {@code position} through a read-only view of {@code buf}.
   */
  public ApproximateHistogram get(ByteBuffer buf, int position)
  {
    ByteBuffer mutationBuffer = buf.asReadOnlyBuffer();
    mutationBuffer.position(position);
    return ApproximateHistogram.fromBytesDense(mutationBuffer);
  }

  /**
   * Serializes {@code histogram} in dense form at {@code position}.
   */
  public void put(ByteBuffer buf, int position, ApproximateHistogram histogram)
  {
    ByteBuffer mutationBuffer = buf.duplicate();
    mutationBuffer.position(position);
    histogram.toBytesDense(mutationBuffer);
  }
}

View File

@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.histogram;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.segment.vector.VectorObjectSelector;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
/**
 * Vectorized aggregator that folds {@link ApproximateHistogram} objects read from a {@link VectorObjectSelector}
 * into histograms stored in a byte buffer. All buffer handling is delegated to
 * {@link ApproximateHistogramFoldingBufferAggregatorHelper}.
 */
public class ApproximateHistogramFoldingVectorAggregator implements VectorAggregator
{
  private final ApproximateHistogramFoldingBufferAggregatorHelper innerAggregator;
  private final VectorObjectSelector selector;

  public ApproximateHistogramFoldingVectorAggregator(
      final VectorObjectSelector selector,
      final int resolution,
      final float lowerLimit,
      final float upperLimit
  )
  {
    this.selector = selector;
    this.innerAggregator = new ApproximateHistogramFoldingBufferAggregatorHelper(resolution, lowerLimit, upperLimit);
  }

  @Override
  public void init(ByteBuffer buf, int position)
  {
    innerAggregator.init(buf, position);
  }

  /**
   * Folds rows {@code [startRow, endRow)} of the current vector into the single histogram at {@code position}.
   * The histogram is read once, updated in memory, and written back once after the loop.
   */
  @Override
  public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
  {
    final Object[] histograms = selector.getObjectVector();
    final ApproximateHistogram accumulated = innerAggregator.get(buf, position);
    for (int row = startRow; row < endRow; row++) {
      final ApproximateHistogram candidate = (ApproximateHistogram) histograms[row];
      if (candidate == null) {
        // null entries contribute nothing
        continue;
      }
      innerAggregator.foldFast(accumulated, candidate);
    }
    innerAggregator.put(buf, position, accumulated);
  }

  /**
   * Folds one vector entry into each of {@code numRows} buffer positions. Entry {@code k} is taken from
   * {@code rows[k]} when {@code rows} is non-null and from index {@code k} otherwise, and is folded into the
   * histogram at {@code positions[k] + positionOffset}. Null entries are skipped.
   */
  @Override
  public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
  {
    final Object[] histograms = selector.getObjectVector();
    for (int k = 0; k < numRows; k++) {
      final int row = (rows == null) ? k : rows[k];
      final ApproximateHistogram candidate = (ApproximateHistogram) histograms[row];
      if (candidate != null) {
        innerAggregator.aggregate(buf, positions[k] + positionOffset, candidate);
      }
    }
  }

  @Nullable
  @Override
  public Object get(ByteBuffer buf, int position)
  {
    return innerAggregator.get(buf, position);
  }

  @Override
  public void close()
  {
    // Nothing to close
  }
}

View File

@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.histogram;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.segment.vector.VectorValueSelector;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
/**
 * Vectorized aggregator that offers float values read from a {@link VectorValueSelector} to
 * {@link ApproximateHistogram} instances stored in a byte buffer. Rows flagged in the selector's null vector are
 * skipped. All buffer handling is delegated to {@link ApproximateHistogramBufferAggregatorHelper}.
 */
public class ApproximateHistogramVectorAggregator implements VectorAggregator
{
  private final VectorValueSelector selector;
  private final ApproximateHistogramBufferAggregatorHelper innerAggregator;

  public ApproximateHistogramVectorAggregator(
      VectorValueSelector selector,
      int resolution
  )
  {
    this.selector = selector;
    this.innerAggregator = new ApproximateHistogramBufferAggregatorHelper(resolution);
  }

  @Override
  public void init(final ByteBuffer buf, final int position)
  {
    innerAggregator.init(buf, position);
  }

  /**
   * Offers rows {@code [startRow, endRow)} of the current vector to the single histogram at {@code position}.
   * The histogram is read once, updated in memory, and written back once after the loop.
   */
  @Override
  public void aggregate(final ByteBuffer buf, final int position, final int startRow, final int endRow)
  {
    final boolean[] isValueNull = selector.getNullVector();
    final float[] vector = selector.getFloatVector();
    ApproximateHistogram histogram = innerAggregator.get(buf, position);

    for (int i = startRow; i < endRow; i++) {
      if (isValueNull != null && isValueNull[i]) {
        continue;
      }
      histogram.offer(vector[i]);
    }
    innerAggregator.put(buf, position, histogram);
  }

  @Override
  public Object get(ByteBuffer buf, int position)
  {
    return innerAggregator.get(buf, position);
  }

  /**
   * Offers one vector value to each of {@code numRows} buffer positions. For the i-th entry the source row is
   * {@code rows[i]} when {@code rows} is non-null and {@code i} otherwise; the value is offered to the histogram at
   * {@code positions[i] + positionOffset}.
   */
  @Override
  public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
  {
    final float[] vector = selector.getFloatVector();
    final boolean[] isValueNull = selector.getNullVector();

    for (int i = 0; i < numRows; i++) {
      // Resolve the source row once: the null vector must be indexed by the same row as the value vector,
      // otherwise the null check inspects the wrong row whenever a row mapping is supplied.
      final int row = rows != null ? rows[i] : i;
      if (isValueNull != null && isValueNull[row]) {
        continue;
      }
      final int position = positions[i] + positionOffset;
      innerAggregator.aggregate(buf, position, vector[row]);
    }
  }

  @Override
  public void close()
  {
    // no resources to cleanup
  }
}

View File

@ -25,9 +25,11 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonValue; import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import javax.annotation.Nullable;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Arrays; import java.util.Arrays;
@ -431,6 +433,34 @@ public class FixedBucketsHistogram
} }
} }
/**
 * Merge another datapoint into this histogram. The datapoint may be:
 * - null: the default double value is added when null handling replaces nulls with defaults, otherwise the
 *   missing-value count is incremented
 * - a base64 encoded string of a {@code FixedBucketsHistogram}, which is decoded and merged
 * - a {@code FixedBucketsHistogram} object, which is merged
 * - a {@code Number}, whose double value is added
 *
 * @param val the datapoint to merge, possibly null
 * @throws ISE if {@code val} is non-null and of none of the supported classes
 */
@VisibleForTesting
public void combine(@Nullable Object val)
{
  if (val == null) {
    if (!NullHandling.replaceWithDefault()) {
      incrementMissing();
    } else {
      add(NullHandling.defaultDoubleValue());
    }
    return;
  }

  if (val instanceof String) {
    combineHistogram(fromBase64((String) val));
    return;
  }

  if (val instanceof FixedBucketsHistogram) {
    combineHistogram((FixedBucketsHistogram) val);
    return;
  }

  if (val instanceof Number) {
    add(((Number) val).doubleValue());
    return;
  }

  throw new ISE("Unknown class for object: " + val.getClass());
}
/** /**
* Merge another histogram into this one. Only the state of this histogram is updated. * Merge another histogram into this one. Only the state of this histogram is updated.
* *

View File

@ -20,8 +20,6 @@
package org.apache.druid.query.aggregation.histogram; package org.apache.druid.query.aggregation.histogram;
import com.google.common.primitives.Longs; import com.google.common.primitives.Longs;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.segment.BaseObjectColumnValueSelector; import org.apache.druid.segment.BaseObjectColumnValueSelector;
@ -66,22 +64,7 @@ public class FixedBucketsHistogramAggregator implements Aggregator
public void aggregate() public void aggregate()
{ {
Object val = selector.getObject(); Object val = selector.getObject();
histogram.combine(val);
if (val == null) {
if (NullHandling.replaceWithDefault()) {
histogram.add(NullHandling.defaultDoubleValue());
} else {
histogram.incrementMissing();
}
} else if (val instanceof String) {
histogram.combineHistogram(FixedBucketsHistogram.fromBase64((String) val));
} else if (val instanceof FixedBucketsHistogram) {
histogram.combineHistogram((FixedBucketsHistogram) val);
} else if (val instanceof Number) {
histogram.add(((Number) val).doubleValue());
} else {
throw new ISE("Unknown class for object: " + val.getClass());
}
} }
@Nullable @Nullable

View File

@ -22,6 +22,7 @@ package org.apache.druid.query.aggregation.histogram;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.AggregateCombiner; import org.apache.druid.query.aggregation.AggregateCombiner;
import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.Aggregator;
@ -29,10 +30,14 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.ObjectAggregateCombiner; import org.apache.druid.query.aggregation.ObjectAggregateCombiner;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.Collections; import java.util.Collections;
@ -100,6 +105,34 @@ public class FixedBucketsHistogramAggregatorFactory extends AggregatorFactory
); );
} }
/**
 * Creates the vectorized aggregator for the column named by {@code fieldName}. Only numeric columns are supported.
 *
 * @param columnSelectorFactory source of the column capabilities and of the vector value selector
 * @return a new {@link FixedBucketsHistogramVectorAggregator}
 * @throws IAE if the column capabilities are unavailable or the column type is not numeric
 */
@Override
public VectorAggregator factorizeVector(VectorColumnSelectorFactory columnSelectorFactory)
{
  final ColumnCapabilities columnCapabilities = columnSelectorFactory.getColumnCapabilities(fieldName);
  if (columnCapabilities == null) {
    throw new IAE("could not find the column type for column %s", fieldName);
  }

  final ValueType columnType = columnCapabilities.getType();
  if (!columnType.isNumeric()) {
    throw new IAE("cannot vectorize fixed bucket histogram aggregation for type %s", columnType);
  }

  return new FixedBucketsHistogramVectorAggregator(
      columnSelectorFactory.makeValueSelector(fieldName),
      lowerLimit,
      upperLimit,
      numBuckets,
      outlierHandlingMode
  );
}
/**
 * Reports whether this aggregator can be vectorized for the column named by {@code fieldName}.
 *
 * @param columnInspector source of column capabilities
 * @return {@code true} only when capabilities are available for the column and its type is numeric
 */
@Override
public boolean canVectorize(ColumnInspector columnInspector)
{
  final ColumnCapabilities columnCapabilities = columnInspector.getColumnCapabilities(fieldName);
  if (columnCapabilities == null) {
    return false;
  }
  return columnCapabilities.getType().isNumeric();
}
@Override @Override
public Comparator getComparator() public Comparator getComparator()
{ {

View File

@ -19,7 +19,6 @@
package org.apache.druid.query.aggregation.histogram; package org.apache.druid.query.aggregation.histogram;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseObjectColumnValueSelector; import org.apache.druid.segment.BaseObjectColumnValueSelector;
@ -29,8 +28,7 @@ import java.nio.ByteBuffer;
public class FixedBucketsHistogramBufferAggregator implements BufferAggregator public class FixedBucketsHistogramBufferAggregator implements BufferAggregator
{ {
private final BaseObjectColumnValueSelector selector; private final BaseObjectColumnValueSelector selector;
private final FixedBucketsHistogramBufferAggregatorHelper innerAggregator;
private FixedBucketsHistogram histogram;
public FixedBucketsHistogramBufferAggregator( public FixedBucketsHistogramBufferAggregator(
BaseObjectColumnValueSelector selector, BaseObjectColumnValueSelector selector,
@ -41,7 +39,7 @@ public class FixedBucketsHistogramBufferAggregator implements BufferAggregator
) )
{ {
this.selector = selector; this.selector = selector;
this.histogram = new FixedBucketsHistogram( this.innerAggregator = new FixedBucketsHistogramBufferAggregatorHelper(
lowerLimit, lowerLimit,
upperLimit, upperLimit,
numBuckets, numBuckets,
@ -52,45 +50,20 @@ public class FixedBucketsHistogramBufferAggregator implements BufferAggregator
@Override @Override
public void init(ByteBuffer buf, int position) public void init(ByteBuffer buf, int position)
{ {
ByteBuffer mutationBuffer = buf.duplicate(); innerAggregator.init(buf, position);
mutationBuffer.position(position);
mutationBuffer.put(histogram.toBytesFull(false));
} }
@Override @Override
public void aggregate(ByteBuffer buf, int position) public void aggregate(ByteBuffer buf, int position)
{ {
ByteBuffer mutationBuffer = buf.duplicate();
mutationBuffer.position(position);
FixedBucketsHistogram h0 = FixedBucketsHistogram.fromByteBufferFullNoSerdeHeader(mutationBuffer);
Object val = selector.getObject(); Object val = selector.getObject();
if (val == null) { innerAggregator.aggregate(buf, position, val);
if (NullHandling.replaceWithDefault()) {
h0.incrementMissing();
} else {
h0.add(NullHandling.defaultDoubleValue());
}
} else if (val instanceof String) {
h0.combineHistogram(FixedBucketsHistogram.fromBase64((String) val));
} else if (val instanceof FixedBucketsHistogram) {
h0.combineHistogram((FixedBucketsHistogram) val);
} else {
Double x = ((Number) val).doubleValue();
h0.add(x);
}
mutationBuffer.position(position);
mutationBuffer.put(h0.toBytesFull(false));
} }
@Override @Override
public Object get(ByteBuffer buf, int position) public Object get(ByteBuffer buf, int position)
{ {
ByteBuffer mutationBuffer = buf.duplicate(); return innerAggregator.get(buf, position);
mutationBuffer.position(position);
return FixedBucketsHistogram.fromByteBufferFullNoSerdeHeader(mutationBuffer);
} }
@Override @Override

View File

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.histogram;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
/**
 * Byte-buffer plumbing shared by {@link FixedBucketsHistogramBufferAggregator} and
 * {@link FixedBucketsHistogramVectorAggregator}.
 *
 * <p>Every operation works on a duplicate of the supplied buffer, so the caller's buffer position is never
 * moved. Callers are responsible for pulling values out of their selectors; this class only deals with the
 * serialized histogram stored at a given buffer position.
 */
final class FixedBucketsHistogramBufferAggregatorHelper
{
  private final double lowerLimit;
  private final double upperLimit;
  private final int numBuckets;
  private final FixedBucketsHistogram.OutlierHandlingMode outlierHandlingMode;

  public FixedBucketsHistogramBufferAggregatorHelper(
      double lowerLimit,
      double upperLimit,
      int numBuckets,
      FixedBucketsHistogram.OutlierHandlingMode outlierHandlingMode
  )
  {
    this.lowerLimit = lowerLimit;
    this.upperLimit = upperLimit;
    this.numBuckets = numBuckets;
    this.outlierHandlingMode = outlierHandlingMode;
  }

  /**
   * Writes a newly constructed histogram, configured with this helper's limits, bucket count and outlier
   * mode, into {@code buf} starting at {@code position}.
   */
  public void init(ByteBuffer buf, int position)
  {
    final ByteBuffer view = buf.duplicate();
    view.position(position);
    view.put(
        new FixedBucketsHistogram(lowerLimit, upperLimit, numBuckets, outlierHandlingMode).toBytesFull(false)
    );
  }

  /**
   * Reads the histogram stored at {@code position}, combines {@code val} into it and writes the result back
   * to the same position.
   *
   * @param val value to fold in; may be null, in which case null handling is left to
   *            {@code FixedBucketsHistogram.combine}
   */
  public void aggregate(ByteBuffer buf, int position, @Nullable Object val)
  {
    final FixedBucketsHistogram current = get(buf, position);
    current.combine(val);
    put(buf, position, current);
  }

  /**
   * Deserializes and returns the histogram stored in {@code buf} at {@code position}.
   */
  public FixedBucketsHistogram get(ByteBuffer buf, int position)
  {
    final ByteBuffer view = buf.duplicate();
    view.position(position);
    return FixedBucketsHistogram.fromByteBufferFullNoSerdeHeader(view);
  }

  /**
   * Serializes {@code histogram} into {@code buf} starting at {@code position}, overwriting whatever was there.
   */
  public void put(ByteBuffer buf, int position, FixedBucketsHistogram histogram)
  {
    final ByteBuffer view = buf.duplicate();
    view.position(position);
    view.put(histogram.toBytesFull(false));
  }
}

View File

@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.histogram;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.segment.vector.VectorValueSelector;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
/**
 * {@link VectorAggregator} that folds the double values of a {@link VectorValueSelector} into a
 * {@link FixedBucketsHistogram} stored in a byte buffer. Rows flagged in the selector's null vector are
 * handed to the histogram as {@code null}.
 */
public class FixedBucketsHistogramVectorAggregator implements VectorAggregator
{
  private final VectorValueSelector selector;
  private final FixedBucketsHistogramBufferAggregatorHelper innerAggregator;

  public FixedBucketsHistogramVectorAggregator(
      VectorValueSelector selector,
      double lowerLimit,
      double upperLimit,
      int numBuckets,
      FixedBucketsHistogram.OutlierHandlingMode outlierHandlingMode
  )
  {
    this.selector = selector;
    this.innerAggregator = new FixedBucketsHistogramBufferAggregatorHelper(
        lowerLimit,
        upperLimit,
        numBuckets,
        outlierHandlingMode
    );
  }

  @Override
  public void init(ByteBuffer buf, int position)
  {
    innerAggregator.init(buf, position);
  }

  /**
   * Aggregates rows {@code [startRow, endRow)} into the single histogram at {@code position}. The histogram
   * is read once, updated in memory for every row, and written back once.
   */
  @Override
  public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
  {
    final double[] values = selector.getDoubleVector();
    final boolean[] nulls = selector.getNullVector();

    final FixedBucketsHistogram accumulated = innerAggregator.get(buf, position);
    for (int row = startRow; row < endRow; row++) {
      accumulated.combine(boxOrNull(values, nulls, row));
    }
    innerAggregator.put(buf, position, accumulated);
  }

  /**
   * Aggregates one row into each of {@code numRows} histograms. The i-th histogram lives at
   * {@code positions[i] + positionOffset}; the i-th row is {@code rows[i]}, or {@code i} itself when
   * {@code rows} is null.
   */
  @Override
  public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
  {
    final double[] values = selector.getDoubleVector();
    final boolean[] nulls = selector.getNullVector();

    for (int i = 0; i < numRows; i++) {
      final int bufferPosition = positions[i] + positionOffset;
      final int row = (rows == null) ? i : rows[i];
      final Double boxed = boxOrNull(values, nulls, row);
      innerAggregator.aggregate(buf, bufferPosition, boxed);
    }
  }

  @Nullable
  @Override
  public Object get(ByteBuffer buf, int position)
  {
    return innerAggregator.get(buf, position);
  }

  @Override
  public void close()
  {
    // Nothing to close
  }

  /**
   * Returns the value at {@code index}, or null when a null vector is present and flags that row.
   */
  @Nullable
  private Double boxOrNull(double[] vector, @Nullable boolean[] isNull, int index)
  {
    if (isNull != null && isNull[index]) {
      return null;
    }
    return vector[index];
  }
}

View File

@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.histogram;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorObjectSelector;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.nio.ByteBuffer;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
/**
 * Tests the vectorized path of {@link ApproximateHistogramFoldingAggregatorFactory} against a mocked
 * {@link VectorColumnSelectorFactory} whose object vector is {@code {h1, null, h2, null}}.
 */
public class ApproximateHistogramFoldingVectorAggregatorTest
{
  private static final float[] FLOATS = {23, 19, 10, 16, 36, 2, 9, 32, 30, 45};

  private VectorColumnSelectorFactory vectorColumnSelectorFactory;
  // h1 holds the first five FLOATS, h2 the remaining five.
  private ApproximateHistogram h1;
  private ApproximateHistogram h2;

  @Before
  public void setup()
  {
    h1 = new ApproximateHistogram(5);
    h2 = new ApproximateHistogram(5);

    for (int i = 0; i < 5; ++i) {
      h1.offer(FLOATS[i]);
    }
    for (int i = 5; i < FLOATS.length; ++i) {
      h2.offer(FLOATS[i]);
    }

    VectorObjectSelector vectorObjectSelector = createMock(VectorObjectSelector.class);
    expect(vectorObjectSelector.getObjectVector()).andReturn(new Object[]{h1, null, h2, null}).anyTimes();
    EasyMock.replay(vectorObjectSelector);

    vectorColumnSelectorFactory = createMock(VectorColumnSelectorFactory.class);
    expect(vectorColumnSelectorFactory.makeObjectSelector("field"))
        .andReturn(vectorObjectSelector).anyTimes();
    expect(vectorColumnSelectorFactory.getColumnCapabilities("field")).andReturn(
        new ColumnCapabilitiesImpl().setType(ValueType.COMPLEX)
    );
    expect(vectorColumnSelectorFactory.getColumnCapabilities("string_field")).andReturn(
        new ColumnCapabilitiesImpl().setType(ValueType.STRING)
    );
    // Must really be a numeric column: reporting STRING here would make the "double_field" case a
    // duplicate of "string_field" and leave numeric inputs untested.
    expect(vectorColumnSelectorFactory.getColumnCapabilities("double_field")).andReturn(
        new ColumnCapabilitiesImpl().setType(ValueType.DOUBLE)
    );
    EasyMock.replay(vectorColumnSelectorFactory);
  }

  /**
   * The folding aggregator must refuse to vectorize over string and numeric columns.
   */
  @Test
  public void doNotVectorizedNonComplexTypes()
  {
    ApproximateHistogramFoldingAggregatorFactory factory = buildHistogramFactory("string_field");
    Assert.assertFalse(factory.canVectorize(vectorColumnSelectorFactory));

    factory = buildHistogramFactory("double_field");
    Assert.assertFalse(factory.canVectorize(vectorColumnSelectorFactory));
  }

  /**
   * Folds all four rows (two histograms, two nulls) into one buffer position and checks the merged result.
   */
  @Test
  public void testAggregateSinglePosition()
  {
    ApproximateHistogramFoldingAggregatorFactory factory = buildHistogramFactory();
    ByteBuffer byteBuffer = ByteBuffer.allocate(factory.getMaxIntermediateSize());
    Assert.assertTrue(factory.canVectorize(vectorColumnSelectorFactory));
    VectorAggregator vectorAggregator = factory.factorizeVector(vectorColumnSelectorFactory);
    vectorAggregator.init(byteBuffer, 0);
    vectorAggregator.aggregate(byteBuffer, 0, 0, 4);

    ApproximateHistogram h = (ApproximateHistogram) vectorAggregator.get(byteBuffer, 0);

    Assert.assertArrayEquals(new float[]{19.6f, 45.0f}, h.positions(), 0.1f);
    Assert.assertArrayEquals(new long[]{9, 1}, h.bins());
    Assert.assertEquals(10, h.count());
    Assert.assertEquals(2.0f, h.min(), 0.1f);
    Assert.assertEquals(45.0f, h.max(), 0.1f);
  }

  /**
   * Aggregates into two buffer positions, once with direct row access and once through a row-index
   * indirection, so that position 0 ends up with h1 (plus a null) and position 1 with h2 (plus a null).
   */
  @Test
  public void testAggregateMultiPositions()
  {
    ApproximateHistogramFoldingAggregatorFactory factory = buildHistogramFactory();
    ByteBuffer byteBuffer = ByteBuffer.allocate(factory.getMaxIntermediateSize() * 2);
    int[] positions = new int[]{0, factory.getMaxIntermediateSize()};
    VectorAggregator vectorAggregator = factory.factorizeVector(vectorColumnSelectorFactory);
    vectorAggregator.init(byteBuffer, 0);
    vectorAggregator.init(byteBuffer, positions[1]);

    // Rows 0 and 1 (h1, null) go to positions 0 and 1 respectively.
    vectorAggregator.aggregate(byteBuffer, 2, positions, null, 0);
    // Rows 1 and 2 (null, h2) go to positions 0 and 1 respectively.
    vectorAggregator.aggregate(byteBuffer, 2, positions, new int[]{1, 2}, 0); // indirection

    ApproximateHistogram actualH1 = (ApproximateHistogram) vectorAggregator.get(byteBuffer, 0);
    ApproximateHistogram actualH2 = (ApproximateHistogram) vectorAggregator.get(byteBuffer, positions[1]);

    // JUnit's contract is assertEquals(expected, actual).
    Assert.assertEquals(h1, actualH1);
    Assert.assertEquals(h2, actualH2);
  }

  private ApproximateHistogramFoldingAggregatorFactory buildHistogramFactory()
  {
    return buildHistogramFactory("field");
  }

  private ApproximateHistogramFoldingAggregatorFactory buildHistogramFactory(String fieldName)
  {
    return new ApproximateHistogramFoldingAggregatorFactory(
        "approximateHistoFold",
        fieldName,
        5,
        5,
        0f,
        50.0f,
        false
    );
  }
}

View File

@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.histogram;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorValueSelector;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.nio.ByteBuffer;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
/**
 * Tests the vectorized path of {@link ApproximateHistogramAggregatorFactory} using mocked float vectors,
 * one with a null vector ("field_1") and one without ("field_2").
 */
public class ApproximateHistogramVectorAggregatorTest
{
  private static final float[] FLOATS = {23, 19, 10, 16, 36, 2, 9, 32, 30, 45, 33}; // Last value is never included
  private static final boolean[] NULL_VECTOR =
      {false, false, false, false, false, false, false, false, false, false, true};

  private VectorColumnSelectorFactory vectorColumnSelectorFactory;

  @Before
  public void setup()
  {
    NullHandling.initializeForTests();

    // "field_1": the same floats, with the last row flagged as null.
    final VectorValueSelector selectorWithNulls = createMock(VectorValueSelector.class);
    expect(selectorWithNulls.getFloatVector()).andReturn(FLOATS).anyTimes();
    expect(selectorWithNulls.getNullVector()).andReturn(NULL_VECTOR).anyTimes();
    EasyMock.replay(selectorWithNulls);

    // "field_2": no null vector at all.
    final VectorValueSelector selectorWithoutNulls = createMock(VectorValueSelector.class);
    expect(selectorWithoutNulls.getFloatVector()).andReturn(FLOATS).anyTimes();
    expect(selectorWithoutNulls.getNullVector()).andReturn(null).anyTimes();
    EasyMock.replay(selectorWithoutNulls);

    final ColumnCapabilities doubleCapabilities
        = ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ValueType.DOUBLE);

    vectorColumnSelectorFactory = createMock(VectorColumnSelectorFactory.class);
    expect(vectorColumnSelectorFactory.getColumnCapabilities("field_1")).andReturn(doubleCapabilities).anyTimes();
    expect(vectorColumnSelectorFactory.makeValueSelector("field_1"))
        .andReturn(selectorWithNulls).anyTimes();
    expect(vectorColumnSelectorFactory.getColumnCapabilities("field_2")).andReturn(doubleCapabilities).anyTimes();
    expect(vectorColumnSelectorFactory.makeValueSelector("field_2"))
        .andReturn(selectorWithoutNulls).anyTimes();
    expect(vectorColumnSelectorFactory.getColumnCapabilities("string_field")).andReturn(
        new ColumnCapabilitiesImpl().setType(ValueType.STRING)
    );
    expect(vectorColumnSelectorFactory.getColumnCapabilities("complex_field")).andReturn(
        new ColumnCapabilitiesImpl().setType(ValueType.COMPLEX)
    );
    EasyMock.replay(vectorColumnSelectorFactory);
  }

  @Test
  public void doNotVectorizedNonNumericTypes()
  {
    final ApproximateHistogramAggregatorFactory stringFactory = buildHistogramAggFactory("string_field");
    Assert.assertFalse(stringFactory.canVectorize(vectorColumnSelectorFactory));

    final ApproximateHistogramAggregatorFactory complexFactory = buildHistogramAggFactory("complex_field");
    Assert.assertFalse(complexFactory.canVectorize(vectorColumnSelectorFactory));
  }

  @Test
  public void testAggregateSinglePosition()
  {
    // With a null vector: all 11 rows are offered, the 11th is null.
    ApproximateHistogramAggregatorFactory factory = buildHistogramAggFactory("field_1");
    final ByteBuffer buffer = ByteBuffer.allocate(factory.getMaxIntermediateSizeWithNulls());
    Assert.assertTrue(factory.canVectorize(vectorColumnSelectorFactory));
    VectorAggregator aggregator = factory.factorizeVector(vectorColumnSelectorFactory);
    aggregator.init(buffer, 0);
    aggregator.aggregate(buffer, 0, 0, 11);
    assertTenValueHistogram((ApproximateHistogram) aggregator.get(buffer, 0));

    // Without a null vector: only the first 10 rows are offered.
    factory = buildHistogramAggFactory("field_2");
    aggregator = factory.factorizeVector(vectorColumnSelectorFactory);
    aggregator.init(buffer, 0);
    aggregator.aggregate(buffer, 0, 0, 10);
    assertTenValueHistogram((ApproximateHistogram) aggregator.get(buffer, 0));
  }

  @Test
  public void testAggregateMultiPositions()
  {
    final ApproximateHistogramAggregatorFactory factory = buildHistogramAggFactory("field_2");
    final int size = factory.getMaxIntermediateSize();
    final ByteBuffer buffer = ByteBuffer.allocate(size * 2);
    final VectorAggregator aggregator = factory.factorizeVector(vectorColumnSelectorFactory);
    final int[] positions = new int[]{0, size};

    aggregator.init(buffer, positions[0]);
    aggregator.init(buffer, positions[1]);
    aggregator.aggregate(buffer, 2, positions, null, 0);

    // Put rest of 10 elements using the access indirection. Second vector gets the same element always
    int row = 1;
    while (row < 10) {
      aggregator.aggregate(buffer, 2, positions, new int[]{row, 1}, 0);
      row++;
    }

    assertTenValueHistogram((ApproximateHistogram) aggregator.get(buffer, 0));

    final ApproximateHistogram repeated = (ApproximateHistogram) aggregator.get(buffer, size);
    Assert.assertArrayEquals(new float[]{19}, repeated.positions(), 0.1f);
    Assert.assertArrayEquals(new long[]{10}, repeated.bins());
  }

  /**
   * Asserts the histogram expected after offering the first ten FLOATS once each.
   */
  private static void assertTenValueHistogram(ApproximateHistogram histogram)
  {
    // (2, 1), (9.5, 2), (19.33, 3), (32.67, 3), (45, 1)
    Assert.assertArrayEquals(new float[]{2, 9.5f, 19.33f, 32.67f, 45f}, histogram.positions(), 0.1f);
    Assert.assertArrayEquals(new long[]{1, 2, 3, 3, 1}, histogram.bins());
  }

  private ApproximateHistogramAggregatorFactory buildHistogramAggFactory(String fieldName)
  {
    return new ApproximateHistogramAggregatorFactory(
        "approxHisto",
        fieldName,
        5,
        5,
        0.0f,
        45.0f,
        false
    );
  }
}

View File

@ -1242,6 +1242,94 @@ public class FixedBucketsHistogramTest
Assert.assertEquals(0, hIgnore.getUpperOutlierCount()); Assert.assertEquals(0, hIgnore.getUpperOutlierCount());
} }
@Test
public void testCombineBase64()
{
  // Target histogram: 5 buckets over [0, 20].
  final FixedBucketsHistogram target = buildHistogram(
      0,
      20,
      5,
      FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW,
      new float[]{1, 2, 7, 12, 18}
  );

  // Source histogram: same range but 7 buckets, with one outlier on each side.
  final FixedBucketsHistogram source = buildHistogram(
      0,
      20,
      7,
      FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW,
      new float[]{3, 8, 9, 19, 99, -50}
  );

  // Fold the source in via its base64 string form.
  final String encodedSource = source.toBase64();
  target.combine(encodedSource);

  // The target keeps its own shape ...
  Assert.assertEquals(5, target.getNumBuckets());
  Assert.assertEquals(4.0, target.getBucketSize(), 0.01);
  Assert.assertEquals(0, target.getLowerLimit(), 0.01);
  Assert.assertEquals(20, target.getUpperLimit(), 0.01);
  Assert.assertEquals(FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW, target.getOutlierHandlingMode());
  // ... and picks up the source's counts.
  Assert.assertArrayEquals(new long[]{2, 3, 1, 1, 2}, target.getHistogram());
  Assert.assertEquals(9, target.getCount());
  Assert.assertEquals(1, target.getMin(), 0.01);
  Assert.assertEquals(18, target.getMax(), 0.01);
  Assert.assertEquals(0, target.getMissingValueCount());
  Assert.assertEquals(1, target.getLowerOutlierCount());
  Assert.assertEquals(1, target.getUpperOutlierCount());
}
@Test
public void testCombineAnotherHistogram()
{
  // Target histogram: 5 buckets over [0, 20].
  final FixedBucketsHistogram target = buildHistogram(
      0,
      20,
      5,
      FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW,
      new float[]{1, 2, 7, 12, 18}
  );

  // Source histogram: same range but 7 buckets, with one outlier on each side.
  final FixedBucketsHistogram source = buildHistogram(
      0,
      20,
      7,
      FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW,
      new float[]{3, 8, 9, 19, 99, -50}
  );

  // Fold the source in as a histogram object.
  target.combine(source);

  // The target keeps its own shape ...
  Assert.assertEquals(5, target.getNumBuckets());
  Assert.assertEquals(4.0, target.getBucketSize(), 0.01);
  Assert.assertEquals(0, target.getLowerLimit(), 0.01);
  Assert.assertEquals(20, target.getUpperLimit(), 0.01);
  Assert.assertEquals(FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW, target.getOutlierHandlingMode());
  // ... and picks up the source's counts.
  Assert.assertArrayEquals(new long[]{2, 3, 1, 1, 2}, target.getHistogram());
  Assert.assertEquals(9, target.getCount());
  Assert.assertEquals(1, target.getMin(), 0.01);
  Assert.assertEquals(18, target.getMax(), 0.01);
  Assert.assertEquals(0, target.getMissingValueCount());
  Assert.assertEquals(1, target.getLowerOutlierCount());
  Assert.assertEquals(1, target.getUpperOutlierCount());
}
@Test
public void testCombineNumber()
{
  // 200 buckets over [0, 200]; outliers would be ignored, but both inputs are in range.
  final FixedBucketsHistogram histogram = new FixedBucketsHistogram(
      0,
      200,
      200,
      FixedBucketsHistogram.OutlierHandlingMode.IGNORE
  );

  // Plain numbers passed to combine() are added as individual values.
  histogram.combine(10);
  histogram.combine(20);

  Assert.assertEquals(0, histogram.getUpperOutlierCount());
  Assert.assertEquals(0, histogram.getLowerOutlierCount());
  Assert.assertEquals(2, histogram.getCount());
  Assert.assertEquals(10, histogram.getMin(), 0.01);
  Assert.assertEquals(20, histogram.getMax(), 0.01);
}
@Test @Test
public void testMissing() public void testMissing()
{ {

View File

@ -0,0 +1,209 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.histogram;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorValueSelector;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.nio.ByteBuffer;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
/**
 * Tests the vectorized path of {@link FixedBucketsHistogramAggregatorFactory} using mocked double vectors,
 * one with a null vector ("field_1") and one without ("field_2"). Every histogram is built with 2 buckets
 * over [1, 21] in OVERFLOW mode.
 */
public class FixedBucketsHistogramVectorAggregatorTest
{
  private static final double[] DOUBLES = {1.0, 12.0, 3.0, 14.0, 15.0, 16.0};
  private static final boolean[] NULL_VECTOR = {false, false, false, false, true, false};

  private VectorColumnSelectorFactory vectorColumnSelectorFactory;

  @Before
  public void setup()
  {
    NullHandling.initializeForTests();

    // "field_1": row 4 (15.0) is flagged as null.
    final VectorValueSelector selectorWithNulls = createMock(VectorValueSelector.class);
    expect(selectorWithNulls.getDoubleVector()).andReturn(DOUBLES).anyTimes();
    expect(selectorWithNulls.getNullVector()).andReturn(NULL_VECTOR).anyTimes();
    EasyMock.replay(selectorWithNulls);

    // "field_2": no null vector at all.
    final VectorValueSelector selectorWithoutNulls = createMock(VectorValueSelector.class);
    expect(selectorWithoutNulls.getDoubleVector()).andReturn(DOUBLES).anyTimes();
    expect(selectorWithoutNulls.getNullVector()).andReturn(null).anyTimes();
    EasyMock.replay(selectorWithoutNulls);

    final ColumnCapabilities doubleCapabilities
        = ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ValueType.DOUBLE);

    vectorColumnSelectorFactory = createMock(VectorColumnSelectorFactory.class);
    expect(vectorColumnSelectorFactory.getColumnCapabilities("field_1")).andReturn(doubleCapabilities).anyTimes();
    expect(vectorColumnSelectorFactory.makeValueSelector("field_1"))
        .andReturn(selectorWithNulls).anyTimes();
    expect(vectorColumnSelectorFactory.getColumnCapabilities("field_2")).andReturn(doubleCapabilities).anyTimes();
    expect(vectorColumnSelectorFactory.makeValueSelector("field_2"))
        .andReturn(selectorWithoutNulls).anyTimes();
    EasyMock.replay(vectorColumnSelectorFactory);
  }

  @Test
  public void testAggregateSinglePosition()
  {
    final ByteBuffer buffer = ByteBuffer.allocate(FixedBucketsHistogram.getFullStorageSize(2));

    FixedBucketsHistogramAggregatorFactory factory = buildHistogramAggFactory("field_1");
    Assert.assertTrue(factory.canVectorize(vectorColumnSelectorFactory));
    VectorAggregator aggregator = factory.factorizeVector(vectorColumnSelectorFactory);
    aggregator.init(buffer, 0);
    aggregator.aggregate(buffer, 0, 0, 6);

    // Default value of null is 0 which is an outlier.
    final boolean nullBecomesDefault = NullHandling.replaceWithDefault();
    assertHistogram(
        (FixedBucketsHistogram) aggregator.get(buffer, 0),
        new long[]{2, 3},
        5,
        1.0,
        16.0,
        nullBecomesDefault ? 0 : 1,
        nullBecomesDefault ? 1 : 0
    );

    factory = buildHistogramAggFactory("field_2");
    aggregator = factory.factorizeVector(vectorColumnSelectorFactory);
    aggregator.init(buffer, 0);
    aggregator.aggregate(buffer, 0, 0, 6);

    assertHistogram((FixedBucketsHistogram) aggregator.get(buffer, 0), new long[]{2, 4}, 6, 1.0, 16.0, 0, 0);
  }

  @Test
  public void testAggregateMultiPositions()
  {
    final int size = FixedBucketsHistogram.getFullStorageSize(2);
    final int[] positions = new int[]{0, size};
    final FixedBucketsHistogramAggregatorFactory factory = buildHistogramAggFactory("field_2");
    final VectorAggregator aggregator = factory.factorizeVector(vectorColumnSelectorFactory);

    // Direct access: row 0 (1.0) lands in the first histogram, row 1 (12.0) in the second.
    ByteBuffer buffer = ByteBuffer.allocate(size * 2);
    aggregator.init(buffer, positions[0]);
    aggregator.init(buffer, positions[1]);
    aggregator.aggregate(buffer, 2, positions, null, 0);

    assertHistogram((FixedBucketsHistogram) aggregator.get(buffer, 0), new long[]{1, 0}, 1, 1.0, 1.0, 0, 0);
    assertHistogram(
        (FixedBucketsHistogram) aggregator.get(buffer, positions[1]),
        new long[]{0, 1},
        1,
        12.0,
        12.0,
        0,
        0
    );

    // Tests when there is a level of indirection in accessing the vector
    buffer = ByteBuffer.allocate(size * 2);
    aggregator.init(buffer, positions[0]);
    aggregator.init(buffer, positions[1]);
    aggregator.aggregate(buffer, 2, positions, new int[]{2, 3}, 0);

    assertHistogram((FixedBucketsHistogram) aggregator.get(buffer, 0), new long[]{1, 0}, 1, 3.0, 3.0, 0, 0);
    assertHistogram(
        (FixedBucketsHistogram) aggregator.get(buffer, positions[1]),
        new long[]{0, 1},
        1,
        14.0,
        14.0,
        0,
        0
    );
  }

  /**
   * Asserts the fixed shape shared by every histogram in this test (2 buckets of width 10 over [1, 21],
   * OVERFLOW mode, no upper outliers) together with the per-case expectations passed in.
   */
  private static void assertHistogram(
      FixedBucketsHistogram histogram,
      long[] expectedBuckets,
      long expectedCount,
      double expectedMin,
      double expectedMax,
      long expectedMissing,
      long expectedLowerOutliers
  )
  {
    Assert.assertEquals(2, histogram.getNumBuckets());
    Assert.assertEquals(10.0, histogram.getBucketSize(), 0.01);
    Assert.assertEquals(1, histogram.getLowerLimit(), 0.01);
    Assert.assertEquals(21, histogram.getUpperLimit(), 0.01);
    Assert.assertEquals(FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW, histogram.getOutlierHandlingMode());
    Assert.assertArrayEquals(expectedBuckets, histogram.getHistogram());
    Assert.assertEquals(expectedCount, histogram.getCount());
    Assert.assertEquals(expectedMin, histogram.getMin(), 0.01);
    Assert.assertEquals(expectedMax, histogram.getMax(), 0.01);
    Assert.assertEquals(expectedMissing, histogram.getMissingValueCount());
    Assert.assertEquals(expectedLowerOutliers, histogram.getLowerOutlierCount());
    Assert.assertEquals(0, histogram.getUpperOutlierCount());
  }

  private FixedBucketsHistogramAggregatorFactory buildHistogramAggFactory(String fieldName)
  {
    return new FixedBucketsHistogramAggregatorFactory(
        "fixedHisto",
        fieldName,
        2,
        1,
        21,
        FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW,
        false
    );
  }
}

View File

@ -48,6 +48,7 @@ import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.IndexBuilder; import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn; import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
@ -120,6 +121,7 @@ public class QuantileSqlAggregatorTest extends CalciteTestBase
ApproximateHistogramDruidModule.registerSerde(); ApproximateHistogramDruidModule.registerSerde();
for (Module mod : new ApproximateHistogramDruidModule().getJacksonModules()) { for (Module mod : new ApproximateHistogramDruidModule().getJacksonModules()) {
CalciteTests.getJsonMapper().registerModule(mod); CalciteTests.getJsonMapper().registerModule(mod);
TestHelper.JSON_MAPPER.registerModule(mod);
} }
final QueryableIndex index = IndexBuilder.create() final QueryableIndex index = IndexBuilder.create()

View File

@ -596,6 +596,7 @@ url
- ../docs/development/extensions-core/approximate-histograms.md - ../docs/development/extensions-core/approximate-histograms.md
approxHistogram approxHistogram
approxHistogramFold approxHistogramFold
fixedBucketsHistogram
bucketNum bucketNum
lowerLimit lowerLimit
numBuckets numBuckets