From 176da53996220776979f4ef7672ca4d31acb00b6 Mon Sep 17 00:00:00 2001 From: Himanshu Date: Tue, 13 Aug 2019 15:55:14 -0700 Subject: [PATCH] make double sum/min/max agg work on string columns (#8243) * make double sum/min/max agg work on string columns * style and compilation fixes * fix tests * address review comments * add comment on SimpleDoubleAggregatorFactory * make checkstyle happy --- .../druid/java/util/common/Numbers.java | 24 +++ .../query/aggregation/AggregatorUtil.java | 3 +- .../aggregation/DelegatingAggregator.java | 74 +++++++ .../DelegatingBufferAggregator.java | 95 +++++++++ .../DoubleMaxAggregatorFactory.java | 15 +- .../DoubleMinAggregatorFactory.java | 15 +- .../DoubleSumAggregatorFactory.java | 36 ++-- .../SimpleDoubleAggregatorFactory.java | 62 +++++- .../StringColumnDoubleAggregatorWrapper.java | 69 ++++++ ...ngColumnDoubleBufferAggregatorWrapper.java | 70 ++++++ ...ettableValueDoubleColumnValueSelector.java | 56 +++++ .../druid/query/SchemaEvolutionTest.java | 8 +- .../aggregation/AggregationTestHelper.java | 54 +++++ .../aggregation/DoubleMaxAggregationTest.java | 1 + .../aggregation/DoubleMinAggregationTest.java | 1 + .../StringColumnAggregationTest.java | 201 ++++++++++++++++++ 16 files changed, 732 insertions(+), 52 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/query/aggregation/DelegatingAggregator.java create mode 100644 processing/src/main/java/org/apache/druid/query/aggregation/DelegatingBufferAggregator.java create mode 100644 processing/src/main/java/org/apache/druid/query/aggregation/StringColumnDoubleAggregatorWrapper.java create mode 100644 processing/src/main/java/org/apache/druid/query/aggregation/StringColumnDoubleBufferAggregatorWrapper.java create mode 100644 processing/src/main/java/org/apache/druid/segment/selector/settable/SettableValueDoubleColumnValueSelector.java create mode 100644 processing/src/test/java/org/apache/druid/query/aggregation/StringColumnAggregationTest.java diff --git a/core/src/main/java/org/apache/druid/java/util/common/Numbers.java b/core/src/main/java/org/apache/druid/java/util/common/Numbers.java index c9d40d7df33..c40cb8eee61 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/Numbers.java +++ b/core/src/main/java/org/apache/druid/java/util/common/Numbers.java @@ -19,6 +19,10 @@ package org.apache.druid.java.util.common; +import com.google.common.primitives.Doubles; + +import javax.annotation.Nullable; + public final class Numbers { /** @@ -92,6 +96,26 @@ public final class Numbers } } + /** + * Try parsing the given Number or String object val as double. + * @param val + * @param nullValue value to return when input was string type but not parseable into double value + * @return parsed double value + */ + public static double tryParseDouble(@Nullable Object val, double nullValue) + { + if (val == null) { + return nullValue; + } else if (val instanceof Number) { + return ((Number) val).doubleValue(); + } else if (val instanceof String) { + Double d = Doubles.tryParse((String) val); + return d == null ? 
nullValue : d.doubleValue(); + } else { + throw new IAE("Unknown object type [%s]", val.getClass().getName()); + } + } + public static int toIntExact(long value, String error) { if ((int) value != value) { diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java index ca0eb3ae22b..13a16d09468 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java @@ -25,7 +25,6 @@ import org.apache.druid.java.util.common.Pair; import org.apache.druid.math.expr.Expr; import org.apache.druid.math.expr.ExprEval; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; -import org.apache.druid.segment.BaseDoubleColumnValueSelector; import org.apache.druid.segment.BaseFloatColumnValueSelector; import org.apache.druid.segment.BaseLongColumnValueSelector; import org.apache.druid.segment.ColumnSelectorFactory; @@ -266,7 +265,7 @@ public class AggregatorUtil /** * Only one of fieldName and fieldExpression should be non-null */ - static BaseDoubleColumnValueSelector makeColumnValueSelectorWithDoubleDefault( + static ColumnValueSelector makeColumnValueSelectorWithDoubleDefault( final ColumnSelectorFactory metricFactory, @Nullable final String fieldName, @Nullable final Expr fieldExpression, diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/DelegatingAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/DelegatingAggregator.java new file mode 100644 index 00000000000..c1b4b409023 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/DelegatingAggregator.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import javax.annotation.Nullable; + +/** + * An Aggregator that delegates everything. It is used by Aggregator wrappers e.g. + * {@link StringColumnDoubleAggregatorWrapper} that modify some behavior of a delegate. 
+ */ +public abstract class DelegatingAggregator implements Aggregator +{ + protected Aggregator delegate; + + @Override + public void aggregate() + { + delegate.aggregate(); + } + + @Nullable + @Override + public Object get() + { + return delegate.get(); + } + + @Override + public float getFloat() + { + return delegate.getFloat(); + } + + @Override + public long getLong() + { + return delegate.getLong(); + } + + @Override + public double getDouble() + { + return delegate.getDouble(); + } + + @Override + public boolean isNull() + { + return delegate.isNull(); + } + + @Override + public void close() + { + delegate.close(); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/DelegatingBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/DelegatingBufferAggregator.java new file mode 100644 index 00000000000..9b1aa8086b2 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/DelegatingBufferAggregator.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +/** + * A BufferAggregator that delegates everything. It is used by BufferAggregator wrappers e.g. + * {@link StringColumnDoubleBufferAggregatorWrapper} that modify some behavior of a delegate. 
+ */ +public abstract class DelegatingBufferAggregator implements BufferAggregator +{ + protected BufferAggregator delegate; + + @Override + public void init(ByteBuffer buf, int position) + { + delegate.init(buf, position); + } + + @Override + public void aggregate(ByteBuffer buf, int position) + { + delegate.aggregate(buf, position); + } + + @Nullable + @Override + public Object get(ByteBuffer buf, int position) + { + return delegate.get(buf, position); + } + + @Override + public float getFloat(ByteBuffer buf, int position) + { + return delegate.getFloat(buf, position); + } + + @Override + public long getLong(ByteBuffer buf, int position) + { + return delegate.getLong(buf, position); + } + + @Override + public double getDouble(ByteBuffer buf, int position) + { + return delegate.getDouble(buf, position); + } + + @Override + public void close() + { + delegate.close(); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + delegate.inspectRuntimeShape(inspector); + } + + @Override + public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer) + { + delegate.relocate(oldPosition, newPosition, oldBuffer, newBuffer); + } + + @Override + public boolean isNull(ByteBuffer buf, int position) + { + return delegate.isNull(buf, position); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/DoubleMaxAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/DoubleMaxAggregatorFactory.java index 21562af0672..1c697c8c0dd 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/DoubleMaxAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/DoubleMaxAggregatorFactory.java @@ -25,7 +25,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.segment.BaseDoubleColumnValueSelector; -import org.apache.druid.segment.ColumnSelectorFactory; import javax.annotation.Nullable; import java.nio.ByteBuffer; @@ -53,25 +52,19 @@ public class DoubleMaxAggregatorFactory extends SimpleDoubleAggregatorFactory } @Override - protected BaseDoubleColumnValueSelector selector(ColumnSelectorFactory metricFactory) + protected double nullValue() { - return getDoubleColumnSelector( - metricFactory, - Double.NEGATIVE_INFINITY - ); + return Double.NEGATIVE_INFINITY; } @Override - protected Aggregator factorize(ColumnSelectorFactory metricFactory, BaseDoubleColumnValueSelector selector) + protected Aggregator buildAggregator(BaseDoubleColumnValueSelector selector) { return new DoubleMaxAggregator(selector); } @Override - protected BufferAggregator factorizeBuffered( - ColumnSelectorFactory metricFactory, - BaseDoubleColumnValueSelector selector - ) + protected BufferAggregator buildBufferAggregator(BaseDoubleColumnValueSelector selector) { return new DoubleMaxBufferAggregator(selector); } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/DoubleMinAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/DoubleMinAggregatorFactory.java index f308db4efb7..d56d8ed00ee 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/DoubleMinAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/DoubleMinAggregatorFactory.java @@ -25,7 +25,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import 
org.apache.druid.java.util.common.StringUtils; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.segment.BaseDoubleColumnValueSelector; -import org.apache.druid.segment.ColumnSelectorFactory; import javax.annotation.Nullable; import java.nio.ByteBuffer; @@ -53,25 +52,19 @@ public class DoubleMinAggregatorFactory extends SimpleDoubleAggregatorFactory } @Override - protected BaseDoubleColumnValueSelector selector(ColumnSelectorFactory metricFactory) + protected double nullValue() { - return getDoubleColumnSelector( - metricFactory, - Double.POSITIVE_INFINITY - ); + return Double.POSITIVE_INFINITY; } @Override - protected Aggregator factorize(ColumnSelectorFactory metricFactory, BaseDoubleColumnValueSelector selector) + protected Aggregator buildAggregator(BaseDoubleColumnValueSelector selector) { return new DoubleMinAggregator(selector); } @Override - protected BufferAggregator factorizeBuffered( - ColumnSelectorFactory metricFactory, - BaseDoubleColumnValueSelector selector - ) + protected BufferAggregator buildBufferAggregator(BaseDoubleColumnValueSelector selector) { return new DoubleMinBufferAggregator(selector); } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/DoubleSumAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/DoubleSumAggregatorFactory.java index 1a019d7b127..00bc89bfc26 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/DoubleSumAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/DoubleSumAggregatorFactory.java @@ -25,7 +25,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.segment.BaseDoubleColumnValueSelector; -import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import org.apache.druid.segment.vector.VectorValueSelector; @@ -55,41 +54,36 @@ public class DoubleSumAggregatorFactory extends SimpleDoubleAggregatorFactory } @Override - protected BaseDoubleColumnValueSelector selector(ColumnSelectorFactory metricFactory) + protected double nullValue() { - return getDoubleColumnSelector( - metricFactory, - 0.0d - ); + return 0.0d; } + @Override + protected Aggregator buildAggregator(BaseDoubleColumnValueSelector selector) + { + return new DoubleSumAggregator(selector); + } + + @Override + protected BufferAggregator buildBufferAggregator(BaseDoubleColumnValueSelector selector) + { + return new DoubleSumBufferAggregator(selector); + } + + @Override protected VectorValueSelector vectorSelector(VectorColumnSelectorFactory columnSelectorFactory) { return columnSelectorFactory.makeValueSelector(fieldName); } - @Override - protected Aggregator factorize(ColumnSelectorFactory metricFactory, BaseDoubleColumnValueSelector selector) - { - return new DoubleSumAggregator(selector); - } - @Override public boolean canVectorize() { return expression == null; } - @Override - protected BufferAggregator factorizeBuffered( - ColumnSelectorFactory metricFactory, - BaseDoubleColumnValueSelector selector - ) - { - return new DoubleSumBufferAggregator(selector); - } - @Override protected VectorAggregator factorizeVector( VectorColumnSelectorFactory columnSelectorFactory, diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleAggregatorFactory.java 
b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleAggregatorFactory.java index 6b3643abbee..f5586d0ff6a 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleAggregatorFactory.java @@ -29,7 +29,10 @@ import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.math.expr.Parser; import org.apache.druid.segment.BaseDoubleColumnValueSelector; import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ValueType; import javax.annotation.Nullable; import java.util.Collections; @@ -37,7 +40,13 @@ import java.util.Comparator; import java.util.List; import java.util.Objects; -public abstract class SimpleDoubleAggregatorFactory extends NullableAggregatorFactory<BaseDoubleColumnValueSelector> +/** + * This is an abstract class inherited by various {@link AggregatorFactory} implementations that consume double input + * and produce double output on aggregation. + * It extends NullableAggregatorFactory<ColumnValueSelector> instead of NullableAggregatorFactory<BaseDoubleColumnValueSelector> + * to additionally support aggregation on single/multi value string column types. + */ +public abstract class SimpleDoubleAggregatorFactory extends NullableAggregatorFactory<ColumnValueSelector> { protected final String name; @Nullable @@ -68,16 +77,57 @@ public abstract class SimpleDoubleAggregatorFa ); } - protected BaseDoubleColumnValueSelector getDoubleColumnSelector(ColumnSelectorFactory metricFactory, double nullValue) + @Override + protected Aggregator factorize(ColumnSelectorFactory metricFactory, ColumnValueSelector selector) + { + if (shouldUseStringColumnAggregatorWrapper(metricFactory)) { + return new StringColumnDoubleAggregatorWrapper( + selector, + SimpleDoubleAggregatorFactory.this::buildAggregator, + nullValue() + ); + } else { + return buildAggregator(selector); + } + } + + @Override + protected BufferAggregator factorizeBuffered( + ColumnSelectorFactory metricFactory, + ColumnValueSelector selector + ) + { + if (shouldUseStringColumnAggregatorWrapper(metricFactory)) { + return new StringColumnDoubleBufferAggregatorWrapper( + selector, + SimpleDoubleAggregatorFactory.this::buildBufferAggregator, + nullValue() + ); + } else { + return buildBufferAggregator(selector); + } + } + + @Override + protected ColumnValueSelector selector(ColumnSelectorFactory metricFactory) { return AggregatorUtil.makeColumnValueSelectorWithDoubleDefault( metricFactory, fieldName, fieldExpression.get(), - nullValue + nullValue() ); } + private boolean shouldUseStringColumnAggregatorWrapper(ColumnSelectorFactory columnSelectorFactory) + { + if (fieldName != null) { + ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName); + return capabilities != null && capabilities.getType() == ValueType.STRING; + } + return false; + } + @Override public Object deserialize(Object object) { @@ -184,4 +234,10 @@ public abstract class SimpleDoubleAggregatorFa { return expression; } + + protected abstract double nullValue(); + + protected abstract Aggregator buildAggregator(BaseDoubleColumnValueSelector selector); + + protected abstract BufferAggregator buildBufferAggregator(BaseDoubleColumnValueSelector selector); } diff --git 
a/processing/src/main/java/org/apache/druid/query/aggregation/StringColumnDoubleAggregatorWrapper.java b/processing/src/main/java/org/apache/druid/query/aggregation/StringColumnDoubleAggregatorWrapper.java new file mode 100644 index 00000000000..e970c94a028 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/StringColumnDoubleAggregatorWrapper.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import org.apache.druid.java.util.common.Numbers; +import org.apache.druid.segment.BaseDoubleColumnValueSelector; +import org.apache.druid.segment.BaseObjectColumnValueSelector; +import org.apache.druid.segment.selector.settable.SettableValueDoubleColumnValueSelector; + +import java.util.List; +import java.util.function.Function; + +/** + * This class wraps an Aggregator that consumes double-typed columns so that it can also handle String-typed columns. + */ +public class StringColumnDoubleAggregatorWrapper extends DelegatingAggregator +{ + private final BaseObjectColumnValueSelector selector; + private final double nullValue; + private final SettableValueDoubleColumnValueSelector doubleSelector; + + public StringColumnDoubleAggregatorWrapper( + BaseObjectColumnValueSelector selector, + Function<BaseDoubleColumnValueSelector, Aggregator> delegateBuilder, + double nullValue + ) + { + this.doubleSelector = new SettableValueDoubleColumnValueSelector(); + this.selector = selector; + this.nullValue = nullValue; + this.delegate = delegateBuilder.apply(doubleSelector); + } + + @Override + public void aggregate() + { + Object update = selector.getObject(); + + if (update == null) { + doubleSelector.setValue(nullValue); + delegate.aggregate(); + } else if (update instanceof List) { + for (Object o : (List) update) { + doubleSelector.setValue(Numbers.tryParseDouble(o, nullValue)); + delegate.aggregate(); + } + } else { + doubleSelector.setValue(Numbers.tryParseDouble(update, nullValue)); + delegate.aggregate(); + } + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/StringColumnDoubleBufferAggregatorWrapper.java b/processing/src/main/java/org/apache/druid/query/aggregation/StringColumnDoubleBufferAggregatorWrapper.java new file mode 100644 index 00000000000..fb58ad5cc49 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/StringColumnDoubleBufferAggregatorWrapper.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import org.apache.druid.java.util.common.Numbers; +import org.apache.druid.segment.BaseDoubleColumnValueSelector; +import org.apache.druid.segment.BaseObjectColumnValueSelector; +import org.apache.druid.segment.selector.settable.SettableValueDoubleColumnValueSelector; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.function.Function; + +/** + * This class wraps a BufferAggregator that consumes double-typed columns so that it can also handle String-typed columns. + */ +public class StringColumnDoubleBufferAggregatorWrapper extends DelegatingBufferAggregator +{ + private final BaseObjectColumnValueSelector selector; + private final double nullValue; + private final SettableValueDoubleColumnValueSelector doubleSelector; + + public StringColumnDoubleBufferAggregatorWrapper( + BaseObjectColumnValueSelector selector, + Function<BaseDoubleColumnValueSelector, BufferAggregator> delegateBuilder, + double nullValue + ) + { + this.doubleSelector = new SettableValueDoubleColumnValueSelector(); + this.selector = selector; + this.nullValue = nullValue; + this.delegate = delegateBuilder.apply(doubleSelector); + } + + @Override + public void aggregate(ByteBuffer buf, int position) + { + Object update = selector.getObject(); + + if (update == null) { + doubleSelector.setValue(nullValue); + delegate.aggregate(buf, position); + } else if (update instanceof List) { + for (Object o : (List) update) { + doubleSelector.setValue(Numbers.tryParseDouble(o, nullValue)); + delegate.aggregate(buf, position); + } + } else { + doubleSelector.setValue(Numbers.tryParseDouble(update, nullValue)); + delegate.aggregate(buf, position); + } + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/selector/settable/SettableValueDoubleColumnValueSelector.java b/processing/src/main/java/org/apache/druid/segment/selector/settable/SettableValueDoubleColumnValueSelector.java new file mode 100644 index 00000000000..5b79c5289af --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/selector/settable/SettableValueDoubleColumnValueSelector.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment.selector.settable; + +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.segment.BaseDoubleColumnValueSelector; +import org.apache.druid.segment.ColumnValueSelector; + +/** + * A BaseDoubleColumnValueSelector impl to return settable double value on calls to + * {@link ColumnValueSelector#getDouble()} + */ +public class SettableValueDoubleColumnValueSelector implements BaseDoubleColumnValueSelector +{ + private double value; + + @Override + public double getDouble() + { + return value; + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + + } + + @Override + public boolean isNull() + { + return false; + } + + public void setValue(double value) + { + this.value = value; + } +} diff --git a/processing/src/test/java/org/apache/druid/query/SchemaEvolutionTest.java b/processing/src/test/java/org/apache/druid/query/SchemaEvolutionTest.java index 889e80f8a3a..157cb3ba0c3 100644 --- a/processing/src/test/java/org/apache/druid/query/SchemaEvolutionTest.java +++ b/processing/src/test/java/org/apache/druid/query/SchemaEvolutionTest.java @@ -261,7 +261,7 @@ public class SchemaEvolutionTest // Only string(1) // Note: Expressions implicitly cast strings to numbers, leading to the a/b vs c/d difference. Assert.assertEquals( - timeseriesResult(ImmutableMap.of("a", 0L, "b", 0.0, "c", 31L, "d", THIRTY_ONE_POINT_ONE)), + timeseriesResult(ImmutableMap.of("a", 0L, "b", THIRTY_ONE_POINT_ONE, "c", 31L, "d", THIRTY_ONE_POINT_ONE)), runQuery(query, factory, ImmutableList.of(index1)) ); @@ -293,7 +293,7 @@ public class SchemaEvolutionTest Assert.assertEquals( timeseriesResult(ImmutableMap.of( "a", 31L * 2, - "b", THIRTY_ONE_POINT_ONE + 31, + "b", THIRTY_ONE_POINT_ONE * 2 + 31, "c", 31L * 3, "d", THIRTY_ONE_POINT_ONE * 2 + 31 )), @@ -335,7 +335,7 @@ public class SchemaEvolutionTest // Only string(1) -- which we can filter but not aggregate Assert.assertEquals( - timeseriesResult(ImmutableMap.of("a", 0L, "b", 0.0, "c", 2L)), + timeseriesResult(ImmutableMap.of("a", 0L, "b", 19.1, "c", 2L)), runQuery(query, factory, ImmutableList.of(index1)) ); @@ -368,7 +368,7 @@ public class SchemaEvolutionTest Assert.assertEquals( timeseriesResult(ImmutableMap.of( "a", 38L, - "b", 38.1, + "b", 57.2, "c", 6L )), runQuery(query, factory, ImmutableList.of(index1, index2, index3, index4)) diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java index 717876976e5..61212813b54 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java @@ -571,6 +571,60 @@ public class AggregationTestHelper implements Closeable } } + public IncrementalIndex createIncrementalIndex( + Iterator rows, + InputRowParser parser, + final AggregatorFactory[] metrics, + long minTimestamp, + Granularity gran, + boolean deserializeComplexMetrics, + int maxRowCount, + boolean rollup + ) throws Exception + { + IncrementalIndex index = new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() + .withMinTimestamp(minTimestamp) + .withQueryGranularity(gran) + .withMetrics(metrics) + .withRollup(rollup) + .build() + ) + .setDeserializeComplexMetrics(deserializeComplexMetrics) + .setMaxRowCount(maxRowCount) + .buildOnheap(); + + while (rows.hasNext()) { + 
Object row = rows.next(); + if (!index.canAppendRow()) { + throw new IAE("Can't add row to index"); + } + if (row instanceof String && parser instanceof StringInputRowParser) { + //Note: this is required because StringInputRowParser is InputRowParser as opposed to + //InputRowsParser + index.add(((StringInputRowParser) parser).parse((String) row)); + } else { + index.add(((List) parser.parseBatch(row)).get(0)); + } + } + + return index; + } + + public Segment persistIncrementalIndex( + IncrementalIndex index, + File outDir + ) throws Exception + { + if (outDir == null) { + outDir = tempFolder.newFolder(); + } + indexMerger.persist(index, outDir, new IndexSpec(), null); + + return new QueryableIndexSegment(indexIO.loadIndex(outDir), SegmentId.dummy("")); + } + //Simulates running group-by query on individual segments as historicals would do, json serialize the results //from each segment, later deserialize and merge and finally return the results public Sequence runQueryOnSegments(final List segmentDirs, final String queryJson) diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/DoubleMaxAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/DoubleMaxAggregationTest.java index dde8feec355..7342627a32b 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/DoubleMaxAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/DoubleMaxAggregationTest.java @@ -50,6 +50,7 @@ public class DoubleMaxAggregationTest selector = new TestDoubleColumnSelectorImpl(values); colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class); EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(selector); + EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")).andReturn(null); EasyMock.replay(colSelectorFactory); } diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/DoubleMinAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/DoubleMinAggregationTest.java index a35ad336c2d..89a42896068 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/DoubleMinAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/DoubleMinAggregationTest.java @@ -50,6 +50,7 @@ public class DoubleMinAggregationTest selector = new TestDoubleColumnSelectorImpl(values); colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class); EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(selector); + EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")).andReturn(null); EasyMock.replay(colSelectorFactory); } diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/StringColumnAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/StringColumnAggregationTest.java new file mode 100644 index 00000000000..69c1390179c --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/aggregation/StringColumnAggregationTest.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.MapBasedInputRow; +import org.apache.druid.data.input.Row; +import org.apache.druid.data.input.impl.NoopInputRowParser; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.guava.CloseQuietly; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.query.Druids; +import org.apache.druid.query.Result; +import org.apache.druid.query.groupby.GroupByQuery; +import org.apache.druid.query.groupby.GroupByQueryConfig; +import org.apache.druid.query.groupby.ResultRow; +import org.apache.druid.query.timeseries.TimeseriesQuery; +import org.apache.druid.query.timeseries.TimeseriesResultValue; +import org.apache.druid.segment.IncrementalIndexSegment; +import org.apache.druid.segment.Segment; +import org.apache.druid.segment.incremental.IncrementalIndex; +import org.apache.druid.timeline.SegmentId; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class StringColumnAggregationTest +{ + @Rule + public final TemporaryFolder tempFolder = new TemporaryFolder(); + + private final String singleValue = "singleValue"; + private final String multiValue = "multiValue"; + + private final int n = 10; + + // results after aggregation + private long numRows; + private double singleValueSum; + private double multiValueSum; + private double singleValueMax; + private double multiValueMax; + private double singleValueMin; + private double multiValueMin; + + private List segments; + + private AggregationTestHelper aggregationTestHelper; + + @Before + public void setup() throws Exception + { + List dimensions = ImmutableList.of(singleValue, multiValue); + List inputRows = new ArrayList<>(n); + for (int i = 1; i <= n; i++) { + String val = String.valueOf(i * 1.0d); + + inputRows.add(new MapBasedInputRow( + DateTime.now(DateTimeZone.UTC), + dimensions, + ImmutableMap.of( + singleValue, val, + multiValue, Lists.newArrayList(val, null, val) + ) + )); + } + + aggregationTestHelper = AggregationTestHelper.createGroupByQueryAggregationTestHelper( + Collections.EMPTY_LIST, + new GroupByQueryConfig(), + tempFolder + ); + + IncrementalIndex index = aggregationTestHelper.createIncrementalIndex( + inputRows.iterator(), + new NoopInputRowParser(null), + new AggregatorFactory[]{new CountAggregatorFactory("count")}, + 0, + Granularities.NONE, + false, + 100, + false + ); + + this.segments = ImmutableList.of( + new IncrementalIndexSegment(index, SegmentId.dummy("test")), + aggregationTestHelper.persistIncrementalIndex(index, null) + ); + + // we have 
ingested arithmetic progression from 1 to 10, so sums can be computed using following + // All sum values are multiplied by 2 because we are running query on duplicated segment twice. + numRows = 2 * n; + singleValueSum = n * (n + 1); + multiValueSum = 2 * n * (n + 1); + singleValueMax = n; + multiValueMax = n; + singleValueMin = 1; + multiValueMin = 1; + } + + @After + public void tearDown() throws Exception + { + if (segments != null) { + for (Segment seg : segments) { + CloseQuietly.close(seg); + } + } + } + + @Test + public void testGroupBy() throws Exception + { + GroupByQuery query = new GroupByQuery.Builder() + .setDataSource("test") + .setGranularity(Granularities.ALL) + .setInterval("1970/2050") + .setAggregatorSpecs( + new DoubleSumAggregatorFactory("singleDoubleSum", singleValue), + new DoubleSumAggregatorFactory("multiDoubleSum", multiValue), + new DoubleMaxAggregatorFactory("singleDoubleMax", singleValue), + new DoubleMaxAggregatorFactory("multiDoubleMax", multiValue), + new DoubleMinAggregatorFactory("singleDoubleMin", singleValue), + new DoubleMinAggregatorFactory("multiDoubleMin", multiValue), + new LongSumAggregatorFactory("count", "count") + ) + .build(); + + Sequence seq = aggregationTestHelper.runQueryOnSegmentsObjs(segments, query); + Row result = Iterables.getOnlyElement(seq.toList()).toMapBasedRow(query); + + Assert.assertEquals(numRows, result.getMetric("count").longValue()); + Assert.assertEquals(singleValueSum, result.getMetric("singleDoubleSum").doubleValue(), 0.0001d); + Assert.assertEquals(multiValueSum, result.getMetric("multiDoubleSum").doubleValue(), 0.0001d); + Assert.assertEquals(singleValueMax, result.getMetric("singleDoubleMax").doubleValue(), 0.0001d); + Assert.assertEquals(multiValueMax, result.getMetric("multiDoubleMax").doubleValue(), 0.0001d); + Assert.assertEquals(singleValueMin, result.getMetric("singleDoubleMin").doubleValue(), 0.0001d); + Assert.assertEquals(multiValueMin, result.getMetric("multiDoubleMin").doubleValue(), 0.0001d); + } + + @Test + public void testTimeseries() throws Exception + { + TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() + .dataSource("test") + .granularity(Granularities.ALL) + .intervals("1970/2050") + .aggregators( + new DoubleSumAggregatorFactory("singleDoubleSum", singleValue), + new DoubleSumAggregatorFactory("multiDoubleSum", multiValue), + new DoubleMaxAggregatorFactory("singleDoubleMax", singleValue), + new DoubleMaxAggregatorFactory("multiDoubleMax", multiValue), + new DoubleMinAggregatorFactory("singleDoubleMin", singleValue), + new DoubleMinAggregatorFactory("multiDoubleMin", multiValue), + new LongSumAggregatorFactory("count", "count") + ) + .build(); + + Sequence seq = AggregationTestHelper.createTimeseriesQueryAggregationTestHelper(Collections.EMPTY_LIST, tempFolder) + .runQueryOnSegmentsObjs(segments, query); + TimeseriesResultValue result = ((Result) Iterables.getOnlyElement(seq.toList())).getValue(); + + Assert.assertEquals(numRows, result.getLongMetric("count").longValue()); + Assert.assertEquals(singleValueSum, result.getDoubleMetric("singleDoubleSum").doubleValue(), 0.0001d); + Assert.assertEquals(multiValueSum, result.getDoubleMetric("multiDoubleSum").doubleValue(), 0.0001d); + Assert.assertEquals(singleValueMax, result.getDoubleMetric("singleDoubleMax").doubleValue(), 0.0001d); + Assert.assertEquals(multiValueMax, result.getDoubleMetric("multiDoubleMax").doubleValue(), 0.0001d); + Assert.assertEquals(singleValueMin, result.getDoubleMetric("singleDoubleMin").doubleValue(), 0.0001d); + 
Assert.assertEquals(multiValueMin, result.getDoubleMetric("multiDoubleMin").doubleValue(), 0.0001d); + } +}
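Illustration (not part of the patch): the string-column aggregation path above hinges on the parsing semantics of the new Numbers.tryParseDouble helper; StringColumnDoubleAggregatorWrapper and StringColumnDoubleBufferAggregatorWrapper apply that conversion once per row (and once per element for multi-value List rows) before delegating to the regular double aggregator. The short standalone sketch below, with a hypothetical class name, shows the expected fallback behavior and assumes only the Numbers class added in this patch on the classpath.

import org.apache.druid.java.util.common.Numbers;

public class TryParseDoubleDemo
{
  public static void main(String[] args)
  {
    // Use the doubleSum fallback (0.0d), i.e. what DoubleSumAggregatorFactory.nullValue() returns.
    double nullValue = 0.0d;

    System.out.println(Numbers.tryParseDouble(null, nullValue));   // 0.0  -> null input falls back to nullValue
    System.out.println(Numbers.tryParseDouble(42L, nullValue));    // 42.0 -> Number inputs are read via doubleValue()
    System.out.println(Numbers.tryParseDouble("3.5", nullValue));  // 3.5  -> parseable String inputs are parsed
    System.out.println(Numbers.tryParseDouble("abc", nullValue));  // 0.0  -> unparseable String inputs fall back to nullValue
    // Any other input type throws IAE("Unknown object type [...]").
  }
}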