make double sum/min/max agg work on string columns (#8243)

* make double sum/min/max agg work on string columns

* style and compilation fixes

* fix tests

* address review comments

* add comment on SimpleDoubleAggregatorFactory

* make checkstyle happy
Himanshu 2019-08-13 15:55:14 -07:00 committed by GitHub
parent a5c9c2950f
commit 176da53996
16 changed files with 732 additions and 52 deletions

org/apache/druid/java/util/common/Numbers.java

@@ -19,6 +19,10 @@
 package org.apache.druid.java.util.common;
 
+import com.google.common.primitives.Doubles;
+
+import javax.annotation.Nullable;
+
 public final class Numbers
 {
   /**
@@ -92,6 +96,26 @@ public final class Numbers
     }
   }
 
+  /**
+   * Try to parse the given Number or String object "val" as a double.
+   *
+   * @param val       the value to parse
+   * @param nullValue the value to return when "val" is null, or is a String that cannot be parsed as a double
+   * @return the parsed double value
+   */
+  public static double tryParseDouble(@Nullable Object val, double nullValue)
+  {
+    if (val == null) {
+      return nullValue;
+    } else if (val instanceof Number) {
+      return ((Number) val).doubleValue();
+    } else if (val instanceof String) {
+      Double d = Doubles.tryParse((String) val);
+      return d == null ? nullValue : d.doubleValue();
+    } else {
+      throw new IAE("Unknown object type [%s]", val.getClass().getName());
+    }
+  }
+
   public static int toIntExact(long value, String error)
   {
     if ((int) value != value) {
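For reference, a quick sketch of how the new Numbers.tryParseDouble helper behaves. The expected values in the comments follow directly from the branches above; the snippet itself is illustrative and not part of the change:

double a = Numbers.tryParseDouble(null, 0.0);            // 0.0: null input yields nullValue
double b = Numbers.tryParseDouble(3L, -1.0);             // 3.0: Number inputs pass through doubleValue()
double c = Numbers.tryParseDouble("3.5", -1.0);          // 3.5: parseable Strings are parsed
double d = Numbers.tryParseDouble("not a number", -1.0); // -1.0: unparseable Strings yield nullValue
// Any other input type (e.g. new Object()) throws IAE.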

org/apache/druid/query/aggregation/AggregatorUtil.java

@@ -25,7 +25,6 @@ import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.math.expr.Expr;
 import org.apache.druid.math.expr.ExprEval;
 import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
-import org.apache.druid.segment.BaseDoubleColumnValueSelector;
 import org.apache.druid.segment.BaseFloatColumnValueSelector;
 import org.apache.druid.segment.BaseLongColumnValueSelector;
 import org.apache.druid.segment.ColumnSelectorFactory;
@@ -266,7 +265,7 @@ public class AggregatorUtil
   /**
    * Only one of fieldName and fieldExpression should be non-null
    */
-  static BaseDoubleColumnValueSelector makeColumnValueSelectorWithDoubleDefault(
+  static ColumnValueSelector makeColumnValueSelectorWithDoubleDefault(
       final ColumnSelectorFactory metricFactory,
       @Nullable final String fieldName,
       @Nullable final Expr fieldExpression,

org/apache/druid/query/aggregation/DelegatingAggregator.java

@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation;

import javax.annotation.Nullable;

/**
 * An Aggregator that delegates everything. It is used by Aggregator wrappers e.g.
 * {@link StringColumnDoubleAggregatorWrapper} that modify some behavior of a delegate.
 */
public abstract class DelegatingAggregator implements Aggregator
{
  protected Aggregator delegate;

  @Override
  public void aggregate()
  {
    delegate.aggregate();
  }

  @Nullable
  @Override
  public Object get()
  {
    return delegate.get();
  }

  @Override
  public float getFloat()
  {
    return delegate.getFloat();
  }

  @Override
  public long getLong()
  {
    return delegate.getLong();
  }

  @Override
  public double getDouble()
  {
    return delegate.getDouble();
  }

  @Override
  public boolean isNull()
  {
    return delegate.isNull();
  }

  @Override
  public void close()
  {
    delegate.close();
  }
}

org/apache/druid/query/aggregation/DelegatingBufferAggregator.java

@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation;

import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;

/**
 * A BufferAggregator that delegates everything. It is used by BufferAggregator wrappers e.g.
 * {@link StringColumnDoubleBufferAggregatorWrapper} that modify some behavior of a delegate.
 */
public abstract class DelegatingBufferAggregator implements BufferAggregator
{
  protected BufferAggregator delegate;

  @Override
  public void init(ByteBuffer buf, int position)
  {
    delegate.init(buf, position);
  }

  @Override
  public void aggregate(ByteBuffer buf, int position)
  {
    delegate.aggregate(buf, position);
  }

  @Nullable
  @Override
  public Object get(ByteBuffer buf, int position)
  {
    return delegate.get(buf, position);
  }

  @Override
  public float getFloat(ByteBuffer buf, int position)
  {
    return delegate.getFloat(buf, position);
  }

  @Override
  public long getLong(ByteBuffer buf, int position)
  {
    return delegate.getLong(buf, position);
  }

  @Override
  public double getDouble(ByteBuffer buf, int position)
  {
    return delegate.getDouble(buf, position);
  }

  @Override
  public void close()
  {
    delegate.close();
  }

  @Override
  public void inspectRuntimeShape(RuntimeShapeInspector inspector)
  {
    delegate.inspectRuntimeShape(inspector);
  }

  @Override
  public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
  {
    delegate.relocate(oldPosition, newPosition, oldBuffer, newBuffer);
  }

  @Override
  public boolean isNull(ByteBuffer buf, int position)
  {
    return delegate.isNull(buf, position);
  }
}

org/apache/druid/query/aggregation/DoubleMaxAggregatorFactory.java

@@ -25,7 +25,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.segment.BaseDoubleColumnValueSelector;
-import org.apache.druid.segment.ColumnSelectorFactory;
 
 import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
@@ -53,25 +52,19 @@ public class DoubleMaxAggregatorFactory extends SimpleDoubleAggregatorFactory
   }
 
   @Override
-  protected BaseDoubleColumnValueSelector selector(ColumnSelectorFactory metricFactory)
+  protected double nullValue()
   {
-    return getDoubleColumnSelector(
-        metricFactory,
-        Double.NEGATIVE_INFINITY
-    );
+    return Double.NEGATIVE_INFINITY;
   }
 
   @Override
-  protected Aggregator factorize(ColumnSelectorFactory metricFactory, BaseDoubleColumnValueSelector selector)
+  protected Aggregator buildAggregator(BaseDoubleColumnValueSelector selector)
   {
     return new DoubleMaxAggregator(selector);
   }
 
   @Override
-  protected BufferAggregator factorizeBuffered(
-      ColumnSelectorFactory metricFactory,
-      BaseDoubleColumnValueSelector selector
-  )
+  protected BufferAggregator buildBufferAggregator(BaseDoubleColumnValueSelector selector)
   {
     return new DoubleMaxBufferAggregator(selector);
   }

org/apache/druid/query/aggregation/DoubleMinAggregatorFactory.java

@@ -25,7 +25,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.segment.BaseDoubleColumnValueSelector;
-import org.apache.druid.segment.ColumnSelectorFactory;
 
 import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
@@ -53,25 +52,19 @@ public class DoubleMinAggregatorFactory extends SimpleDoubleAggregatorFactory
   }
 
   @Override
-  protected BaseDoubleColumnValueSelector selector(ColumnSelectorFactory metricFactory)
+  protected double nullValue()
   {
-    return getDoubleColumnSelector(
-        metricFactory,
-        Double.POSITIVE_INFINITY
-    );
+    return Double.POSITIVE_INFINITY;
   }
 
   @Override
-  protected Aggregator factorize(ColumnSelectorFactory metricFactory, BaseDoubleColumnValueSelector selector)
+  protected Aggregator buildAggregator(BaseDoubleColumnValueSelector selector)
   {
     return new DoubleMinAggregator(selector);
   }
 
   @Override
-  protected BufferAggregator factorizeBuffered(
-      ColumnSelectorFactory metricFactory,
-      BaseDoubleColumnValueSelector selector
-  )
+  protected BufferAggregator buildBufferAggregator(BaseDoubleColumnValueSelector selector)
   {
     return new DoubleMinBufferAggregator(selector);
   }

org/apache/druid/query/aggregation/DoubleSumAggregatorFactory.java

@@ -25,7 +25,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.segment.BaseDoubleColumnValueSelector;
-import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
 import org.apache.druid.segment.vector.VectorValueSelector;
@@ -55,41 +54,36 @@ public class DoubleSumAggregatorFactory extends SimpleDoubleAggregatorFactory
   }
 
   @Override
-  protected BaseDoubleColumnValueSelector selector(ColumnSelectorFactory metricFactory)
+  protected double nullValue()
   {
-    return getDoubleColumnSelector(
-        metricFactory,
-        0.0d
-    );
+    return 0.0d;
   }
 
   @Override
+  protected Aggregator buildAggregator(BaseDoubleColumnValueSelector selector)
+  {
+    return new DoubleSumAggregator(selector);
+  }
+
+  @Override
+  protected BufferAggregator buildBufferAggregator(BaseDoubleColumnValueSelector selector)
+  {
+    return new DoubleSumBufferAggregator(selector);
+  }
+
+  @Override
   protected VectorValueSelector vectorSelector(VectorColumnSelectorFactory columnSelectorFactory)
   {
     return columnSelectorFactory.makeValueSelector(fieldName);
   }
 
   @Override
-  protected Aggregator factorize(ColumnSelectorFactory metricFactory, BaseDoubleColumnValueSelector selector)
-  {
-    return new DoubleSumAggregator(selector);
-  }
-
-  @Override
   public boolean canVectorize()
   {
     return expression == null;
   }
 
   @Override
-  protected BufferAggregator factorizeBuffered(
-      ColumnSelectorFactory metricFactory,
-      BaseDoubleColumnValueSelector selector
-  )
-  {
-    return new DoubleSumBufferAggregator(selector);
-  }
-
-  @Override
   protected VectorAggregator factorizeVector(
       VectorColumnSelectorFactory columnSelectorFactory,

org/apache/druid/query/aggregation/SimpleDoubleAggregatorFactory.java

@@ -29,7 +29,10 @@ import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.math.expr.Parser;
 import org.apache.druid.segment.BaseDoubleColumnValueSelector;
 import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.ColumnCapabilities;
 import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ValueType;
 
 import javax.annotation.Nullable;
 import java.util.Collections;
@@ -37,7 +40,13 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
 
-public abstract class SimpleDoubleAggregatorFactory extends NullableAggregatorFactory<BaseDoubleColumnValueSelector>
+/**
+ * This is an abstract class inherited by various {@link AggregatorFactory} implementations that consume double input
+ * and produce double output on aggregation.
+ * It extends {@code NullableAggregatorFactory<ColumnValueSelector>} instead of
+ * {@code NullableAggregatorFactory<BaseDoubleColumnValueSelector>} to additionally support aggregation on
+ * single- and multi-value string columns.
+ */
+public abstract class SimpleDoubleAggregatorFactory extends NullableAggregatorFactory<ColumnValueSelector>
 {
   protected final String name;
   @Nullable
@@ -68,16 +77,57 @@ public abstract class SimpleDoubleAggregatorFactory extends NullableAggregatorFa
     );
   }
 
-  protected BaseDoubleColumnValueSelector getDoubleColumnSelector(ColumnSelectorFactory metricFactory, double nullValue)
+  @Override
+  protected Aggregator factorize(ColumnSelectorFactory metricFactory, ColumnValueSelector selector)
+  {
+    if (shouldUseStringColumnAggregatorWrapper(metricFactory)) {
+      return new StringColumnDoubleAggregatorWrapper(
+          selector,
+          SimpleDoubleAggregatorFactory.this::buildAggregator,
+          nullValue()
+      );
+    } else {
+      return buildAggregator(selector);
+    }
+  }
+
+  @Override
+  protected BufferAggregator factorizeBuffered(
+      ColumnSelectorFactory metricFactory,
+      ColumnValueSelector selector
+  )
+  {
+    if (shouldUseStringColumnAggregatorWrapper(metricFactory)) {
+      return new StringColumnDoubleBufferAggregatorWrapper(
+          selector,
+          SimpleDoubleAggregatorFactory.this::buildBufferAggregator,
+          nullValue()
+      );
+    } else {
+      return buildBufferAggregator(selector);
+    }
+  }
+
+  @Override
+  protected ColumnValueSelector selector(ColumnSelectorFactory metricFactory)
   {
     return AggregatorUtil.makeColumnValueSelectorWithDoubleDefault(
         metricFactory,
         fieldName,
         fieldExpression.get(),
-        nullValue
+        nullValue()
     );
   }
 
+  private boolean shouldUseStringColumnAggregatorWrapper(ColumnSelectorFactory columnSelectorFactory)
+  {
+    if (fieldName != null) {
+      ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName);
+      return capabilities != null && capabilities.getType() == ValueType.STRING;
+    }
+    return false;
+  }
+
   @Override
   public Object deserialize(Object object)
   {
@@ -184,4 +234,10 @@ public abstract class SimpleDoubleAggregatorFactory extends NullableAggregatorFa
   {
     return expression;
   }
+
+  protected abstract double nullValue();
+
+  protected abstract Aggregator buildAggregator(BaseDoubleColumnValueSelector selector);
+
+  protected abstract BufferAggregator buildBufferAggregator(BaseDoubleColumnValueSelector selector);
 }
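The net effect of the factorize/factorizeBuffered dispatch above: a factory defined over a STRING column transparently wraps the delegate built by the subclass, while numeric columns take the old path unchanged. A minimal sketch, assuming a hypothetical STRING dimension named "price" (not part of this change):

// The same factory definition now works whether "price" is a numeric column
// or a STRING dimension; factorize() consults ColumnCapabilities to decide.
AggregatorFactory factory = new DoubleSumAggregatorFactory("priceSum", "price");
// STRING column  -> StringColumnDoubleAggregatorWrapper around DoubleSumAggregator
// numeric column -> DoubleSumAggregator directly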

org/apache/druid/query/aggregation/StringColumnDoubleAggregatorWrapper.java

@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation;

import org.apache.druid.java.util.common.Numbers;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.selector.settable.SettableValueDoubleColumnValueSelector;

import java.util.List;
import java.util.function.Function;

/**
 * This class can be used to wrap a double-typed Aggregator, which ordinarily consumes double columns, so that it can
 * handle String columns as well. Each String value (or each element of a multi-value row) is parsed as a double and
 * pushed to the delegate through a settable selector.
 */
public class StringColumnDoubleAggregatorWrapper extends DelegatingAggregator
{
  private final BaseObjectColumnValueSelector selector;
  private final double nullValue;
  private final SettableValueDoubleColumnValueSelector doubleSelector;

  public StringColumnDoubleAggregatorWrapper(
      BaseObjectColumnValueSelector selector,
      Function<BaseDoubleColumnValueSelector, Aggregator> delegateBuilder,
      double nullValue
  )
  {
    this.doubleSelector = new SettableValueDoubleColumnValueSelector();
    this.selector = selector;
    this.nullValue = nullValue;
    this.delegate = delegateBuilder.apply(doubleSelector);
  }

  @Override
  public void aggregate()
  {
    Object update = selector.getObject();

    if (update == null) {
      doubleSelector.setValue(nullValue);
      delegate.aggregate();
    } else if (update instanceof List) {
      for (Object o : (List) update) {
        doubleSelector.setValue(Numbers.tryParseDouble(o, nullValue));
        delegate.aggregate();
      }
    } else {
      doubleSelector.setValue(Numbers.tryParseDouble(update, nullValue));
      delegate.aggregate();
    }
  }
}
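To see the wrapper's mechanics in isolation, here is a hedged sketch that drives the same flow by hand, using only classes touched by this PR (the input values are arbitrary):

SettableValueDoubleColumnValueSelector settable = new SettableValueDoubleColumnValueSelector();
Aggregator sum = new DoubleSumAggregator(settable);     // the delegate the wrapper would build
settable.setValue(Numbers.tryParseDouble("2.5", 0.0));
sum.aggregate();                                        // delegate reads 2.5 via getDouble()
settable.setValue(Numbers.tryParseDouble("oops", 0.0));
sum.aggregate();                                        // unparseable input contributes nullValue (0.0)
// sum.get() -> 2.5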

org/apache/druid/query/aggregation/StringColumnDoubleBufferAggregatorWrapper.java

@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation;

import org.apache.druid.java.util.common.Numbers;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.selector.settable.SettableValueDoubleColumnValueSelector;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.function.Function;

/**
 * This class can be used to wrap a double-typed BufferAggregator, which ordinarily consumes double columns, so that
 * it can handle String columns as well. Each String value (or each element of a multi-value row) is parsed as a
 * double and pushed to the delegate through a settable selector.
 */
public class StringColumnDoubleBufferAggregatorWrapper extends DelegatingBufferAggregator
{
  private final BaseObjectColumnValueSelector selector;
  private final double nullValue;
  private final SettableValueDoubleColumnValueSelector doubleSelector;

  public StringColumnDoubleBufferAggregatorWrapper(
      BaseObjectColumnValueSelector selector,
      Function<BaseDoubleColumnValueSelector, BufferAggregator> delegateBuilder,
      double nullValue
  )
  {
    this.doubleSelector = new SettableValueDoubleColumnValueSelector();
    this.selector = selector;
    this.nullValue = nullValue;
    this.delegate = delegateBuilder.apply(doubleSelector);
  }

  @Override
  public void aggregate(ByteBuffer buf, int position)
  {
    Object update = selector.getObject();

    if (update == null) {
      doubleSelector.setValue(nullValue);
      delegate.aggregate(buf, position);
    } else if (update instanceof List) {
      for (Object o : (List) update) {
        doubleSelector.setValue(Numbers.tryParseDouble(o, nullValue));
        delegate.aggregate(buf, position);
      }
    } else {
      doubleSelector.setValue(Numbers.tryParseDouble(update, nullValue));
      delegate.aggregate(buf, position);
    }
  }
}
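The buffered variant follows the same pattern, except the delegate keeps its state at a position in a ByteBuffer. A sketch under the assumption that a double sum needs Double.BYTES of state (in Druid, the factory's getMaxIntermediateSize() governs the actual slot size):

ByteBuffer buf = ByteBuffer.allocate(Double.BYTES);
SettableValueDoubleColumnValueSelector settable = new SettableValueDoubleColumnValueSelector();
BufferAggregator agg = new DoubleSumBufferAggregator(settable);
agg.init(buf, 0);                                       // zero out the slot
settable.setValue(Numbers.tryParseDouble("1.5", 0.0));
agg.aggregate(buf, 0);
// agg.getDouble(buf, 0) -> 1.5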

org/apache/druid/segment/selector/settable/SettableValueDoubleColumnValueSelector.java

@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.selector.settable;

import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;
import org.apache.druid.segment.ColumnValueSelector;

/**
 * A BaseDoubleColumnValueSelector implementation that returns a settable double value from
 * {@link ColumnValueSelector#getDouble()}.
 */
public class SettableValueDoubleColumnValueSelector implements BaseDoubleColumnValueSelector
{
  private double value;

  @Override
  public double getDouble()
  {
    return value;
  }

  @Override
  public void inspectRuntimeShape(RuntimeShapeInspector inspector)
  {
  }

  @Override
  public boolean isNull()
  {
    return false;
  }

  public void setValue(double value)
  {
    this.value = value;
  }
}

org/apache/druid/query/SchemaEvolutionTest.java

@@ -261,7 +261,7 @@ public class SchemaEvolutionTest
     // Only string(1)
     // Note: Expressions implicitly cast strings to numbers, leading to the a/b vs c/d difference.
     Assert.assertEquals(
-        timeseriesResult(ImmutableMap.of("a", 0L, "b", 0.0, "c", 31L, "d", THIRTY_ONE_POINT_ONE)),
+        timeseriesResult(ImmutableMap.of("a", 0L, "b", THIRTY_ONE_POINT_ONE, "c", 31L, "d", THIRTY_ONE_POINT_ONE)),
         runQuery(query, factory, ImmutableList.of(index1))
     );
@@ -293,7 +293,7 @@ public class SchemaEvolutionTest
     Assert.assertEquals(
         timeseriesResult(ImmutableMap.of(
             "a", 31L * 2,
-            "b", THIRTY_ONE_POINT_ONE + 31,
+            "b", THIRTY_ONE_POINT_ONE * 2 + 31,
             "c", 31L * 3,
             "d", THIRTY_ONE_POINT_ONE * 2 + 31
         )),
@@ -335,7 +335,7 @@ public class SchemaEvolutionTest
     // Only string(1) -- which we can filter but not aggregate
     Assert.assertEquals(
-        timeseriesResult(ImmutableMap.of("a", 0L, "b", 0.0, "c", 2L)),
+        timeseriesResult(ImmutableMap.of("a", 0L, "b", 19.1, "c", 2L)),
         runQuery(query, factory, ImmutableList.of(index1))
     );
@@ -368,7 +368,7 @@ public class SchemaEvolutionTest
     Assert.assertEquals(
         timeseriesResult(ImmutableMap.of(
             "a", 38L,
-            "b", 38.1,
+            "b", 57.2,
             "c", 6L
         )),
         runQuery(query, factory, ImmutableList.of(index1, index2, index3, index4))

org/apache/druid/query/aggregation/AggregationTestHelper.java

@@ -571,6 +571,60 @@ public class AggregationTestHelper implements Closeable
     }
   }
 
+  public IncrementalIndex createIncrementalIndex(
+      Iterator rows,
+      InputRowParser parser,
+      final AggregatorFactory[] metrics,
+      long minTimestamp,
+      Granularity gran,
+      boolean deserializeComplexMetrics,
+      int maxRowCount,
+      boolean rollup
+  ) throws Exception
+  {
+    IncrementalIndex index = new IncrementalIndex.Builder()
+        .setIndexSchema(
+            new IncrementalIndexSchema.Builder()
+                .withMinTimestamp(minTimestamp)
+                .withQueryGranularity(gran)
+                .withMetrics(metrics)
+                .withRollup(rollup)
+                .build()
+        )
+        .setDeserializeComplexMetrics(deserializeComplexMetrics)
+        .setMaxRowCount(maxRowCount)
+        .buildOnheap();
+
+    while (rows.hasNext()) {
+      Object row = rows.next();
+      if (!index.canAppendRow()) {
+        throw new IAE("Can't add row to index");
+      }
+      if (row instanceof String && parser instanceof StringInputRowParser) {
+        // Note: this is required because StringInputRowParser is InputRowParser<ByteBuffer> as opposed to
+        // InputRowParser<String>
+        index.add(((StringInputRowParser) parser).parse((String) row));
+      } else {
+        index.add(((List<InputRow>) parser.parseBatch(row)).get(0));
+      }
+    }
+
+    return index;
+  }
+
+  public Segment persistIncrementalIndex(
+      IncrementalIndex index,
+      File outDir
+  ) throws Exception
+  {
+    if (outDir == null) {
+      outDir = tempFolder.newFolder();
+    }
+    indexMerger.persist(index, outDir, new IndexSpec(), null);
+
+    return new QueryableIndexSegment(indexIO.loadIndex(outDir), SegmentId.dummy(""));
+  }
+
   //Simulates running group-by query on individual segments as historicals would do, json serialize the results
   //from each segment, later deserialize and merge and finally return the results
   public Sequence<ResultRow> runQueryOnSegments(final List<File> segmentDirs, final String queryJson)

org/apache/druid/query/aggregation/DoubleMaxAggregationTest.java

@@ -50,6 +50,7 @@ public class DoubleMaxAggregationTest
     selector = new TestDoubleColumnSelectorImpl(values);
     colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
     EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(selector);
+    EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")).andReturn(null);
     EasyMock.replay(colSelectorFactory);
   }

org/apache/druid/query/aggregation/DoubleMinAggregationTest.java

@@ -50,6 +50,7 @@ public class DoubleMinAggregationTest
     selector = new TestDoubleColumnSelectorImpl(values);
     colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
     EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(selector);
+    EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")).andReturn(null);
     EasyMock.replay(colSelectorFactory);
   }

org/apache/druid/query/aggregation/StringColumnAggregationTest.java

@@ -0,0 +1,201 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.Row;
import org.apache.druid.data.input.impl.NoopInputRowParser;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.Druids;
import org.apache.druid.query.Result;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.segment.IncrementalIndexSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class StringColumnAggregationTest
{
  @Rule
  public final TemporaryFolder tempFolder = new TemporaryFolder();

  private final String singleValue = "singleValue";
  private final String multiValue = "multiValue";

  private final int n = 10;

  // expected results after aggregation
  private long numRows;
  private double singleValueSum;
  private double multiValueSum;
  private double singleValueMax;
  private double multiValueMax;
  private double singleValueMin;
  private double multiValueMin;

  private List<Segment> segments;
  private AggregationTestHelper aggregationTestHelper;

  @Before
  public void setup() throws Exception
  {
    List<String> dimensions = ImmutableList.of(singleValue, multiValue);
    List<InputRow> inputRows = new ArrayList<>(n);
    for (int i = 1; i <= n; i++) {
      String val = String.valueOf(i * 1.0d);

      inputRows.add(new MapBasedInputRow(
          DateTime.now(DateTimeZone.UTC),
          dimensions,
          ImmutableMap.of(
              singleValue, val,
              multiValue, Lists.newArrayList(val, null, val)
          )
      ));
    }

    aggregationTestHelper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
        Collections.EMPTY_LIST,
        new GroupByQueryConfig(),
        tempFolder
    );

    IncrementalIndex index = aggregationTestHelper.createIncrementalIndex(
        inputRows.iterator(),
        new NoopInputRowParser(null),
        new AggregatorFactory[]{new CountAggregatorFactory("count")},
        0,
        Granularities.NONE,
        false,
        100,
        false
    );

    this.segments = ImmutableList.of(
        new IncrementalIndexSegment(index, SegmentId.dummy("test")),
        aggregationTestHelper.persistIncrementalIndex(index, null)
    );

    // We ingested an arithmetic progression from 1 to n, so the expected sums follow from n * (n + 1) / 2.
    // All expected values are doubled because the query runs over two copies of the same data (the incremental-index
    // segment and its persisted copy); each multiValue row additionally carries the value twice.
    numRows = 2 * n;
    singleValueSum = n * (n + 1);
    multiValueSum = 2 * n * (n + 1);
    singleValueMax = n;
    multiValueMax = n;
    singleValueMin = 1;
    multiValueMin = 1;
  }

  @After
  public void tearDown() throws Exception
  {
    if (segments != null) {
      for (Segment seg : segments) {
        CloseQuietly.close(seg);
      }
    }
  }

  @Test
  public void testGroupBy() throws Exception
  {
    GroupByQuery query = new GroupByQuery.Builder()
        .setDataSource("test")
        .setGranularity(Granularities.ALL)
        .setInterval("1970/2050")
        .setAggregatorSpecs(
            new DoubleSumAggregatorFactory("singleDoubleSum", singleValue),
            new DoubleSumAggregatorFactory("multiDoubleSum", multiValue),
            new DoubleMaxAggregatorFactory("singleDoubleMax", singleValue),
            new DoubleMaxAggregatorFactory("multiDoubleMax", multiValue),
            new DoubleMinAggregatorFactory("singleDoubleMin", singleValue),
            new DoubleMinAggregatorFactory("multiDoubleMin", multiValue),
            new LongSumAggregatorFactory("count", "count")
        )
        .build();

    Sequence<ResultRow> seq = aggregationTestHelper.runQueryOnSegmentsObjs(segments, query);
    Row result = Iterables.getOnlyElement(seq.toList()).toMapBasedRow(query);

    Assert.assertEquals(numRows, result.getMetric("count").longValue());
    Assert.assertEquals(singleValueSum, result.getMetric("singleDoubleSum").doubleValue(), 0.0001d);
    Assert.assertEquals(multiValueSum, result.getMetric("multiDoubleSum").doubleValue(), 0.0001d);
    Assert.assertEquals(singleValueMax, result.getMetric("singleDoubleMax").doubleValue(), 0.0001d);
    Assert.assertEquals(multiValueMax, result.getMetric("multiDoubleMax").doubleValue(), 0.0001d);
    Assert.assertEquals(singleValueMin, result.getMetric("singleDoubleMin").doubleValue(), 0.0001d);
    Assert.assertEquals(multiValueMin, result.getMetric("multiDoubleMin").doubleValue(), 0.0001d);
  }

  @Test
  public void testTimeseries() throws Exception
  {
    TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
        .dataSource("test")
        .granularity(Granularities.ALL)
        .intervals("1970/2050")
        .aggregators(
            new DoubleSumAggregatorFactory("singleDoubleSum", singleValue),
            new DoubleSumAggregatorFactory("multiDoubleSum", multiValue),
            new DoubleMaxAggregatorFactory("singleDoubleMax", singleValue),
            new DoubleMaxAggregatorFactory("multiDoubleMax", multiValue),
            new DoubleMinAggregatorFactory("singleDoubleMin", singleValue),
            new DoubleMinAggregatorFactory("multiDoubleMin", multiValue),
            new LongSumAggregatorFactory("count", "count")
        )
        .build();

    Sequence seq = AggregationTestHelper.createTimeseriesQueryAggregationTestHelper(Collections.EMPTY_LIST, tempFolder)
        .runQueryOnSegmentsObjs(segments, query);
    TimeseriesResultValue result = ((Result<TimeseriesResultValue>) Iterables.getOnlyElement(seq.toList())).getValue();

    Assert.assertEquals(numRows, result.getLongMetric("count").longValue());
    Assert.assertEquals(singleValueSum, result.getDoubleMetric("singleDoubleSum").doubleValue(), 0.0001d);
    Assert.assertEquals(multiValueSum, result.getDoubleMetric("multiDoubleSum").doubleValue(), 0.0001d);
    Assert.assertEquals(singleValueMax, result.getDoubleMetric("singleDoubleMax").doubleValue(), 0.0001d);
    Assert.assertEquals(multiValueMax, result.getDoubleMetric("multiDoubleMax").doubleValue(), 0.0001d);
    Assert.assertEquals(singleValueMin, result.getDoubleMetric("singleDoubleMin").doubleValue(), 0.0001d);
    Assert.assertEquals(multiValueMin, result.getDoubleMetric("multiDoubleMin").doubleValue(), 0.0001d);
  }
}