diff --git a/core/src/main/java/org/apache/druid/java/util/common/collect/Utils.java b/core/src/main/java/org/apache/druid/java/util/common/collect/Utils.java index de7a2399f13..429660fa401 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/collect/Utils.java +++ b/core/src/main/java/org/apache/druid/java/util/common/collect/Utils.java @@ -81,4 +81,13 @@ public class Utils return true; } + + public static String safeObjectClassGetName(@Nullable Object o) + { + if (o == null) { + return "NULL"; + } else { + return o.getClass().getName(); + } + } } diff --git a/docs/querying/aggregations.md b/docs/querying/aggregations.md index 3f6309586cb..4bc76052949 100644 --- a/docs/querying/aggregations.md +++ b/docs/querying/aggregations.md @@ -120,6 +120,14 @@ Computes and stores the sum of values as 32-bit floating point value. Similar to { "type" : "longMax", "name" : , "fieldName" : } ``` +### `doubleMean` aggregator + +Computes and returns arithmetic mean of a column values as 64 bit float value. This is a query time aggregator only and should not be used during indexing. + +```json +{ "type" : "doubleMean", "name" : , "fieldName" : } +``` + ### First / Last aggregator (Double/Float/Long) First and Last aggregator cannot be used in ingestion spec, and should only be specified as part of queries. diff --git a/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java b/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java index 6689d27aa03..aba52a0fd48 100644 --- a/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java +++ b/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java @@ -53,6 +53,8 @@ import org.apache.druid.query.aggregation.last.FloatLastAggregatorFactory; import org.apache.druid.query.aggregation.last.LongLastAggregatorFactory; import org.apache.druid.query.aggregation.last.StringLastAggregatorFactory; import org.apache.druid.query.aggregation.last.StringLastFoldingAggregatorFactory; +import org.apache.druid.query.aggregation.mean.DoubleMeanAggregatorFactory; +import org.apache.druid.query.aggregation.mean.DoubleMeanHolder; import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator; import org.apache.druid.query.aggregation.post.ConstantPostAggregator; import org.apache.druid.query.aggregation.post.DoubleGreatestPostAggregator; @@ -79,6 +81,8 @@ public class AggregatorsModule extends SimpleModule setMixInAnnotation(AggregatorFactory.class, AggregatorFactoryMixin.class); setMixInAnnotation(PostAggregator.class, PostAggregatorMixin.class); + + addSerializer(DoubleMeanHolder.class, DoubleMeanHolder.Serializer.INSTANCE); } @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @@ -105,6 +109,7 @@ public class AggregatorsModule extends SimpleModule @JsonSubTypes.Type(name = "stringFirstFold", value = StringFirstFoldingAggregatorFactory.class), @JsonSubTypes.Type(name = "longLast", value = LongLastAggregatorFactory.class), @JsonSubTypes.Type(name = "doubleLast", value = DoubleLastAggregatorFactory.class), + @JsonSubTypes.Type(name = "doubleMean", value = DoubleMeanAggregatorFactory.class), @JsonSubTypes.Type(name = "floatLast", value = FloatLastAggregatorFactory.class), @JsonSubTypes.Type(name = "stringLast", value = StringLastAggregatorFactory.class), @JsonSubTypes.Type(name = "stringLastFold", value = StringLastFoldingAggregatorFactory.class) diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java index e37f87b04f6..8be5e53d8ce 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java @@ -119,6 +119,8 @@ public class AggregatorUtil // TDigest sketch aggregators public static final byte TDIGEST_BUILD_SKETCH_CACHE_TYPE_ID = 0x38; + public static final byte MEAN_CACHE_TYPE_ID = 0x41; + /** * returns the list of dependent postAggregators that should be calculated in order to calculate given postAgg * diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleBufferAggregator.java index 29db6424de1..01f3901ba6f 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleBufferAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleBufferAggregator.java @@ -29,7 +29,7 @@ public abstract class SimpleDoubleBufferAggregator implements BufferAggregator { final BaseDoubleColumnValueSelector selector; - SimpleDoubleBufferAggregator(BaseDoubleColumnValueSelector selector) + public SimpleDoubleBufferAggregator(BaseDoubleColumnValueSelector selector) { this.selector = selector; } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanAggregator.java new file mode 100644 index 00000000000..a4f7b3d5f64 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanAggregator.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.mean; + +import org.apache.druid.java.util.common.Numbers; +import org.apache.druid.query.aggregation.Aggregator; +import org.apache.druid.segment.ColumnValueSelector; + +import java.util.List; + +/** + */ +public class DoubleMeanAggregator implements Aggregator +{ + private final ColumnValueSelector selector; + + private final DoubleMeanHolder value = new DoubleMeanHolder(0, 0); + + public DoubleMeanAggregator(ColumnValueSelector selector) + { + this.selector = selector; + } + + @Override + public void aggregate() + { + Object update = selector.getObject(); + + if (update instanceof DoubleMeanHolder) { + value.update((DoubleMeanHolder) update); + } else if (update instanceof List) { + for (Object o : (List) update) { + value.update(Numbers.tryParseDouble(o, 0)); + } + } else { + value.update(Numbers.tryParseDouble(update, 0)); + } + } + + @Override + public Object get() + { + return value; + } + + @Override + public float getFloat() + { + throw new UnsupportedOperationException("not supported"); + } + + @Override + public long getLong() + { + throw new UnsupportedOperationException("not supported"); + } + + @Override + public double getDouble() + { + throw new UnsupportedOperationException("not supported"); + } + + @Override + public void close() + { + // no resources to cleanup + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanAggregatorFactory.java new file mode 100644 index 00000000000..5ed87bee66c --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanAggregatorFactory.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.mean; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.collect.Utils; +import org.apache.druid.query.aggregation.Aggregator; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.AggregatorUtil; +import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.query.cache.CacheKeyBuilder; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; + +import javax.annotation.Nullable; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +/** + */ +public class DoubleMeanAggregatorFactory extends AggregatorFactory +{ + private final String name; + private final String fieldName; + + @JsonCreator + public DoubleMeanAggregatorFactory( + @JsonProperty("name") String name, + @JsonProperty("fieldName") final String fieldName + ) + { + this.name = Preconditions.checkNotNull(name, "null name"); + this.fieldName = Preconditions.checkNotNull(fieldName, "null fieldName"); + } + + @Override + @JsonProperty + public String getName() + { + return name; + } + + @JsonProperty + public String getFieldName() + { + return fieldName; + } + + @Override + public List requiredFields() + { + return Collections.singletonList(fieldName); + } + + @Override + public String getTypeName() + { + return "doubleMean"; + } + + @Override + public int getMaxIntermediateSize() + { + return DoubleMeanHolder.MAX_INTERMEDIATE_SIZE; + } + + @Override + public Aggregator factorize(ColumnSelectorFactory metricFactory) + { + return new DoubleMeanAggregator(metricFactory.makeColumnValueSelector(fieldName)); + } + + @Override + public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) + { + return new DoubleMeanBufferAggregator(metricFactory.makeColumnValueSelector(fieldName)); + } + + @Override + public VectorAggregator factorizeVector(final VectorColumnSelectorFactory selectorFactory) + { + return new DoubleMeanVectorAggregator(selectorFactory.makeValueSelector(fieldName)); + } + + @Override + public boolean canVectorize() + { + return true; + } + + @Override + public Comparator getComparator() + { + return DoubleMeanHolder.COMPARATOR; + } + + @Nullable + @Override + public Object combine(@Nullable Object lhs, @Nullable Object rhs) + { + if (lhs instanceof DoubleMeanHolder && rhs instanceof DoubleMeanHolder) { + return ((DoubleMeanHolder) lhs).update((DoubleMeanHolder) rhs); + } else { + throw new IAE( + "lhs[%s] or rhs[%s] not of type [%s]", + Utils.safeObjectClassGetName(lhs), + Utils.safeObjectClassGetName(rhs), + DoubleMeanHolder.class.getName() + ); + } + } + + @Override + public AggregatorFactory getCombiningFactory() + { + return new DoubleMeanAggregatorFactory(name, name); + } + + @Override + public List getRequiredColumns() + { + return Collections.singletonList(new DoubleMeanAggregatorFactory(fieldName, fieldName)); + } + + @Override + public Object deserialize(Object object) + { + if (object instanceof String) { + return DoubleMeanHolder.fromBytes(StringUtils.decodeBase64(StringUtils.toUtf8((String) object))); + } else if (object instanceof DoubleMeanHolder) { + return object; + } else { + throw new IAE("Unknown object type [%s]", Utils.safeObjectClassGetName(object)); + } + } + + @Nullable + @Override + public Object finalizeComputation(@Nullable Object object) + { + if (object instanceof DoubleMeanHolder) { + return ((DoubleMeanHolder) object).mean(); + } else if (object == null) { + return null; + } else { + throw new IAE("Unknown object type [%s]", object.getClass().getName()); + } + } + + @Override + public byte[] getCacheKey() + { + return new CacheKeyBuilder(AggregatorUtil.MEAN_CACHE_TYPE_ID) + .appendString(name) + .appendString(fieldName) + .build(); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanBufferAggregator.java new file mode 100644 index 00000000000..5ff7f81a734 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanBufferAggregator.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.mean; + +import org.apache.druid.java.util.common.Numbers; +import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.segment.ColumnValueSelector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.util.List; + +/** + */ +public class DoubleMeanBufferAggregator implements BufferAggregator +{ + + private final ColumnValueSelector selector; + + public DoubleMeanBufferAggregator(ColumnValueSelector selector) + { + this.selector = selector; + } + + @Override + public void init(ByteBuffer buf, int position) + { + DoubleMeanHolder.init(buf, position); + } + + @Override + public void aggregate(ByteBuffer buf, int position) + { + Object update = selector.getObject(); + + if (update instanceof DoubleMeanHolder) { + DoubleMeanHolder.update(buf, position, (DoubleMeanHolder) update); + } else if (update instanceof List) { + for (Object o : (List) update) { + DoubleMeanHolder.update(buf, position, Numbers.tryParseDouble(o, 0)); + } + } else { + DoubleMeanHolder.update(buf, position, Numbers.tryParseDouble(update, 0)); + } + } + + @Nullable + @Override + public Object get(ByteBuffer buf, int position) + { + return DoubleMeanHolder.get(buf, position); + } + + @Override + public float getFloat(ByteBuffer buf, int position) + { + throw new UnsupportedOperationException("not supported"); + } + + @Override + public long getLong(ByteBuffer buf, int position) + { + throw new UnsupportedOperationException("not supported"); + } + + @Override + public double getDouble(ByteBuffer buf, int position) + { + throw new UnsupportedOperationException("not supported"); + } + + @Override + public void close() + { + + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("selector", selector); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanHolder.java b/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanHolder.java new file mode 100644 index 00000000000..f42c993b68a --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanHolder.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.mean; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.google.common.primitives.Doubles; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Comparator; + +public class DoubleMeanHolder +{ + public static final int MAX_INTERMEDIATE_SIZE = Long.SIZE + Double.SIZE; + public static final Comparator COMPARATOR = (o1, o2) -> Doubles.compare(o1.mean(), o2.mean()); + + private double sum; + private long count; + + public DoubleMeanHolder(double sum, long count) + { + this.sum = sum; + this.count = count; + } + + public void update(double sum) + { + this.sum += sum; + count++; + } + + public DoubleMeanHolder update(DoubleMeanHolder other) + { + sum += other.sum; + count += other.count; + return this; + } + + public double mean() + { + return count == 0 ? 0 : sum / count; + } + + public byte[] toBytes() + { + ByteBuffer buf = ByteBuffer.allocate(Double.SIZE + Long.SIZE); + buf.putDouble(0, sum); + buf.putLong(Double.SIZE, count); + return buf.array(); + } + + public static DoubleMeanHolder fromBytes(byte[] data) + { + ByteBuffer buf = ByteBuffer.wrap(data); + return new DoubleMeanHolder(buf.getDouble(0), buf.getLong(Double.SIZE)); + } + + public static void init(ByteBuffer buf, int position) + { + writeSum(buf, position, 0d); + writeCount(buf, position, 0); + } + + public static void update(ByteBuffer buf, int position, double sum) + { + writeSum(buf, position, getSum(buf, position) + sum); + writeCount(buf, position, getCount(buf, position) + 1); + } + + public static void update(ByteBuffer buf, int position, DoubleMeanHolder other) + { + writeSum(buf, position, getSum(buf, position) + other.sum); + writeCount(buf, position, getCount(buf, position) + other.count); + } + + public static DoubleMeanHolder get(ByteBuffer buf, int position) + { + return new DoubleMeanHolder(getSum(buf, position), getCount(buf, position)); + } + + private static void writeSum(ByteBuffer buf, int position, double sum) + { + buf.putDouble(position, sum); + } + + private static double getSum(ByteBuffer buf, int position) + { + return buf.getDouble(position); + } + + private static void writeCount(ByteBuffer buf, int position, long count) + { + buf.putLong(position + Double.SIZE, count); + } + + private static long getCount(ByteBuffer buf, int position) + { + return buf.getLong(position + Double.SIZE); + } + + public static class Serializer extends JsonSerializer + { + public static final Serializer INSTANCE = new Serializer(); + + private Serializer() + { + + } + + @Override + public void serialize(DoubleMeanHolder obj, JsonGenerator jgen, SerializerProvider provider) + throws IOException + { + jgen.writeBinary(obj.toBytes()); + } + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanVectorAggregator.java new file mode 100644 index 00000000000..0a1a93ca990 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanVectorAggregator.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.mean; + +import com.google.common.base.Preconditions; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.segment.vector.VectorValueSelector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +public class DoubleMeanVectorAggregator implements VectorAggregator +{ + private final VectorValueSelector selector; + + public DoubleMeanVectorAggregator(final VectorValueSelector selector) + { + this.selector = Preconditions.checkNotNull(selector, "selector"); + } + + @Override + public void init(final ByteBuffer buf, final int position) + { + DoubleMeanHolder.init(buf, position); + } + + @Override + public void aggregate(final ByteBuffer buf, final int position, final int startRow, final int endRow) + { + final double[] vector = selector.getDoubleVector(); + for (int i = startRow; i < endRow; i++) { + DoubleMeanHolder.update(buf, position, vector[i]); + } + } + + @Override + public void aggregate( + final ByteBuffer buf, + final int numRows, + final int[] positions, + @Nullable final int[] rows, + final int positionOffset + ) + { + final double[] vector = selector.getDoubleVector(); + + for (int i = 0; i < numRows; i++) { + final double val = vector[rows != null ? rows[i] : i]; + DoubleMeanHolder.update(buf, positions[i] + positionOffset, val); + } + } + + @Override + public Object get(final ByteBuffer buf, final int position) + { + return DoubleMeanHolder.get(buf, position); + } + + @Override + public void close() + { + // Nothing to close. + } +} diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java index b272ab16581..92d9e9e4d05 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java @@ -43,7 +43,7 @@ public class GroupByQueryConfig private static final String CTX_KEY_FORCE_HASH_AGGREGATION = "forceHashAggregation"; private static final String CTX_KEY_INTERMEDIATE_COMBINE_DEGREE = "intermediateCombineDegree"; private static final String CTX_KEY_NUM_PARALLEL_COMBINE_THREADS = "numParallelCombineThreads"; - private static final String CTX_KEY_VECTORIZE = "vectorize"; + public static final String CTX_KEY_VECTORIZE = "vectorize"; @JsonProperty private String defaultStrategy = GroupByStrategySelector.STRATEGY_V2; diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java index 61212813b54..af936d1454f 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java @@ -571,7 +571,7 @@ public class AggregationTestHelper implements Closeable } } - public IncrementalIndex createIncrementalIndex( + public static IncrementalIndex createIncrementalIndex( Iterator rows, InputRowParser parser, final AggregatorFactory[] metrics, diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/mean/DoubleMeanAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/mean/DoubleMeanAggregationTest.java new file mode 100644 index 00000000000..a71c6f0efa3 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/aggregation/mean/DoubleMeanAggregationTest.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.mean; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import org.apache.druid.data.input.Row; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.query.Druids; +import org.apache.druid.query.Query; +import org.apache.druid.query.Result; +import org.apache.druid.query.aggregation.AggregationTestHelper; +import org.apache.druid.query.groupby.GroupByQuery; +import org.apache.druid.query.groupby.GroupByQueryConfig; +import org.apache.druid.query.groupby.ResultRow; +import org.apache.druid.query.timeseries.TimeseriesQuery; +import org.apache.druid.query.timeseries.TimeseriesResultValue; +import org.apache.druid.segment.IncrementalIndexSegment; +import org.apache.druid.segment.QueryableIndexSegment; +import org.apache.druid.segment.Segment; +import org.apache.druid.timeline.SegmentId; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.util.Collections; +import java.util.List; + +public class DoubleMeanAggregationTest +{ + @Rule + public final TemporaryFolder tempFolder = new TemporaryFolder(); + + private final AggregationTestHelper groupByQueryTestHelper; + private final AggregationTestHelper timeseriesQueryTestHelper; + + private final List segments; + + public DoubleMeanAggregationTest() throws Exception + { + + groupByQueryTestHelper = AggregationTestHelper.createGroupByQueryAggregationTestHelper( + Collections.EMPTY_LIST, + new GroupByQueryConfig(), + tempFolder + ); + + timeseriesQueryTestHelper = AggregationTestHelper.createTimeseriesQueryAggregationTestHelper( + Collections.EMPTY_LIST, + tempFolder + ); + + segments = ImmutableList.of( + new IncrementalIndexSegment(SimpleTestIndex.getIncrementalTestIndex(), SegmentId.dummy("test1")), + new QueryableIndexSegment(SimpleTestIndex.getMMappedTestIndex(), SegmentId.dummy("test2")) + ); + } + + @Test + public void testBufferAggretatorUsingGroupByQuery() throws Exception + { + GroupByQuery query = new GroupByQuery.Builder() + .setDataSource("test") + .setGranularity(Granularities.ALL) + .setInterval("1970/2050") + .setAggregatorSpecs( + new DoubleMeanAggregatorFactory("meanOnDouble", SimpleTestIndex.DOUBLE_COL), + new DoubleMeanAggregatorFactory("meanOnString", SimpleTestIndex.SINGLE_VALUE_DOUBLE_AS_STRING_DIM), + new DoubleMeanAggregatorFactory("meanOnMultiValue", SimpleTestIndex.MULTI_VALUE_DOUBLE_AS_STRING_DIM) + ) + .build(); + + // do json serialization and deserialization of query to ensure there are no serde issues + ObjectMapper jsonMapper = groupByQueryTestHelper.getObjectMapper(); + query = (GroupByQuery) jsonMapper.readValue(jsonMapper.writeValueAsString(query), Query.class); + + Sequence seq = groupByQueryTestHelper.runQueryOnSegmentsObjs(segments, query); + Row result = Iterables.getOnlyElement(seq.toList()).toMapBasedRow(query); + + Assert.assertEquals(6.2d, result.getMetric("meanOnDouble").doubleValue(), 0.0001d); + Assert.assertEquals(6.2d, result.getMetric("meanOnString").doubleValue(), 0.0001d); + Assert.assertEquals(4.1333d, result.getMetric("meanOnMultiValue").doubleValue(), 0.0001d); + } + + @Test + public void testVectorAggretatorUsingGroupByQueryOnDoubleColumn() throws Exception + { + GroupByQuery query = new GroupByQuery.Builder() + .setDataSource("test") + .setGranularity(Granularities.ALL) + .setInterval("1970/2050") + .setAggregatorSpecs( + new DoubleMeanAggregatorFactory("meanOnDouble", SimpleTestIndex.DOUBLE_COL) + ) + .setContext(Collections.singletonMap(GroupByQueryConfig.CTX_KEY_VECTORIZE, true)) + .build(); + + // do json serialization and deserialization of query to ensure there are no serde issues + ObjectMapper jsonMapper = groupByQueryTestHelper.getObjectMapper(); + query = (GroupByQuery) jsonMapper.readValue(jsonMapper.writeValueAsString(query), Query.class); + + Sequence seq = groupByQueryTestHelper.runQueryOnSegmentsObjs(segments, query); + Row result = Iterables.getOnlyElement(seq.toList()).toMapBasedRow(query); + + Assert.assertEquals(6.2d, result.getMetric("meanOnDouble").doubleValue(), 0.0001d); + } + + @Test + public void testAggretatorUsingTimeseriesQuery() throws Exception + { + TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() + .dataSource("test") + .granularity(Granularities.ALL) + .intervals("1970/2050") + .aggregators( + new DoubleMeanAggregatorFactory("meanOnDouble", SimpleTestIndex.DOUBLE_COL), + new DoubleMeanAggregatorFactory( + "meanOnString", + SimpleTestIndex.SINGLE_VALUE_DOUBLE_AS_STRING_DIM + ), + new DoubleMeanAggregatorFactory( + "meanOnMultiValue", + SimpleTestIndex.MULTI_VALUE_DOUBLE_AS_STRING_DIM + ) + ) + .build(); + + // do json serialization and deserialization of query to ensure there are no serde issues + ObjectMapper jsonMapper = timeseriesQueryTestHelper.getObjectMapper(); + query = (TimeseriesQuery) jsonMapper.readValue(jsonMapper.writeValueAsString(query), Query.class); + + Sequence seq = timeseriesQueryTestHelper.runQueryOnSegmentsObjs(segments, query); + TimeseriesResultValue result = ((Result) Iterables.getOnlyElement(seq.toList())).getValue(); + + Assert.assertEquals(6.2d, result.getDoubleMetric("meanOnDouble").doubleValue(), 0.0001d); + Assert.assertEquals(6.2d, result.getDoubleMetric("meanOnString").doubleValue(), 0.0001d); + Assert.assertEquals(4.1333d, result.getDoubleMetric("meanOnMultiValue").doubleValue(), 0.0001d); + } +} diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/mean/SimpleTestIndex.java b/processing/src/test/java/org/apache/druid/query/aggregation/mean/SimpleTestIndex.java new file mode 100644 index 00000000000..786a6fc176c --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/aggregation/mean/SimpleTestIndex.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.mean; + +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.MapBasedInputRow; +import org.apache.druid.data.input.impl.NoopInputRowParser; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.query.aggregation.AggregationTestHelper; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; +import org.apache.druid.segment.QueryableIndex; +import org.apache.druid.segment.TestIndex; +import org.apache.druid.segment.incremental.IncrementalIndex; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; + +import java.util.List; + +public class SimpleTestIndex +{ + public static final int NUM_ROWS = 10; + + public static final String SINGLE_VALUE_DOUBLE_AS_STRING_DIM = "singleValueDoubleAsStringDim"; + public static final String MULTI_VALUE_DOUBLE_AS_STRING_DIM = "multiValueDoubleAsStringDim"; + + public static final String DOUBLE_COL = "doubleCol"; + + public static final List DIMENSIONS = ImmutableList.of( + SINGLE_VALUE_DOUBLE_AS_STRING_DIM, + MULTI_VALUE_DOUBLE_AS_STRING_DIM + ); + + private static Supplier realtimeIndex = Suppliers.memoize( + () -> makeRealtimeIndex() + ); + + private static Supplier mmappedIndex = Suppliers.memoize( + () -> TestIndex.persistRealtimeAndLoadMMapped(realtimeIndex.get()) + ); + + + public static IncrementalIndex getIncrementalTestIndex() + { + return realtimeIndex.get(); + } + + public static QueryableIndex getMMappedTestIndex() + { + return mmappedIndex.get(); + } + + private static IncrementalIndex makeRealtimeIndex() + { + try { + List inputRows = Lists.newArrayListWithExpectedSize(NUM_ROWS); + for (int i = 1; i <= NUM_ROWS; i++) { + double doubleVal = i + 0.7d; + String stringVal = String.valueOf(doubleVal); + + inputRows.add(new MapBasedInputRow( + DateTime.now(DateTimeZone.UTC), + DIMENSIONS, + ImmutableMap.of( + DOUBLE_COL, doubleVal, + SINGLE_VALUE_DOUBLE_AS_STRING_DIM, stringVal, + MULTI_VALUE_DOUBLE_AS_STRING_DIM, Lists.newArrayList(stringVal, null, stringVal) + ) + )); + } + + return AggregationTestHelper.createIncrementalIndex( + inputRows.iterator(), + new NoopInputRowParser(null), + new AggregatorFactory[]{ + new CountAggregatorFactory("count"), + new DoubleSumAggregatorFactory(DOUBLE_COL, DOUBLE_COL) + }, + 0, + Granularities.NONE, + false, + 100, + false + ); + } + catch (Exception ex) { + throw new RuntimeException(ex); + } + } +}