doubleMean aggregator to be used at query time (#8459)

* doubleMean aggregator for computing mean

* make docs

* build fixes

* address review comment: handle null args
Himanshu 2019-09-26 08:04:33 -07:00 committed by GitHub
parent f4605f45be
commit 9f1f5e115c
14 changed files with 884 additions and 3 deletions

View File

@@ -81,4 +81,13 @@ public class Utils
return true;
}
public static String safeObjectClassGetName(@Nullable Object o)
{
if (o == null) {
return "NULL";
} else {
return o.getClass().getName();
}
}
}

View File

@@ -120,6 +120,14 @@ Computes and stores the sum of values as 32-bit floating point value. Similar to
{ "type" : "longMax", "name" : <output_name>, "fieldName" : <metric_name> }
```
### `doubleMean` aggregator
Computes and returns the arithmetic mean of a column's values as a 64-bit floating point value. This is a query-time aggregator only and should not be used during indexing.
```json
{ "type" : "doubleMean", "name" : <output_name>, "fieldName" : <metric_name> }
```
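
For illustration, a minimal sketch of a timeseries query using this aggregator; the datasource, interval, and field names here are hypothetical:

```json
{
  "queryType": "timeseries",
  "dataSource": "sample_datasource",
  "granularity": "all",
  "intervals": [ "2019-01-01/2019-02-01" ],
  "aggregations": [
    { "type" : "doubleMean", "name" : "value_mean", "fieldName" : "value" }
  ]
}
```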
### First / Last aggregator
(Double/Float/Long) First and Last aggregator cannot be used in ingestion spec, and should only be specified as part of queries.

View File

@@ -53,6 +53,8 @@ import org.apache.druid.query.aggregation.last.FloatLastAggregatorFactory;
import org.apache.druid.query.aggregation.last.LongLastAggregatorFactory;
import org.apache.druid.query.aggregation.last.StringLastAggregatorFactory;
import org.apache.druid.query.aggregation.last.StringLastFoldingAggregatorFactory;
import org.apache.druid.query.aggregation.mean.DoubleMeanAggregatorFactory;
import org.apache.druid.query.aggregation.mean.DoubleMeanHolder;
import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
import org.apache.druid.query.aggregation.post.ConstantPostAggregator;
import org.apache.druid.query.aggregation.post.DoubleGreatestPostAggregator;
@@ -79,6 +81,8 @@ public class AggregatorsModule extends SimpleModule
setMixInAnnotation(AggregatorFactory.class, AggregatorFactoryMixin.class);
setMixInAnnotation(PostAggregator.class, PostAggregatorMixin.class);
addSerializer(DoubleMeanHolder.class, DoubleMeanHolder.Serializer.INSTANCE);
}
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@@ -105,6 +109,7 @@ public class AggregatorsModule extends SimpleModule
@JsonSubTypes.Type(name = "stringFirstFold", value = StringFirstFoldingAggregatorFactory.class),
@JsonSubTypes.Type(name = "longLast", value = LongLastAggregatorFactory.class),
@JsonSubTypes.Type(name = "doubleLast", value = DoubleLastAggregatorFactory.class),
@JsonSubTypes.Type(name = "doubleMean", value = DoubleMeanAggregatorFactory.class),
@JsonSubTypes.Type(name = "floatLast", value = FloatLastAggregatorFactory.class),
@JsonSubTypes.Type(name = "stringLast", value = StringLastAggregatorFactory.class),
@JsonSubTypes.Type(name = "stringLastFold", value = StringLastFoldingAggregatorFactory.class)

View File

@@ -119,6 +119,8 @@ public class AggregatorUtil
// TDigest sketch aggregators
public static final byte TDIGEST_BUILD_SKETCH_CACHE_TYPE_ID = 0x38;
public static final byte MEAN_CACHE_TYPE_ID = 0x41;
/**
* returns the list of dependent postAggregators that should be calculated in order to calculate given postAgg
*

View File

@@ -29,7 +29,7 @@ public abstract class SimpleDoubleBufferAggregator implements BufferAggregator
{
final BaseDoubleColumnValueSelector selector;
- SimpleDoubleBufferAggregator(BaseDoubleColumnValueSelector selector)
+ public SimpleDoubleBufferAggregator(BaseDoubleColumnValueSelector selector)
{
this.selector = selector;
}

View File

@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.mean;
import org.apache.druid.java.util.common.Numbers;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.segment.ColumnValueSelector;
import java.util.List;
/**
*/
public class DoubleMeanAggregator implements Aggregator
{
private final ColumnValueSelector selector;
private final DoubleMeanHolder value = new DoubleMeanHolder(0, 0);
public DoubleMeanAggregator(ColumnValueSelector selector)
{
this.selector = selector;
}
@Override
public void aggregate()
{
Object update = selector.getObject();
if (update instanceof DoubleMeanHolder) {
value.update((DoubleMeanHolder) update);
} else if (update instanceof List) {
for (Object o : (List) update) {
value.update(Numbers.tryParseDouble(o, 0));
}
} else {
value.update(Numbers.tryParseDouble(update, 0));
}
}
@Override
public Object get()
{
return value;
}
@Override
public float getFloat()
{
throw new UnsupportedOperationException("not supported");
}
@Override
public long getLong()
{
throw new UnsupportedOperationException("not supported");
}
@Override
public double getDouble()
{
throw new UnsupportedOperationException("not supported");
}
@Override
public void close()
{
// no resources to cleanup
}
}

View File

@ -0,0 +1,181 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.mean;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.collect.Utils;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
/**
*/
public class DoubleMeanAggregatorFactory extends AggregatorFactory
{
private final String name;
private final String fieldName;
@JsonCreator
public DoubleMeanAggregatorFactory(
@JsonProperty("name") String name,
@JsonProperty("fieldName") final String fieldName
)
{
this.name = Preconditions.checkNotNull(name, "null name");
this.fieldName = Preconditions.checkNotNull(fieldName, "null fieldName");
}
@Override
@JsonProperty
public String getName()
{
return name;
}
@JsonProperty
public String getFieldName()
{
return fieldName;
}
@Override
public List<String> requiredFields()
{
return Collections.singletonList(fieldName);
}
@Override
public String getTypeName()
{
return "doubleMean";
}
@Override
public int getMaxIntermediateSize()
{
return DoubleMeanHolder.MAX_INTERMEDIATE_SIZE;
}
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return new DoubleMeanAggregator(metricFactory.makeColumnValueSelector(fieldName));
}
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
return new DoubleMeanBufferAggregator(metricFactory.makeColumnValueSelector(fieldName));
}
@Override
public VectorAggregator factorizeVector(final VectorColumnSelectorFactory selectorFactory)
{
return new DoubleMeanVectorAggregator(selectorFactory.makeValueSelector(fieldName));
}
@Override
public boolean canVectorize()
{
return true;
}
@Override
public Comparator getComparator()
{
return DoubleMeanHolder.COMPARATOR;
}
@Nullable
@Override
public Object combine(@Nullable Object lhs, @Nullable Object rhs)
{
if (lhs instanceof DoubleMeanHolder && rhs instanceof DoubleMeanHolder) {
return ((DoubleMeanHolder) lhs).update((DoubleMeanHolder) rhs);
} else {
throw new IAE(
"lhs[%s] or rhs[%s] not of type [%s]",
Utils.safeObjectClassGetName(lhs),
Utils.safeObjectClassGetName(rhs),
DoubleMeanHolder.class.getName()
);
}
}
@Override
public AggregatorFactory getCombiningFactory()
{
return new DoubleMeanAggregatorFactory(name, name);
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Collections.singletonList(new DoubleMeanAggregatorFactory(fieldName, fieldName));
}
@Override
public Object deserialize(Object object)
{
if (object instanceof String) {
return DoubleMeanHolder.fromBytes(StringUtils.decodeBase64(StringUtils.toUtf8((String) object)));
} else if (object instanceof DoubleMeanHolder) {
return object;
} else {
throw new IAE("Unknown object type [%s]", Utils.safeObjectClassGetName(object));
}
}
@Nullable
@Override
public Object finalizeComputation(@Nullable Object object)
{
if (object instanceof DoubleMeanHolder) {
return ((DoubleMeanHolder) object).mean();
} else if (object == null) {
return null;
} else {
throw new IAE("Unknown object type [%s]", object.getClass().getName());
}
}
@Override
public byte[] getCacheKey()
{
return new CacheKeyBuilder(AggregatorUtil.MEAN_CACHE_TYPE_ID)
.appendString(name)
.appendString(fieldName)
.build();
}
}

View File

@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.mean;
import org.apache.druid.java.util.common.Numbers;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.ColumnValueSelector;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.List;
/**
*/
public class DoubleMeanBufferAggregator implements BufferAggregator
{
private final ColumnValueSelector selector;
public DoubleMeanBufferAggregator(ColumnValueSelector selector)
{
this.selector = selector;
}
@Override
public void init(ByteBuffer buf, int position)
{
DoubleMeanHolder.init(buf, position);
}
@Override
public void aggregate(ByteBuffer buf, int position)
{
Object update = selector.getObject();
if (update instanceof DoubleMeanHolder) {
DoubleMeanHolder.update(buf, position, (DoubleMeanHolder) update);
} else if (update instanceof List) {
for (Object o : (List) update) {
DoubleMeanHolder.update(buf, position, Numbers.tryParseDouble(o, 0));
}
} else {
DoubleMeanHolder.update(buf, position, Numbers.tryParseDouble(update, 0));
}
}
@Nullable
@Override
public Object get(ByteBuffer buf, int position)
{
return DoubleMeanHolder.get(buf, position);
}
@Override
public float getFloat(ByteBuffer buf, int position)
{
throw new UnsupportedOperationException("not supported");
}
@Override
public long getLong(ByteBuffer buf, int position)
{
throw new UnsupportedOperationException("not supported");
}
@Override
public double getDouble(ByteBuffer buf, int position)
{
throw new UnsupportedOperationException("not supported");
}
@Override
public void close()
{
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
}
}

View File

@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.mean;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.google.common.primitives.Doubles;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Comparator;
public class DoubleMeanHolder
{
public static final int MAX_INTERMEDIATE_SIZE = Long.BYTES + Double.BYTES; // 8-byte sum + 8-byte count; the SIZE constants are bit counts, BYTES are needed here
public static final Comparator<DoubleMeanHolder> COMPARATOR = (o1, o2) -> Doubles.compare(o1.mean(), o2.mean());
private double sum;
private long count;
public DoubleMeanHolder(double sum, long count)
{
this.sum = sum;
this.count = count;
}
public void update(double sum)
{
this.sum += sum;
count++;
}
public DoubleMeanHolder update(DoubleMeanHolder other)
{
sum += other.sum;
count += other.count;
return this;
}
public double mean()
{
return count == 0 ? 0 : sum / count;
}
public byte[] toBytes()
{
ByteBuffer buf = ByteBuffer.allocate(Double.BYTES + Long.BYTES);
buf.putDouble(0, sum);
buf.putLong(Double.BYTES, count);
return buf.array();
}
public static DoubleMeanHolder fromBytes(byte[] data)
{
ByteBuffer buf = ByteBuffer.wrap(data);
return new DoubleMeanHolder(buf.getDouble(0), buf.getLong(Double.BYTES));
}
public static void init(ByteBuffer buf, int position)
{
writeSum(buf, position, 0d);
writeCount(buf, position, 0);
}
public static void update(ByteBuffer buf, int position, double sum)
{
writeSum(buf, position, getSum(buf, position) + sum);
writeCount(buf, position, getCount(buf, position) + 1);
}
public static void update(ByteBuffer buf, int position, DoubleMeanHolder other)
{
writeSum(buf, position, getSum(buf, position) + other.sum);
writeCount(buf, position, getCount(buf, position) + other.count);
}
public static DoubleMeanHolder get(ByteBuffer buf, int position)
{
return new DoubleMeanHolder(getSum(buf, position), getCount(buf, position));
}
private static void writeSum(ByteBuffer buf, int position, double sum)
{
buf.putDouble(position, sum);
}
private static double getSum(ByteBuffer buf, int position)
{
return buf.getDouble(position);
}
private static void writeCount(ByteBuffer buf, int position, long count)
{
buf.putLong(position + Double.BYTES, count);
}
private static long getCount(ByteBuffer buf, int position)
{
return buf.getLong(position + Double.BYTES);
}
public static class Serializer extends JsonSerializer<DoubleMeanHolder>
{
public static final Serializer INSTANCE = new Serializer();
private Serializer()
{
}
@Override
public void serialize(DoubleMeanHolder obj, JsonGenerator jgen, SerializerProvider provider)
throws IOException
{
jgen.writeBinary(obj.toBytes());
}
}
}

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.mean;
import com.google.common.base.Preconditions;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.segment.vector.VectorValueSelector;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
public class DoubleMeanVectorAggregator implements VectorAggregator
{
private final VectorValueSelector selector;
public DoubleMeanVectorAggregator(final VectorValueSelector selector)
{
this.selector = Preconditions.checkNotNull(selector, "selector");
}
@Override
public void init(final ByteBuffer buf, final int position)
{
DoubleMeanHolder.init(buf, position);
}
@Override
public void aggregate(final ByteBuffer buf, final int position, final int startRow, final int endRow)
{
final double[] vector = selector.getDoubleVector();
for (int i = startRow; i < endRow; i++) {
DoubleMeanHolder.update(buf, position, vector[i]);
}
}
@Override
public void aggregate(
final ByteBuffer buf,
final int numRows,
final int[] positions,
@Nullable final int[] rows,
final int positionOffset
)
{
final double[] vector = selector.getDoubleVector();
for (int i = 0; i < numRows; i++) {
final double val = vector[rows != null ? rows[i] : i];
DoubleMeanHolder.update(buf, positions[i] + positionOffset, val);
}
}
@Override
public Object get(final ByteBuffer buf, final int position)
{
return DoubleMeanHolder.get(buf, position);
}
@Override
public void close()
{
// Nothing to close.
}
}

View File

@@ -43,7 +43,7 @@ public class GroupByQueryConfig
private static final String CTX_KEY_FORCE_HASH_AGGREGATION = "forceHashAggregation";
private static final String CTX_KEY_INTERMEDIATE_COMBINE_DEGREE = "intermediateCombineDegree";
private static final String CTX_KEY_NUM_PARALLEL_COMBINE_THREADS = "numParallelCombineThreads";
- private static final String CTX_KEY_VECTORIZE = "vectorize";
+ public static final String CTX_KEY_VECTORIZE = "vectorize";
@JsonProperty
private String defaultStrategy = GroupByStrategySelector.STRATEGY_V2;

View File

@@ -571,7 +571,7 @@ public class AggregationTestHelper implements Closeable
}
}
- public IncrementalIndex createIncrementalIndex(
+ public static IncrementalIndex createIncrementalIndex(
Iterator rows,
InputRowParser parser,
final AggregatorFactory[] metrics,

View File

@ -0,0 +1,159 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.mean;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.apache.druid.data.input.Row;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.Druids;
import org.apache.druid.query.Query;
import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.AggregationTestHelper;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.segment.IncrementalIndexSegment;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.timeline.SegmentId;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.util.Collections;
import java.util.List;
public class DoubleMeanAggregationTest
{
@Rule
public final TemporaryFolder tempFolder = new TemporaryFolder();
private final AggregationTestHelper groupByQueryTestHelper;
private final AggregationTestHelper timeseriesQueryTestHelper;
private final List<Segment> segments;
public DoubleMeanAggregationTest() throws Exception
{
groupByQueryTestHelper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
Collections.EMPTY_LIST,
new GroupByQueryConfig(),
tempFolder
);
timeseriesQueryTestHelper = AggregationTestHelper.createTimeseriesQueryAggregationTestHelper(
Collections.EMPTY_LIST,
tempFolder
);
segments = ImmutableList.of(
new IncrementalIndexSegment(SimpleTestIndex.getIncrementalTestIndex(), SegmentId.dummy("test1")),
new QueryableIndexSegment(SimpleTestIndex.getMMappedTestIndex(), SegmentId.dummy("test2"))
);
}
@Test
public void testBufferAggregatorUsingGroupByQuery() throws Exception
{
GroupByQuery query = new GroupByQuery.Builder()
.setDataSource("test")
.setGranularity(Granularities.ALL)
.setInterval("1970/2050")
.setAggregatorSpecs(
new DoubleMeanAggregatorFactory("meanOnDouble", SimpleTestIndex.DOUBLE_COL),
new DoubleMeanAggregatorFactory("meanOnString", SimpleTestIndex.SINGLE_VALUE_DOUBLE_AS_STRING_DIM),
new DoubleMeanAggregatorFactory("meanOnMultiValue", SimpleTestIndex.MULTI_VALUE_DOUBLE_AS_STRING_DIM)
)
.build();
// do json serialization and deserialization of query to ensure there are no serde issues
ObjectMapper jsonMapper = groupByQueryTestHelper.getObjectMapper();
query = (GroupByQuery) jsonMapper.readValue(jsonMapper.writeValueAsString(query), Query.class);
Sequence<ResultRow> seq = groupByQueryTestHelper.runQueryOnSegmentsObjs(segments, query);
Row result = Iterables.getOnlyElement(seq.toList()).toMapBasedRow(query);
Assert.assertEquals(6.2d, result.getMetric("meanOnDouble").doubleValue(), 0.0001d);
Assert.assertEquals(6.2d, result.getMetric("meanOnString").doubleValue(), 0.0001d);
Assert.assertEquals(4.1333d, result.getMetric("meanOnMultiValue").doubleValue(), 0.0001d);
}
@Test
public void testVectorAggregatorUsingGroupByQueryOnDoubleColumn() throws Exception
{
GroupByQuery query = new GroupByQuery.Builder()
.setDataSource("test")
.setGranularity(Granularities.ALL)
.setInterval("1970/2050")
.setAggregatorSpecs(
new DoubleMeanAggregatorFactory("meanOnDouble", SimpleTestIndex.DOUBLE_COL)
)
.setContext(Collections.singletonMap(GroupByQueryConfig.CTX_KEY_VECTORIZE, true))
.build();
// do json serialization and deserialization of query to ensure there are no serde issues
ObjectMapper jsonMapper = groupByQueryTestHelper.getObjectMapper();
query = (GroupByQuery) jsonMapper.readValue(jsonMapper.writeValueAsString(query), Query.class);
Sequence<ResultRow> seq = groupByQueryTestHelper.runQueryOnSegmentsObjs(segments, query);
Row result = Iterables.getOnlyElement(seq.toList()).toMapBasedRow(query);
Assert.assertEquals(6.2d, result.getMetric("meanOnDouble").doubleValue(), 0.0001d);
}
@Test
public void testAggregatorUsingTimeseriesQuery() throws Exception
{
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.granularity(Granularities.ALL)
.intervals("1970/2050")
.aggregators(
new DoubleMeanAggregatorFactory("meanOnDouble", SimpleTestIndex.DOUBLE_COL),
new DoubleMeanAggregatorFactory(
"meanOnString",
SimpleTestIndex.SINGLE_VALUE_DOUBLE_AS_STRING_DIM
),
new DoubleMeanAggregatorFactory(
"meanOnMultiValue",
SimpleTestIndex.MULTI_VALUE_DOUBLE_AS_STRING_DIM
)
)
.build();
// do json serialization and deserialization of query to ensure there are no serde issues
ObjectMapper jsonMapper = timeseriesQueryTestHelper.getObjectMapper();
query = (TimeseriesQuery) jsonMapper.readValue(jsonMapper.writeValueAsString(query), Query.class);
Sequence seq = timeseriesQueryTestHelper.runQueryOnSegmentsObjs(segments, query);
TimeseriesResultValue result = ((Result<TimeseriesResultValue>) Iterables.getOnlyElement(seq.toList())).getValue();
Assert.assertEquals(6.2d, result.getDoubleMetric("meanOnDouble").doubleValue(), 0.0001d);
Assert.assertEquals(6.2d, result.getDoubleMetric("meanOnString").doubleValue(), 0.0001d);
Assert.assertEquals(4.1333d, result.getDoubleMetric("meanOnMultiValue").doubleValue(), 0.0001d);
}
}

View File

@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.mean;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.NoopInputRowParser;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.AggregationTestHelper;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import java.util.List;
public class SimpleTestIndex
{
public static final int NUM_ROWS = 10;
public static final String SINGLE_VALUE_DOUBLE_AS_STRING_DIM = "singleValueDoubleAsStringDim";
public static final String MULTI_VALUE_DOUBLE_AS_STRING_DIM = "multiValueDoubleAsStringDim";
public static final String DOUBLE_COL = "doubleCol";
public static final List<String> DIMENSIONS = ImmutableList.of(
SINGLE_VALUE_DOUBLE_AS_STRING_DIM,
MULTI_VALUE_DOUBLE_AS_STRING_DIM
);
private static Supplier<IncrementalIndex> realtimeIndex = Suppliers.memoize(
() -> makeRealtimeIndex()
);
private static Supplier<QueryableIndex> mmappedIndex = Suppliers.memoize(
() -> TestIndex.persistRealtimeAndLoadMMapped(realtimeIndex.get())
);
public static IncrementalIndex getIncrementalTestIndex()
{
return realtimeIndex.get();
}
public static QueryableIndex getMMappedTestIndex()
{
return mmappedIndex.get();
}
private static IncrementalIndex makeRealtimeIndex()
{
try {
List<InputRow> inputRows = Lists.newArrayListWithExpectedSize(NUM_ROWS);
for (int i = 1; i <= NUM_ROWS; i++) {
double doubleVal = i + 0.7d;
String stringVal = String.valueOf(doubleVal);
inputRows.add(new MapBasedInputRow(
DateTime.now(DateTimeZone.UTC),
DIMENSIONS,
ImmutableMap.of(
DOUBLE_COL, doubleVal,
SINGLE_VALUE_DOUBLE_AS_STRING_DIM, stringVal,
MULTI_VALUE_DOUBLE_AS_STRING_DIM, Lists.newArrayList(stringVal, null, stringVal)
)
));
}
return AggregationTestHelper.createIncrementalIndex(
inputRows.iterator(),
new NoopInputRowParser(null),
new AggregatorFactory[]{
new CountAggregatorFactory("count"),
new DoubleSumAggregatorFactory(DOUBLE_COL, DOUBLE_COL)
},
0,
Granularities.NONE,
false,
100,
false
);
}
catch (Exception ex) {
throw new RuntimeException(ex);
}
}
}