diff --git a/docs/querying/query-context.md b/docs/querying/query-context.md index 12d51c9e45a..a937d84bba4 100644 --- a/docs/querying/query-context.md +++ b/docs/querying/query-context.md @@ -90,7 +90,8 @@ requirements: include "selector", "bound", "in", "like", "regex", "search", "and", "or", and "not". - All filters in filtered aggregators must offer vectorized row-matchers. - All aggregators must offer vectorized implementations. These include "count", "doubleSum", "floatSum", "longSum", "longMin", - "longMax", "doubleMin", "doubleMax", "floatMin", "floatMax", "hyperUnique", and "filtered". + "longMax", "doubleMin", "doubleMax", "floatMin", "floatMax", "hyperUnique", "filtered", "approxHistogram", + "approxHistogramFold", and "fixedBucketsHistogram" (with numerical input). - No virtual columns. - For GroupBy: All dimension specs must be "default" (no extraction functions or filtered dimension specs). - For GroupBy: No multi-value dimensions. diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramAggregatorFactory.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramAggregatorFactory.java index c961f504ab3..cde33fd605f 100644 --- a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramAggregatorFactory.java +++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramAggregatorFactory.java @@ -32,10 +32,14 @@ import org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.ObjectAggregateCombiner; +import org.apache.druid.query.aggregation.VectorAggregator; import org.apache.druid.query.cache.CacheKeyBuilder; +import org.apache.druid.segment.ColumnInspector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import javax.annotation.Nullable; import java.nio.ByteBuffer; @@ -103,6 +107,24 @@ public class ApproximateHistogramAggregatorFactory extends AggregatorFactory ); } + @Override + public VectorAggregator factorizeVector(VectorColumnSelectorFactory metricVectorFactory) + { + return new ApproximateHistogramVectorAggregator( + metricVectorFactory.makeValueSelector(fieldName), + resolution + ); + } + + @Override + public boolean canVectorize(ColumnInspector columnInspector) + { + /* skip vectorization for string types which may be parseable to numbers. There is no vector equivalent of + string value selector*/ + ColumnCapabilities capabilities = columnInspector.getColumnCapabilities(fieldName); + return (capabilities != null) && capabilities.getType().isNumeric(); + } + @Override public Comparator getComparator() { diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramBufferAggregator.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramBufferAggregator.java index 4390a63ac60..a33d27baaed 100644 --- a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramBufferAggregator.java +++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramBufferAggregator.java @@ -28,54 +28,30 @@ import java.nio.ByteBuffer; public class ApproximateHistogramBufferAggregator implements BufferAggregator { private final BaseFloatColumnValueSelector selector; - private final int resolution; + private final ApproximateHistogramBufferAggregatorHelper innerAggregator; public ApproximateHistogramBufferAggregator(BaseFloatColumnValueSelector selector, int resolution) { this.selector = selector; - this.resolution = resolution; + this.innerAggregator = new ApproximateHistogramBufferAggregatorHelper(resolution); } @Override public void init(ByteBuffer buf, int position) { - ByteBuffer mutationBuffer = buf.duplicate(); - mutationBuffer.position(position); - - mutationBuffer.putInt(resolution); - mutationBuffer.putInt(0); //initial binCount - for (int i = 0; i < resolution; ++i) { - mutationBuffer.putFloat(0f); - } - for (int i = 0; i < resolution; ++i) { - mutationBuffer.putLong(0L); - } - - // min - mutationBuffer.putFloat(Float.POSITIVE_INFINITY); - // max - mutationBuffer.putFloat(Float.NEGATIVE_INFINITY); + innerAggregator.init(buf, position); } @Override public void aggregate(ByteBuffer buf, int position) { - ByteBuffer mutationBuffer = buf.duplicate(); - mutationBuffer.position(position); - - ApproximateHistogram h0 = ApproximateHistogram.fromBytesDense(mutationBuffer); - h0.offer(selector.getFloat()); - - mutationBuffer.position(position); - h0.toBytesDense(mutationBuffer); + innerAggregator.aggregate(buf, position, selector.getFloat()); } @Override public Object get(ByteBuffer buf, int position) { - ByteBuffer mutationBuffer = buf.duplicate(); - mutationBuffer.position(position); - return ApproximateHistogram.fromBytes(mutationBuffer); + return innerAggregator.get(buf, position); } @Override diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramBufferAggregatorHelper.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramBufferAggregatorHelper.java new file mode 100644 index 00000000000..b597c4d66d6 --- /dev/null +++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramBufferAggregatorHelper.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.histogram; + +import java.nio.ByteBuffer; + +/** + * A helper class used by {@link ApproximateHistogramBufferAggregator} and {@link ApproximateHistogramVectorAggregator} + * for aggregation operations on byte buffers. Getting the object from value selectors is outside this class. + */ +final class ApproximateHistogramBufferAggregatorHelper +{ + private final int resolution; + + public ApproximateHistogramBufferAggregatorHelper(int resolution) + { + this.resolution = resolution; + } + + public void init(final ByteBuffer buf, final int position) + { + ApproximateHistogram histogram = new ApproximateHistogram(resolution); + ByteBuffer mutationBuffer = buf.duplicate(); + mutationBuffer.position(position); + histogram.toBytesDense(mutationBuffer); + } + + public ApproximateHistogram get(final ByteBuffer buf, final int position) + { + ByteBuffer mutationBuffer = buf.duplicate(); + mutationBuffer.position(position); + return ApproximateHistogram.fromBytesDense(mutationBuffer); + } + + public void put(final ByteBuffer buf, final int position, final ApproximateHistogram histogram) + { + ByteBuffer mutationBuffer = buf.duplicate(); + mutationBuffer.position(position); + histogram.toBytesDense(mutationBuffer); + } + + public void aggregate(final ByteBuffer buf, final int position, final float value) + { + ByteBuffer mutationBuffer = buf.duplicate(); + mutationBuffer.position(position); + + ApproximateHistogram h0 = ApproximateHistogram.fromBytesDense(mutationBuffer); + h0.offer(value); + + mutationBuffer.position(position); + h0.toBytesDense(mutationBuffer); + } +} diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingAggregatorFactory.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingAggregatorFactory.java index becbfe1ae64..7a8a80cc21b 100644 --- a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingAggregatorFactory.java +++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingAggregatorFactory.java @@ -27,9 +27,15 @@ import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.aggregation.VectorAggregator; import org.apache.druid.query.cache.CacheKeyBuilder; +import org.apache.druid.segment.ColumnInspector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; +import org.apache.druid.segment.vector.VectorObjectSelector; import javax.annotation.Nullable; import java.util.Objects; @@ -93,10 +99,33 @@ public class ApproximateHistogramFoldingAggregatorFactory extends ApproximateHis ); } + @Override + public VectorAggregator factorizeVector(VectorColumnSelectorFactory metricVectorFactory) + { + VectorObjectSelector selector = metricVectorFactory.makeObjectSelector(fieldName); + return new ApproximateHistogramFoldingVectorAggregator(selector, resolution, lowerLimit, upperLimit); + } + + + @Override + public boolean canVectorize(ColumnInspector columnInspector) + { + ColumnCapabilities capabilities = columnInspector.getColumnCapabilities(fieldName); + return (capabilities != null) && (capabilities.getType() == ValueType.COMPLEX); + } + @Override public AggregatorFactory getCombiningFactory() { - return new ApproximateHistogramFoldingAggregatorFactory(name, name, resolution, numBuckets, lowerLimit, upperLimit, finalizeAsBase64Binary); + return new ApproximateHistogramFoldingAggregatorFactory( + name, + name, + resolution, + numBuckets, + lowerLimit, + upperLimit, + finalizeAsBase64Binary + ); } @Override diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingBufferAggregator.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingBufferAggregator.java index bd9d0cd39aa..811e2bfb00f 100644 --- a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingBufferAggregator.java +++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingBufferAggregator.java @@ -28,12 +28,7 @@ import java.nio.ByteBuffer; public class ApproximateHistogramFoldingBufferAggregator implements BufferAggregator { private final BaseObjectColumnValueSelector selector; - private final int resolution; - private final float upperLimit; - private final float lowerLimit; - - private float[] tmpBufferP; - private long[] tmpBufferB; + private final ApproximateHistogramFoldingBufferAggregatorHelper innerAggregator; public ApproximateHistogramFoldingBufferAggregator( BaseObjectColumnValueSelector selector, @@ -43,50 +38,26 @@ public class ApproximateHistogramFoldingBufferAggregator implements BufferAggreg ) { this.selector = selector; - this.resolution = resolution; - this.lowerLimit = lowerLimit; - this.upperLimit = upperLimit; - - tmpBufferP = new float[resolution]; - tmpBufferB = new long[resolution]; + this.innerAggregator = new ApproximateHistogramFoldingBufferAggregatorHelper(resolution, lowerLimit, upperLimit); } @Override public void init(ByteBuffer buf, int position) { - ApproximateHistogram h = new ApproximateHistogram(resolution, lowerLimit, upperLimit); - - ByteBuffer mutationBuffer = buf.duplicate(); - mutationBuffer.position(position); - // use dense storage for aggregation - h.toBytesDense(mutationBuffer); + innerAggregator.init(buf, position); } @Override public void aggregate(ByteBuffer buf, int position) { ApproximateHistogram hNext = selector.getObject(); - if (hNext == null) { - return; - } - ByteBuffer mutationBuffer = buf.duplicate(); - mutationBuffer.position(position); - - ApproximateHistogram h0 = ApproximateHistogram.fromBytesDense(mutationBuffer); - h0.setLowerLimit(lowerLimit); - h0.setUpperLimit(upperLimit); - h0.foldFast(hNext, tmpBufferP, tmpBufferB); - - mutationBuffer.position(position); - h0.toBytesDense(mutationBuffer); + innerAggregator.aggregate(buf, position, hNext); } @Override public Object get(ByteBuffer buf, int position) { - ByteBuffer mutationBuffer = buf.asReadOnlyBuffer(); - mutationBuffer.position(position); - return ApproximateHistogram.fromBytesDense(mutationBuffer); + return innerAggregator.get(buf, position); } @Override @@ -106,6 +77,7 @@ public class ApproximateHistogramFoldingBufferAggregator implements BufferAggreg { throw new UnsupportedOperationException("ApproximateHistogramFoldingBufferAggregator does not support getDouble()"); } + @Override public void close() { diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingBufferAggregatorHelper.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingBufferAggregatorHelper.java new file mode 100644 index 00000000000..64c6a4cc700 --- /dev/null +++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingBufferAggregatorHelper.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.histogram; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +/** + * A helper class used by {@link ApproximateHistogramFoldingBufferAggregator} and + * {@link ApproximateHistogramFoldingVectorAggregator} for aggregation operations on byte buffers. + * Getting the object from value selectors is outside this class. + */ +final class ApproximateHistogramFoldingBufferAggregatorHelper +{ + private final int resolution; + private final float upperLimit; + private final float lowerLimit; + + private float[] tmpBufferA; + private long[] tmpBufferB; + + public ApproximateHistogramFoldingBufferAggregatorHelper( + int resolution, + float lowerLimit, + float upperLimit + ) + { + this.resolution = resolution; + this.lowerLimit = lowerLimit; + this.upperLimit = upperLimit; + + tmpBufferA = new float[resolution]; + tmpBufferB = new long[resolution]; + } + + public void init(ByteBuffer buf, int position) + { + ApproximateHistogram h = new ApproximateHistogram(resolution, lowerLimit, upperLimit); + + ByteBuffer mutationBuffer = buf.duplicate(); + mutationBuffer.position(position); + // use dense storage for aggregation + h.toBytesDense(mutationBuffer); + } + + public void aggregate(ByteBuffer buf, int position, @Nullable ApproximateHistogram hNext) + { + if (hNext == null) { + return; + } + ByteBuffer mutationBuffer = buf.duplicate(); + mutationBuffer.position(position); + + ApproximateHistogram h0 = ApproximateHistogram.fromBytesDense(mutationBuffer); + foldFast(h0, hNext); + + mutationBuffer.position(position); + h0.toBytesDense(mutationBuffer); + } + + public void foldFast(ApproximateHistogram left, ApproximateHistogram right) + { + //These have to set in every call since limits are transient and lost during serialization-deserialization + left.setLowerLimit(lowerLimit); + left.setUpperLimit(upperLimit); + left.foldFast(right, tmpBufferA, tmpBufferB); + } + + public ApproximateHistogram get(ByteBuffer buf, int position) + { + ByteBuffer mutationBuffer = buf.asReadOnlyBuffer(); + mutationBuffer.position(position); + return ApproximateHistogram.fromBytesDense(mutationBuffer); + } + + public void put(ByteBuffer buf, int position, ApproximateHistogram histogram) + { + ByteBuffer mutationBuffer = buf.duplicate(); + mutationBuffer.position(position); + histogram.toBytesDense(mutationBuffer); + } +} diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingVectorAggregator.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingVectorAggregator.java new file mode 100644 index 00000000000..69c6e596dcf --- /dev/null +++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingVectorAggregator.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.histogram; + +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.segment.vector.VectorObjectSelector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +public class ApproximateHistogramFoldingVectorAggregator implements VectorAggregator +{ + private final ApproximateHistogramFoldingBufferAggregatorHelper innerAggregator; + private final VectorObjectSelector selector; + + public ApproximateHistogramFoldingVectorAggregator( + final VectorObjectSelector selector, + final int resolution, + final float lowerLimit, + final float upperLimit + ) + { + this.selector = selector; + this.innerAggregator = new ApproximateHistogramFoldingBufferAggregatorHelper(resolution, lowerLimit, upperLimit); + } + + @Override + public void init(ByteBuffer buf, int position) + { + innerAggregator.init(buf, position); + } + + @Override + public void aggregate(ByteBuffer buf, int position, int startRow, int endRow) + { + Object[] vector = selector.getObjectVector(); + ApproximateHistogram histogram = innerAggregator.get(buf, position); + for (int i = startRow; i < endRow; i++) { + ApproximateHistogram other = (ApproximateHistogram) vector[i]; + if (null != other) { + innerAggregator.foldFast(histogram, other); + } + } + innerAggregator.put(buf, position, histogram); + } + + @Override + public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset) + { + Object[] vector = selector.getObjectVector(); + for (int i = 0; i < numRows; i++) { + ApproximateHistogram other = (ApproximateHistogram) vector[null != rows ? rows[i] : i]; + if (null == other) { + continue; + } + int position = positions[i] + positionOffset; + innerAggregator.aggregate(buf, position, other); + } + } + + @Nullable + @Override + public Object get(ByteBuffer buf, int position) + { + return innerAggregator.get(buf, position); + } + + @Override + public void close() + { + // Nothing to close + } +} diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramVectorAggregator.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramVectorAggregator.java new file mode 100644 index 00000000000..728271f89d6 --- /dev/null +++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramVectorAggregator.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.histogram; + +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.segment.vector.VectorValueSelector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +public class ApproximateHistogramVectorAggregator implements VectorAggregator +{ + + private final VectorValueSelector selector; + private final ApproximateHistogramBufferAggregatorHelper innerAggregator; + + public ApproximateHistogramVectorAggregator( + VectorValueSelector selector, + int resolution + ) + { + this.selector = selector; + this.innerAggregator = new ApproximateHistogramBufferAggregatorHelper(resolution); + } + + @Override + public void init(final ByteBuffer buf, final int position) + { + innerAggregator.init(buf, position); + } + + @Override + public void aggregate(final ByteBuffer buf, final int position, final int startRow, final int endRow) + { + final boolean[] isValueNull = selector.getNullVector(); + final float[] vector = selector.getFloatVector(); + ApproximateHistogram histogram = innerAggregator.get(buf, position); + + for (int i = startRow; i < endRow; i++) { + if (isValueNull != null && isValueNull[i]) { + continue; + } + histogram.offer(vector[i]); + } + innerAggregator.put(buf, position, histogram); + } + + @Override + public Object get(ByteBuffer buf, int position) + { + return innerAggregator.get(buf, position); + } + + @Override + public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset) + { + final float[] vector = selector.getFloatVector(); + final boolean[] isValueNull = selector.getNullVector(); + + for (int i = 0; i < numRows; i++) { + if (isValueNull != null && isValueNull[i]) { + continue; + } + final int position = positions[i] + positionOffset; + innerAggregator.aggregate(buf, position, vector[rows != null ? rows[i] : i]); + } + } + + + @Override + public void close() + { + // no resources to cleanup + } +} diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogram.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogram.java index f408dbfb857..e10203caf23 100644 --- a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogram.java +++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogram.java @@ -25,9 +25,11 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonValue; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.apache.druid.common.config.NullHandling; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; +import javax.annotation.Nullable; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Arrays; @@ -431,6 +433,34 @@ public class FixedBucketsHistogram } } + /** + * Merge another datapoint into this one. The other datapoint could be + * - base64 encoded string of {@code FixedBucketsHistogram} + * - {@code FixedBucketsHistogram} object + * - Numeric value + * + * @param val + */ + @VisibleForTesting + public void combine(@Nullable Object val) + { + if (val == null) { + if (NullHandling.replaceWithDefault()) { + add(NullHandling.defaultDoubleValue()); + } else { + incrementMissing(); + } + } else if (val instanceof String) { + combineHistogram(fromBase64((String) val)); + } else if (val instanceof FixedBucketsHistogram) { + combineHistogram((FixedBucketsHistogram) val); + } else if (val instanceof Number) { + add(((Number) val).doubleValue()); + } else { + throw new ISE("Unknown class for object: " + val.getClass()); + } + } + /** * Merge another histogram into this one. Only the state of this histogram is updated. * diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramAggregator.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramAggregator.java index 2f5b5650893..eed1c9e7d38 100644 --- a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramAggregator.java +++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramAggregator.java @@ -20,8 +20,6 @@ package org.apache.druid.query.aggregation.histogram; import com.google.common.primitives.Longs; -import org.apache.druid.common.config.NullHandling; -import org.apache.druid.java.util.common.ISE; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.segment.BaseObjectColumnValueSelector; @@ -66,22 +64,7 @@ public class FixedBucketsHistogramAggregator implements Aggregator public void aggregate() { Object val = selector.getObject(); - - if (val == null) { - if (NullHandling.replaceWithDefault()) { - histogram.add(NullHandling.defaultDoubleValue()); - } else { - histogram.incrementMissing(); - } - } else if (val instanceof String) { - histogram.combineHistogram(FixedBucketsHistogram.fromBase64((String) val)); - } else if (val instanceof FixedBucketsHistogram) { - histogram.combineHistogram((FixedBucketsHistogram) val); - } else if (val instanceof Number) { - histogram.add(((Number) val).doubleValue()); - } else { - throw new ISE("Unknown class for object: " + val.getClass()); - } + histogram.combine(val); } @Nullable diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramAggregatorFactory.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramAggregatorFactory.java index 7cd8de80ca6..705c420774b 100644 --- a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramAggregatorFactory.java +++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramAggregatorFactory.java @@ -22,6 +22,7 @@ package org.apache.druid.query.aggregation.histogram; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.aggregation.AggregateCombiner; import org.apache.druid.query.aggregation.Aggregator; @@ -29,10 +30,14 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.ObjectAggregateCombiner; +import org.apache.druid.query.aggregation.VectorAggregator; import org.apache.druid.query.cache.CacheKeyBuilder; +import org.apache.druid.segment.ColumnInspector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import javax.annotation.Nullable; import java.util.Collections; @@ -100,6 +105,34 @@ public class FixedBucketsHistogramAggregatorFactory extends AggregatorFactory ); } + @Override + public VectorAggregator factorizeVector(VectorColumnSelectorFactory columnSelectorFactory) + { + ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName); + if (null == capabilities) { + throw new IAE("could not find the column type for column %s", fieldName); + } + ValueType type = capabilities.getType(); + if (type.isNumeric()) { + return new FixedBucketsHistogramVectorAggregator( + columnSelectorFactory.makeValueSelector(fieldName), + lowerLimit, + upperLimit, + numBuckets, + outlierHandlingMode + ); + } else { + throw new IAE("cannot vectorize fixed bucket histogram aggregation for type %s", type); + } + } + + @Override + public boolean canVectorize(ColumnInspector columnInspector) + { + ColumnCapabilities capabilities = columnInspector.getColumnCapabilities(fieldName); + return (capabilities != null) && capabilities.getType().isNumeric(); + } + @Override public Comparator getComparator() { diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramBufferAggregator.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramBufferAggregator.java index 0e894d7a59d..1ecebb54577 100644 --- a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramBufferAggregator.java +++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramBufferAggregator.java @@ -19,7 +19,6 @@ package org.apache.druid.query.aggregation.histogram; -import org.apache.druid.common.config.NullHandling; import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.BaseObjectColumnValueSelector; @@ -29,8 +28,7 @@ import java.nio.ByteBuffer; public class FixedBucketsHistogramBufferAggregator implements BufferAggregator { private final BaseObjectColumnValueSelector selector; - - private FixedBucketsHistogram histogram; + private final FixedBucketsHistogramBufferAggregatorHelper innerAggregator; public FixedBucketsHistogramBufferAggregator( BaseObjectColumnValueSelector selector, @@ -41,7 +39,7 @@ public class FixedBucketsHistogramBufferAggregator implements BufferAggregator ) { this.selector = selector; - this.histogram = new FixedBucketsHistogram( + this.innerAggregator = new FixedBucketsHistogramBufferAggregatorHelper( lowerLimit, upperLimit, numBuckets, @@ -52,45 +50,20 @@ public class FixedBucketsHistogramBufferAggregator implements BufferAggregator @Override public void init(ByteBuffer buf, int position) { - ByteBuffer mutationBuffer = buf.duplicate(); - mutationBuffer.position(position); - mutationBuffer.put(histogram.toBytesFull(false)); + innerAggregator.init(buf, position); } @Override public void aggregate(ByteBuffer buf, int position) { - ByteBuffer mutationBuffer = buf.duplicate(); - mutationBuffer.position(position); - - FixedBucketsHistogram h0 = FixedBucketsHistogram.fromByteBufferFullNoSerdeHeader(mutationBuffer); - Object val = selector.getObject(); - if (val == null) { - if (NullHandling.replaceWithDefault()) { - h0.incrementMissing(); - } else { - h0.add(NullHandling.defaultDoubleValue()); - } - } else if (val instanceof String) { - h0.combineHistogram(FixedBucketsHistogram.fromBase64((String) val)); - } else if (val instanceof FixedBucketsHistogram) { - h0.combineHistogram((FixedBucketsHistogram) val); - } else { - Double x = ((Number) val).doubleValue(); - h0.add(x); - } - - mutationBuffer.position(position); - mutationBuffer.put(h0.toBytesFull(false)); + innerAggregator.aggregate(buf, position, val); } @Override public Object get(ByteBuffer buf, int position) { - ByteBuffer mutationBuffer = buf.duplicate(); - mutationBuffer.position(position); - return FixedBucketsHistogram.fromByteBufferFullNoSerdeHeader(mutationBuffer); + return innerAggregator.get(buf, position); } @Override diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramBufferAggregatorHelper.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramBufferAggregatorHelper.java new file mode 100644 index 00000000000..8844e25ae4c --- /dev/null +++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramBufferAggregatorHelper.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.histogram; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +/** + * A helper class used by {@link FixedBucketsHistogramBufferAggregator} and + * {@link FixedBucketsHistogramVectorAggregator} for aggregation operations on byte buffers. + * Getting the object from value selectors is outside this class. + */ +final class FixedBucketsHistogramBufferAggregatorHelper +{ + private final double lowerLimit; + private final double upperLimit; + private final int numBuckets; + private final FixedBucketsHistogram.OutlierHandlingMode outlierHandlingMode; + + public FixedBucketsHistogramBufferAggregatorHelper( + double lowerLimit, + double upperLimit, + int numBuckets, + FixedBucketsHistogram.OutlierHandlingMode outlierHandlingMode + ) + { + this.lowerLimit = lowerLimit; + this.upperLimit = upperLimit; + this.numBuckets = numBuckets; + this.outlierHandlingMode = outlierHandlingMode; + } + + public void init(ByteBuffer buf, int position) + { + ByteBuffer mutationBuffer = buf.duplicate(); + mutationBuffer.position(position); + FixedBucketsHistogram histogram = new FixedBucketsHistogram( + lowerLimit, + upperLimit, + numBuckets, + outlierHandlingMode + ); + mutationBuffer.put(histogram.toBytesFull(false)); + } + + public void aggregate(ByteBuffer buf, int position, @Nullable Object val) + { + ByteBuffer mutationBuffer = buf.duplicate(); + mutationBuffer.position(position); + + FixedBucketsHistogram h0 = FixedBucketsHistogram.fromByteBufferFullNoSerdeHeader(mutationBuffer); + h0.combine(val); + + mutationBuffer.position(position); + mutationBuffer.put(h0.toBytesFull(false)); + } + + public FixedBucketsHistogram get(ByteBuffer buf, int position) + { + ByteBuffer mutationBuffer = buf.duplicate(); + mutationBuffer.position(position); + return FixedBucketsHistogram.fromByteBufferFullNoSerdeHeader(mutationBuffer); + } + + public void put(ByteBuffer buf, int position, FixedBucketsHistogram histogram) + { + ByteBuffer mutationBuffer = buf.duplicate(); + mutationBuffer.position(position); + mutationBuffer.put(histogram.toBytesFull(false)); + } +} diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramVectorAggregator.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramVectorAggregator.java new file mode 100644 index 00000000000..8bfbe6fe194 --- /dev/null +++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramVectorAggregator.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.histogram; + +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.segment.vector.VectorValueSelector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +public class FixedBucketsHistogramVectorAggregator implements VectorAggregator +{ + private final VectorValueSelector selector; + private final FixedBucketsHistogramBufferAggregatorHelper innerAggregator; + + public FixedBucketsHistogramVectorAggregator( + VectorValueSelector selector, + double lowerLimit, + double upperLimit, + int numBuckets, + FixedBucketsHistogram.OutlierHandlingMode outlierHandlingMode + ) + { + this.selector = selector; + this.innerAggregator = new FixedBucketsHistogramBufferAggregatorHelper( + lowerLimit, + upperLimit, + numBuckets, + outlierHandlingMode + ); + } + + @Override + public void init(ByteBuffer buf, int position) + { + innerAggregator.init(buf, position); + } + + @Override + public void aggregate(ByteBuffer buf, int position, int startRow, int endRow) + { + double[] vector = selector.getDoubleVector(); + boolean[] isNull = selector.getNullVector(); + FixedBucketsHistogram histogram = innerAggregator.get(buf, position); + for (int i = startRow; i < endRow; i++) { + histogram.combine(toObject(vector, isNull, i)); + } + innerAggregator.put(buf, position, histogram); + } + + @Override + public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset) + { + double[] vector = selector.getDoubleVector(); + boolean[] isNull = selector.getNullVector(); + for (int i = 0; i < numRows; i++) { + int position = positions[i] + positionOffset; + int index = rows != null ? rows[i] : i; + Double val = toObject(vector, isNull, index); + innerAggregator.aggregate(buf, position, val); + } + } + + @Nullable + @Override + public Object get(ByteBuffer buf, int position) + { + return innerAggregator.get(buf, position); + } + + @Override + public void close() + { + // Nothing to close + } + + @Nullable + private Double toObject(double[] vector, @Nullable boolean[] isNull, int index) + { + return (isNull != null && isNull[index]) ? null : vector[index]; + } +} diff --git a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingVectorAggregatorTest.java b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingVectorAggregatorTest.java new file mode 100644 index 00000000000..ee7283b20a8 --- /dev/null +++ b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingVectorAggregatorTest.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.histogram; + +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.segment.column.ColumnCapabilitiesImpl; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; +import org.apache.druid.segment.vector.VectorObjectSelector; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.nio.ByteBuffer; + +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.expect; + +public class ApproximateHistogramFoldingVectorAggregatorTest +{ + private static final float[] FLOATS = {23, 19, 10, 16, 36, 2, 9, 32, 30, 45}; + private VectorColumnSelectorFactory vectorColumnSelectorFactory; + private ApproximateHistogram h1; + private ApproximateHistogram h2; + + @Before + public void setup() + { + + h1 = new ApproximateHistogram(5); + h2 = new ApproximateHistogram(5); + + for (int i = 0; i < 5; ++i) { + h1.offer(FLOATS[i]); + } + for (int i = 5; i < FLOATS.length; ++i) { + h2.offer(FLOATS[i]); + } + + VectorObjectSelector vectorObjectSelector = createMock(VectorObjectSelector.class); + expect(vectorObjectSelector.getObjectVector()).andReturn(new Object[]{h1, null, h2, null}).anyTimes(); + + EasyMock.replay(vectorObjectSelector); + + vectorColumnSelectorFactory = createMock(VectorColumnSelectorFactory.class); + expect(vectorColumnSelectorFactory.makeObjectSelector("field")) + .andReturn(vectorObjectSelector).anyTimes(); + expect(vectorColumnSelectorFactory.getColumnCapabilities("field")).andReturn( + new ColumnCapabilitiesImpl().setType(ValueType.COMPLEX) + ); + expect(vectorColumnSelectorFactory.getColumnCapabilities("string_field")).andReturn( + new ColumnCapabilitiesImpl().setType(ValueType.STRING) + ); + expect(vectorColumnSelectorFactory.getColumnCapabilities("double_field")).andReturn( + new ColumnCapabilitiesImpl().setType(ValueType.STRING) + ); + EasyMock.replay(vectorColumnSelectorFactory); + } + + @Test + public void doNotVectorizedNonComplexTypes() + { + ApproximateHistogramFoldingAggregatorFactory factory = buildHistogramFactory("string_field"); + Assert.assertFalse(factory.canVectorize(vectorColumnSelectorFactory)); + + factory = buildHistogramFactory("double_field"); + Assert.assertFalse(factory.canVectorize(vectorColumnSelectorFactory)); + } + + @Test + public void testAggregateSinglePosition() + { + ApproximateHistogramFoldingAggregatorFactory factory = buildHistogramFactory(); + ByteBuffer byteBuffer = ByteBuffer.allocate(factory.getMaxIntermediateSize()); + Assert.assertTrue(factory.canVectorize(vectorColumnSelectorFactory)); + VectorAggregator vectorAggregator = factory.factorizeVector(vectorColumnSelectorFactory); + vectorAggregator.init(byteBuffer, 0); + vectorAggregator.aggregate(byteBuffer, 0, 0, 4); + ApproximateHistogram h = (ApproximateHistogram) vectorAggregator.get(byteBuffer, 0); + + Assert.assertArrayEquals(new float[]{19.6f, 45.0f}, h.positions(), 0.1f); + Assert.assertArrayEquals(new long[]{9, 1}, h.bins()); + Assert.assertEquals(10, h.count()); + Assert.assertEquals(2.0f, h.min(), 0.1f); + Assert.assertEquals(45.0f, h.max(), 0.1f); + } + + @Test + public void testAggregateMultiPositions() + { + ApproximateHistogramFoldingAggregatorFactory factory = buildHistogramFactory(); + ByteBuffer byteBuffer = ByteBuffer.allocate(factory.getMaxIntermediateSize() * 2); + int[] positions = new int[]{0, factory.getMaxIntermediateSize()}; + VectorAggregator vectorAggregator = factory.factorizeVector(vectorColumnSelectorFactory); + vectorAggregator.init(byteBuffer, 0); + vectorAggregator.init(byteBuffer, positions[1]); + + vectorAggregator.aggregate(byteBuffer, 2, positions, null, 0); + vectorAggregator.aggregate(byteBuffer, 2, positions, new int[]{1, 2}, 0); // indirection + ApproximateHistogram actualH1 = (ApproximateHistogram) vectorAggregator.get(byteBuffer, 0); + ApproximateHistogram actualH2 = (ApproximateHistogram) vectorAggregator.get(byteBuffer, positions[1]); + + Assert.assertEquals(actualH1, h1); + Assert.assertEquals(actualH2, h2); + + } + + private ApproximateHistogramFoldingAggregatorFactory buildHistogramFactory() + { + return buildHistogramFactory("field"); + } + + private ApproximateHistogramFoldingAggregatorFactory buildHistogramFactory(String fieldName) + { + return new ApproximateHistogramFoldingAggregatorFactory( + "approximateHistoFold", + fieldName, + 5, + 5, + 0f, + 50.0f, + false + ); + } +} diff --git a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramVectorAggregatorTest.java b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramVectorAggregatorTest.java new file mode 100644 index 00000000000..9958194a74e --- /dev/null +++ b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramVectorAggregatorTest.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.histogram; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ColumnCapabilitiesImpl; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; +import org.apache.druid.segment.vector.VectorValueSelector; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.nio.ByteBuffer; + +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.expect; + +public class ApproximateHistogramVectorAggregatorTest +{ + private static final float[] FLOATS = {23, 19, 10, 16, 36, 2, 9, 32, 30, 45, 33}; // Last value is never included + private static final boolean[] NULL_VECTOR = + {false, false, false, false, false, false, false, false, false, false, true}; + private VectorColumnSelectorFactory vectorColumnSelectorFactory; + + @Before + public void setup() + { + NullHandling.initializeForTests(); + VectorValueSelector vectorValueSelector_1 = createMock(VectorValueSelector.class); + expect(vectorValueSelector_1.getFloatVector()).andReturn(FLOATS).anyTimes(); + expect(vectorValueSelector_1.getNullVector()).andReturn(NULL_VECTOR).anyTimes(); + + VectorValueSelector vectorValueSelector_2 = createMock(VectorValueSelector.class); + expect(vectorValueSelector_2.getFloatVector()).andReturn(FLOATS).anyTimes(); + expect(vectorValueSelector_2.getNullVector()).andReturn(null).anyTimes(); + + EasyMock.replay(vectorValueSelector_1); + EasyMock.replay(vectorValueSelector_2); + + ColumnCapabilities columnCapabilities + = ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ValueType.DOUBLE); + vectorColumnSelectorFactory = createMock(VectorColumnSelectorFactory.class); + expect(vectorColumnSelectorFactory.getColumnCapabilities("field_1")).andReturn(columnCapabilities).anyTimes(); + expect(vectorColumnSelectorFactory.makeValueSelector("field_1")) + .andReturn(vectorValueSelector_1).anyTimes(); + expect(vectorColumnSelectorFactory.getColumnCapabilities("field_2")).andReturn(columnCapabilities).anyTimes(); + expect(vectorColumnSelectorFactory.makeValueSelector("field_2")) + .andReturn(vectorValueSelector_2).anyTimes(); + expect(vectorColumnSelectorFactory.getColumnCapabilities("string_field")).andReturn( + new ColumnCapabilitiesImpl().setType(ValueType.STRING) + ); + expect(vectorColumnSelectorFactory.getColumnCapabilities("complex_field")).andReturn( + new ColumnCapabilitiesImpl().setType(ValueType.COMPLEX) + ); + EasyMock.replay(vectorColumnSelectorFactory); + } + + @Test + public void doNotVectorizedNonNumericTypes() + { + ApproximateHistogramAggregatorFactory factory = buildHistogramAggFactory("string_field"); + Assert.assertFalse(factory.canVectorize(vectorColumnSelectorFactory)); + + factory = buildHistogramAggFactory("complex_field"); + Assert.assertFalse(factory.canVectorize(vectorColumnSelectorFactory)); + } + + @Test + public void testAggregateSinglePosition() + { + ApproximateHistogramAggregatorFactory factory = buildHistogramAggFactory("field_1"); + ByteBuffer byteBuffer = ByteBuffer.allocate(factory.getMaxIntermediateSizeWithNulls()); + Assert.assertTrue(factory.canVectorize(vectorColumnSelectorFactory)); + VectorAggregator vectorAggregator = factory.factorizeVector(vectorColumnSelectorFactory); + vectorAggregator.init(byteBuffer, 0); + vectorAggregator.aggregate(byteBuffer, 0, 0, 11); + ApproximateHistogram h = (ApproximateHistogram) vectorAggregator.get(byteBuffer, 0); + + // (2, 1), (9.5, 2), (19.33, 3), (32.67, 3), (45, 1) + Assert.assertArrayEquals(new float[]{2, 9.5f, 19.33f, 32.67f, 45f}, h.positions(), 0.1f); + Assert.assertArrayEquals(new long[]{1, 2, 3, 3, 1}, h.bins()); + + factory = buildHistogramAggFactory("field_2"); + vectorAggregator = factory.factorizeVector(vectorColumnSelectorFactory); + vectorAggregator.init(byteBuffer, 0); + vectorAggregator.aggregate(byteBuffer, 0, 0, 10); + h = (ApproximateHistogram) vectorAggregator.get(byteBuffer, 0); + + Assert.assertArrayEquals(new float[]{2, 9.5f, 19.33f, 32.67f, 45f}, h.positions(), 0.1f); + Assert.assertArrayEquals(new long[]{1, 2, 3, 3, 1}, h.bins()); + + } + + @Test + public void testAggregateMultiPositions() + { + ApproximateHistogramAggregatorFactory factory = buildHistogramAggFactory("field_2"); + int size = factory.getMaxIntermediateSize(); + ByteBuffer byteBuffer = ByteBuffer.allocate(size * 2); + VectorAggregator vectorAggregator = factory.factorizeVector(vectorColumnSelectorFactory); + int[] positions = new int[]{0, size}; + vectorAggregator.init(byteBuffer, positions[0]); + vectorAggregator.init(byteBuffer, positions[1]); + vectorAggregator.aggregate(byteBuffer, 2, positions, null, 0); + // Put rest of 10 elements using the access indirection. Second vector gets the same element always + for (int i = 1; i < 10; i++) { + vectorAggregator.aggregate(byteBuffer, 2, positions, new int[]{i, 1}, 0); + } + + ApproximateHistogram h0 = (ApproximateHistogram) vectorAggregator.get(byteBuffer, 0); + Assert.assertArrayEquals(new float[]{2, 9.5f, 19.33f, 32.67f, 45f}, h0.positions(), 0.1f); + Assert.assertArrayEquals(new long[]{1, 2, 3, 3, 1}, h0.bins()); + + ApproximateHistogram h2 = (ApproximateHistogram) vectorAggregator.get(byteBuffer, size); + Assert.assertArrayEquals(new float[]{19}, h2.positions(), 0.1f); + Assert.assertArrayEquals(new long[]{10}, h2.bins()); + } + + private ApproximateHistogramAggregatorFactory buildHistogramAggFactory(String fieldName) + { + return new ApproximateHistogramAggregatorFactory( + "approxHisto", + fieldName, + 5, + 5, + 0.0f, + 45.0f, + false + ); + } +} diff --git a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramTest.java b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramTest.java index bf929741863..630d6e86027 100644 --- a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramTest.java +++ b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramTest.java @@ -1242,6 +1242,94 @@ public class FixedBucketsHistogramTest Assert.assertEquals(0, hIgnore.getUpperOutlierCount()); } + @Test + public void testCombineBase64() + { + FixedBucketsHistogram h = buildHistogram( + 0, + 20, + 5, + FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW, + new float[]{1, 2, 7, 12, 18} + ); + + FixedBucketsHistogram h2 = buildHistogram( + 0, + 20, + 7, + FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW, + new float[]{3, 8, 9, 19, 99, -50} + ); + + h.combine(h2.toBase64()); + Assert.assertEquals(5, h.getNumBuckets()); + Assert.assertEquals(4.0, h.getBucketSize(), 0.01); + Assert.assertEquals(0, h.getLowerLimit(), 0.01); + Assert.assertEquals(20, h.getUpperLimit(), 0.01); + Assert.assertEquals(FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW, h.getOutlierHandlingMode()); + Assert.assertArrayEquals(new long[]{2, 3, 1, 1, 2}, h.getHistogram()); + Assert.assertEquals(9, h.getCount()); + Assert.assertEquals(1, h.getMin(), 0.01); + Assert.assertEquals(18, h.getMax(), 0.01); + Assert.assertEquals(0, h.getMissingValueCount()); + Assert.assertEquals(1, h.getLowerOutlierCount()); + Assert.assertEquals(1, h.getUpperOutlierCount()); + } + + @Test + public void testCombineAnotherHistogram() + { + FixedBucketsHistogram h = buildHistogram( + 0, + 20, + 5, + FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW, + new float[]{1, 2, 7, 12, 18} + ); + + FixedBucketsHistogram h2 = buildHistogram( + 0, + 20, + 7, + FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW, + new float[]{3, 8, 9, 19, 99, -50} + ); + + h.combine(h2); + Assert.assertEquals(5, h.getNumBuckets()); + Assert.assertEquals(4.0, h.getBucketSize(), 0.01); + Assert.assertEquals(0, h.getLowerLimit(), 0.01); + Assert.assertEquals(20, h.getUpperLimit(), 0.01); + Assert.assertEquals(FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW, h.getOutlierHandlingMode()); + Assert.assertArrayEquals(new long[]{2, 3, 1, 1, 2}, h.getHistogram()); + Assert.assertEquals(9, h.getCount()); + Assert.assertEquals(1, h.getMin(), 0.01); + Assert.assertEquals(18, h.getMax(), 0.01); + Assert.assertEquals(0, h.getMissingValueCount()); + Assert.assertEquals(1, h.getLowerOutlierCount()); + Assert.assertEquals(1, h.getUpperOutlierCount()); + } + + @Test + public void testCombineNumber() + { + FixedBucketsHistogram h = new FixedBucketsHistogram( + 0, + 200, + 200, + FixedBucketsHistogram.OutlierHandlingMode.IGNORE + ); + + h.combine(10); + h.combine(20); + + Assert.assertEquals(0, h.getUpperOutlierCount()); + Assert.assertEquals(0, h.getLowerOutlierCount()); + Assert.assertEquals(2, h.getCount()); + Assert.assertEquals(10, h.getMin(), 0.01); + Assert.assertEquals(20, h.getMax(), 0.01); + } + @Test public void testMissing() { diff --git a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramVectorAggregatorTest.java b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramVectorAggregatorTest.java new file mode 100644 index 00000000000..78f74edf748 --- /dev/null +++ b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramVectorAggregatorTest.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.histogram; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ColumnCapabilitiesImpl; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; +import org.apache.druid.segment.vector.VectorValueSelector; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.nio.ByteBuffer; + +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.expect; + +public class FixedBucketsHistogramVectorAggregatorTest +{ + private static final double[] DOUBLES = {1.0, 12.0, 3.0, 14.0, 15.0, 16.0}; + private static final boolean[] NULL_VECTOR = {false, false, false, false, true, false}; + private VectorColumnSelectorFactory vectorColumnSelectorFactory; + + @Before + public void setup() + { + NullHandling.initializeForTests(); + VectorValueSelector vectorValueSelector_1 = createMock(VectorValueSelector.class); + expect(vectorValueSelector_1.getDoubleVector()).andReturn(DOUBLES).anyTimes(); + expect(vectorValueSelector_1.getNullVector()).andReturn(NULL_VECTOR).anyTimes(); + + VectorValueSelector vectorValueSelector_2 = createMock(VectorValueSelector.class); + expect(vectorValueSelector_2.getDoubleVector()).andReturn(DOUBLES).anyTimes(); + expect(vectorValueSelector_2.getNullVector()).andReturn(null).anyTimes(); + + EasyMock.replay(vectorValueSelector_1); + EasyMock.replay(vectorValueSelector_2); + + ColumnCapabilities columnCapabilities + = ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ValueType.DOUBLE); + vectorColumnSelectorFactory = createMock(VectorColumnSelectorFactory.class); + expect(vectorColumnSelectorFactory.getColumnCapabilities("field_1")).andReturn(columnCapabilities).anyTimes(); + expect(vectorColumnSelectorFactory.makeValueSelector("field_1")) + .andReturn(vectorValueSelector_1).anyTimes(); + expect(vectorColumnSelectorFactory.getColumnCapabilities("field_2")).andReturn(columnCapabilities).anyTimes(); + expect(vectorColumnSelectorFactory.makeValueSelector("field_2")) + .andReturn(vectorValueSelector_2).anyTimes(); + EasyMock.replay(vectorColumnSelectorFactory); + } + + @Test + public void testAggregateSinglePosition() + { + ByteBuffer byteBuffer = ByteBuffer.allocate(FixedBucketsHistogram.getFullStorageSize(2)); + FixedBucketsHistogramAggregatorFactory factory = buildHistogramAggFactory("field_1"); + Assert.assertTrue(factory.canVectorize(vectorColumnSelectorFactory)); + VectorAggregator vectorAggregator = factory.factorizeVector(vectorColumnSelectorFactory); + vectorAggregator.init(byteBuffer, 0); + vectorAggregator.aggregate(byteBuffer, 0, 0, 6); + FixedBucketsHistogram h = (FixedBucketsHistogram) vectorAggregator.get(byteBuffer, 0); + + Assert.assertEquals(2, h.getNumBuckets()); + Assert.assertEquals(10.0, h.getBucketSize(), 0.01); + Assert.assertEquals(1, h.getLowerLimit(), 0.01); + Assert.assertEquals(21, h.getUpperLimit(), 0.01); + Assert.assertEquals(FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW, h.getOutlierHandlingMode()); + Assert.assertArrayEquals(new long[]{2, 3}, h.getHistogram()); + Assert.assertEquals(5, h.getCount()); + Assert.assertEquals(1.0, h.getMin(), 0.01); + Assert.assertEquals(16.0, h.getMax(), 0.01); + // Default value of null is 0 which is an outlier. + Assert.assertEquals(NullHandling.replaceWithDefault() ? 0 : 1, h.getMissingValueCount()); + Assert.assertEquals(NullHandling.replaceWithDefault() ? 1 : 0, h.getLowerOutlierCount()); + Assert.assertEquals(0, h.getUpperOutlierCount()); + + factory = buildHistogramAggFactory("field_2"); + vectorAggregator = factory.factorizeVector(vectorColumnSelectorFactory); + vectorAggregator.init(byteBuffer, 0); + vectorAggregator.aggregate(byteBuffer, 0, 0, 6); + h = (FixedBucketsHistogram) vectorAggregator.get(byteBuffer, 0); + + Assert.assertEquals(2, h.getNumBuckets()); + Assert.assertEquals(10.0, h.getBucketSize(), 0.01); + Assert.assertEquals(1, h.getLowerLimit(), 0.01); + Assert.assertEquals(21, h.getUpperLimit(), 0.01); + Assert.assertEquals(FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW, h.getOutlierHandlingMode()); + Assert.assertArrayEquals(new long[]{2, 4}, h.getHistogram()); + Assert.assertEquals(6, h.getCount()); + Assert.assertEquals(1.0, h.getMin(), 0.01); + Assert.assertEquals(16.0, h.getMax(), 0.01); + Assert.assertEquals(0, h.getMissingValueCount()); + Assert.assertEquals(0, h.getLowerOutlierCount()); + Assert.assertEquals(0, h.getUpperOutlierCount()); + } + + @Test + public void testAggregateMultiPositions() + { + int size = FixedBucketsHistogram.getFullStorageSize(2); + ByteBuffer byteBuffer = ByteBuffer.allocate(size * 2); + FixedBucketsHistogramAggregatorFactory factory = buildHistogramAggFactory("field_2"); + VectorAggregator vectorAggregator = factory.factorizeVector(vectorColumnSelectorFactory); + int[] positions = new int[]{0, size}; + vectorAggregator.init(byteBuffer, positions[0]); + vectorAggregator.init(byteBuffer, positions[1]); + vectorAggregator.aggregate(byteBuffer, 2, positions, null, 0); + + FixedBucketsHistogram h0 = (FixedBucketsHistogram) vectorAggregator.get(byteBuffer, 0); + + Assert.assertEquals(2, h0.getNumBuckets()); + Assert.assertEquals(10.0, h0.getBucketSize(), 0.01); + Assert.assertEquals(1, h0.getLowerLimit(), 0.01); + Assert.assertEquals(21, h0.getUpperLimit(), 0.01); + Assert.assertEquals(FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW, h0.getOutlierHandlingMode()); + Assert.assertArrayEquals(new long[]{1, 0}, h0.getHistogram()); + Assert.assertEquals(1, h0.getCount()); + Assert.assertEquals(1.0, h0.getMin(), 0.01); + Assert.assertEquals(1.0, h0.getMax(), 0.01); + Assert.assertEquals(0, h0.getMissingValueCount()); + Assert.assertEquals(0, h0.getLowerOutlierCount()); + Assert.assertEquals(0, h0.getUpperOutlierCount()); + + FixedBucketsHistogram h1 = (FixedBucketsHistogram) vectorAggregator.get(byteBuffer, positions[1]); + + Assert.assertEquals(2, h1.getNumBuckets()); + Assert.assertEquals(10.0, h1.getBucketSize(), 0.01); + Assert.assertEquals(1, h1.getLowerLimit(), 0.01); + Assert.assertEquals(21, h1.getUpperLimit(), 0.01); + Assert.assertEquals(FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW, h1.getOutlierHandlingMode()); + Assert.assertArrayEquals(new long[]{0, 1}, h1.getHistogram()); + Assert.assertEquals(1, h1.getCount()); + Assert.assertEquals(12.0, h1.getMin(), 0.01); + Assert.assertEquals(12.0, h1.getMax(), 0.01); + Assert.assertEquals(0, h1.getMissingValueCount()); + Assert.assertEquals(0, h1.getLowerOutlierCount()); + Assert.assertEquals(0, h1.getUpperOutlierCount()); + + // Tests when there is a level of indirection in accessing the vector + byteBuffer = ByteBuffer.allocate(size * 2); + vectorAggregator.init(byteBuffer, positions[0]); + vectorAggregator.init(byteBuffer, positions[1]); + vectorAggregator.aggregate(byteBuffer, 2, positions, new int[]{2, 3}, 0); + + FixedBucketsHistogram h2 = (FixedBucketsHistogram) vectorAggregator.get(byteBuffer, 0); + + Assert.assertEquals(2, h2.getNumBuckets()); + Assert.assertEquals(10.0, h2.getBucketSize(), 0.01); + Assert.assertEquals(1, h2.getLowerLimit(), 0.01); + Assert.assertEquals(21, h2.getUpperLimit(), 0.01); + Assert.assertEquals(FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW, h2.getOutlierHandlingMode()); + Assert.assertArrayEquals(new long[]{1, 0}, h2.getHistogram()); + Assert.assertEquals(1, h2.getCount()); + Assert.assertEquals(3.0, h2.getMin(), 0.01); + Assert.assertEquals(3.0, h2.getMax(), 0.01); + Assert.assertEquals(0, h2.getMissingValueCount()); + Assert.assertEquals(0, h2.getLowerOutlierCount()); + Assert.assertEquals(0, h2.getUpperOutlierCount()); + + FixedBucketsHistogram h3 = (FixedBucketsHistogram) vectorAggregator.get(byteBuffer, positions[1]); + + Assert.assertEquals(2, h3.getNumBuckets()); + Assert.assertEquals(10.0, h3.getBucketSize(), 0.01); + Assert.assertEquals(1, h3.getLowerLimit(), 0.01); + Assert.assertEquals(21, h3.getUpperLimit(), 0.01); + Assert.assertEquals(FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW, h3.getOutlierHandlingMode()); + Assert.assertArrayEquals(new long[]{0, 1}, h3.getHistogram()); + Assert.assertEquals(1, h3.getCount()); + Assert.assertEquals(14.0, h3.getMin(), 0.01); + Assert.assertEquals(14.0, h3.getMax(), 0.01); + Assert.assertEquals(0, h3.getMissingValueCount()); + Assert.assertEquals(0, h3.getLowerOutlierCount()); + Assert.assertEquals(0, h3.getUpperOutlierCount()); + + } + + private FixedBucketsHistogramAggregatorFactory buildHistogramAggFactory(String fieldName) + { + return new FixedBucketsHistogramAggregatorFactory( + "fixedHisto", + fieldName, + 2, + 1, + 21, + FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW, + false + ); + } +} diff --git a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java index 49fd806c271..beb275f3fac 100644 --- a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java +++ b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java @@ -48,6 +48,7 @@ import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.segment.IndexBuilder; import org.apache.druid.segment.QueryableIndex; +import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.virtual.ExpressionVirtualColumn; @@ -120,6 +121,7 @@ public class QuantileSqlAggregatorTest extends CalciteTestBase ApproximateHistogramDruidModule.registerSerde(); for (Module mod : new ApproximateHistogramDruidModule().getJacksonModules()) { CalciteTests.getJsonMapper().registerModule(mod); + TestHelper.JSON_MAPPER.registerModule(mod); } final QueryableIndex index = IndexBuilder.create() diff --git a/website/.spelling b/website/.spelling index 249db61de5f..14fd910f4a7 100644 --- a/website/.spelling +++ b/website/.spelling @@ -596,6 +596,7 @@ url - ../docs/development/extensions-core/approximate-histograms.md approxHistogram approxHistogramFold +fixedBucketsHistogram bucketNum lowerLimit numBuckets