mirror of
https://github.com/apache/druid.git
synced 2025-02-17 07:25:02 +00:00
All aggregators should check if column can be vectorize (#10026)
* All aggregators should use vectorization-aware column processor * All aggregators should use vectorization-aware column processor * fix canVectorize * fix canVectorize * add tests * revert back default * address comment * address comments * address comment * address comment
This commit is contained in:
parent
1a2620606d
commit
7569ee3ec6
@ -24,6 +24,7 @@ import org.apache.druid.java.util.common.Cacheable;
|
||||
import org.apache.druid.java.util.common.UOE;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.apache.druid.query.PerSegmentQueryOptimizationContext;
|
||||
import org.apache.druid.segment.ColumnInspector;
|
||||
import org.apache.druid.segment.ColumnSelectorFactory;
|
||||
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
|
||||
|
||||
@ -68,7 +69,7 @@ public abstract class AggregatorFactory implements Cacheable
|
||||
/**
|
||||
* Returns whether or not this aggregation class supports vectorization. The default implementation returns false.
|
||||
*/
|
||||
public boolean canVectorize()
|
||||
public boolean canVectorize(ColumnInspector columnInspector)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import org.apache.druid.segment.ColumnInspector;
|
||||
import org.apache.druid.segment.ColumnSelectorFactory;
|
||||
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
|
||||
|
||||
@ -72,7 +73,7 @@ public class CountAggregatorFactory extends AggregatorFactory
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canVectorize()
|
||||
public boolean canVectorize(ColumnInspector columnInspector)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
@ -25,6 +25,9 @@ import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.math.expr.ExprMacroTable;
|
||||
import org.apache.druid.segment.BaseDoubleColumnValueSelector;
|
||||
import org.apache.druid.segment.ColumnInspector;
|
||||
import org.apache.druid.segment.column.ColumnCapabilities;
|
||||
import org.apache.druid.segment.column.ValueType;
|
||||
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
|
||||
import org.apache.druid.segment.vector.VectorValueSelector;
|
||||
|
||||
@ -79,8 +82,12 @@ public class DoubleSumAggregatorFactory extends SimpleDoubleAggregatorFactory
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canVectorize()
|
||||
public boolean canVectorize(ColumnInspector columnInspector)
|
||||
{
|
||||
if (fieldName != null) {
|
||||
final ColumnCapabilities capabilities = columnInspector.getColumnCapabilities(fieldName);
|
||||
return expression == null && (capabilities == null || ValueType.isNumeric(capabilities.getType()));
|
||||
}
|
||||
return expression == null;
|
||||
}
|
||||
|
||||
|
@ -29,6 +29,7 @@ import org.apache.druid.query.filter.Filter;
|
||||
import org.apache.druid.query.filter.IntervalDimFilter;
|
||||
import org.apache.druid.query.filter.ValueMatcher;
|
||||
import org.apache.druid.query.filter.vector.VectorValueMatcher;
|
||||
import org.apache.druid.segment.ColumnInspector;
|
||||
import org.apache.druid.segment.ColumnSelectorFactory;
|
||||
import org.apache.druid.segment.column.ColumnHolder;
|
||||
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
|
||||
@ -98,7 +99,7 @@ public class FilteredAggregatorFactory extends AggregatorFactory
|
||||
@Override
|
||||
public VectorAggregator factorizeVector(VectorColumnSelectorFactory columnSelectorFactory)
|
||||
{
|
||||
Preconditions.checkState(canVectorize(), "Cannot vectorize");
|
||||
Preconditions.checkState(canVectorize(columnSelectorFactory), "Cannot vectorize");
|
||||
final VectorValueMatcher valueMatcher = filter.makeVectorMatcher(columnSelectorFactory);
|
||||
return new FilteredVectorAggregator(
|
||||
valueMatcher,
|
||||
@ -107,9 +108,9 @@ public class FilteredAggregatorFactory extends AggregatorFactory
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canVectorize()
|
||||
public boolean canVectorize(ColumnInspector columnInspector)
|
||||
{
|
||||
return delegate.canVectorize() && filter.canVectorizeMatcher();
|
||||
return delegate.canVectorize(columnInspector) && filter.canVectorizeMatcher();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -25,6 +25,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.math.expr.ExprMacroTable;
|
||||
import org.apache.druid.segment.BaseFloatColumnValueSelector;
|
||||
import org.apache.druid.segment.ColumnInspector;
|
||||
import org.apache.druid.segment.column.ColumnCapabilities;
|
||||
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
|
||||
import org.apache.druid.segment.vector.VectorValueSelector;
|
||||
|
||||
@ -78,12 +80,15 @@ public class FloatSumAggregatorFactory extends SimpleFloatAggregatorFactory
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canVectorize()
|
||||
public boolean canVectorize(ColumnInspector columnInspector)
|
||||
{
|
||||
if (fieldName != null) {
|
||||
final ColumnCapabilities capabilities = columnInspector.getColumnCapabilities(fieldName);
|
||||
return expression == null && (capabilities == null || capabilities.getType().isNumeric());
|
||||
}
|
||||
return expression == null;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected VectorAggregator factorizeVector(
|
||||
VectorColumnSelectorFactory columnSelectorFactory,
|
||||
|
@ -25,6 +25,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.math.expr.ExprMacroTable;
|
||||
import org.apache.druid.segment.BaseLongColumnValueSelector;
|
||||
import org.apache.druid.segment.ColumnInspector;
|
||||
import org.apache.druid.segment.column.ColumnCapabilities;
|
||||
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
|
||||
import org.apache.druid.segment.vector.VectorValueSelector;
|
||||
|
||||
@ -87,8 +89,12 @@ public class LongSumAggregatorFactory extends SimpleLongAggregatorFactory
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canVectorize()
|
||||
public boolean canVectorize(ColumnInspector columnInspector)
|
||||
{
|
||||
if (fieldName != null) {
|
||||
final ColumnCapabilities capabilities = columnInspector.getColumnCapabilities(fieldName);
|
||||
return expression == null && (capabilities == null || capabilities.getType().isNumeric());
|
||||
}
|
||||
return expression == null;
|
||||
}
|
||||
|
||||
|
@ -65,7 +65,7 @@ public abstract class NullableNumericAggregatorFactory<T extends BaseNullableCol
|
||||
@Override
|
||||
public final VectorAggregator factorizeVector(VectorColumnSelectorFactory columnSelectorFactory)
|
||||
{
|
||||
Preconditions.checkState(canVectorize(), "Cannot vectorize");
|
||||
Preconditions.checkState(canVectorize(columnSelectorFactory), "Cannot vectorize");
|
||||
VectorValueSelector selector = vectorSelector(columnSelectorFactory);
|
||||
VectorAggregator aggregator = factorizeVector(columnSelectorFactory, selector);
|
||||
return NullHandling.replaceWithDefault() ? aggregator : new NullableNumericVectorAggregator(aggregator, selector);
|
||||
@ -135,12 +135,11 @@ public abstract class NullableNumericAggregatorFactory<T extends BaseNullableCol
|
||||
* @see BufferAggregator
|
||||
*/
|
||||
protected VectorAggregator factorizeVector(
|
||||
// Not used by current aggregators, but here for parity with "factorizeBuffered".
|
||||
@SuppressWarnings("unused") VectorColumnSelectorFactory columnSelectorFactory,
|
||||
VectorColumnSelectorFactory columnSelectorFactory,
|
||||
VectorValueSelector selector
|
||||
)
|
||||
{
|
||||
if (!canVectorize()) {
|
||||
if (!canVectorize(columnSelectorFactory)) {
|
||||
throw new UnsupportedOperationException("Cannot vectorize");
|
||||
} else {
|
||||
throw new UnsupportedOperationException("canVectorize returned true but 'factorizeVector' is not implemented");
|
||||
|
@ -22,6 +22,7 @@ package org.apache.druid.query.aggregation;
|
||||
import org.apache.druid.query.PerSegmentQueryOptimizationContext;
|
||||
import org.apache.druid.query.cache.CacheKeyBuilder;
|
||||
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
|
||||
import org.apache.druid.segment.ColumnInspector;
|
||||
import org.apache.druid.segment.ColumnSelectorFactory;
|
||||
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
|
||||
|
||||
@ -70,9 +71,9 @@ public class SuppressedAggregatorFactory extends AggregatorFactory
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canVectorize()
|
||||
public boolean canVectorize(ColumnInspector columnInspector)
|
||||
{
|
||||
return delegate.canVectorize();
|
||||
return delegate.canVectorize(columnInspector);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -38,6 +38,7 @@ import org.apache.druid.query.aggregation.VectorAggregator;
|
||||
import org.apache.druid.query.aggregation.cardinality.HyperLogLogCollectorAggregateCombiner;
|
||||
import org.apache.druid.query.cache.CacheKeyBuilder;
|
||||
import org.apache.druid.segment.BaseObjectColumnValueSelector;
|
||||
import org.apache.druid.segment.ColumnInspector;
|
||||
import org.apache.druid.segment.ColumnSelectorFactory;
|
||||
import org.apache.druid.segment.NilColumnValueSelector;
|
||||
import org.apache.druid.segment.column.ColumnCapabilities;
|
||||
@ -140,7 +141,7 @@ public class HyperUniquesAggregatorFactory extends AggregatorFactory
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canVectorize()
|
||||
public boolean canVectorize(ColumnInspector columnInspector)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
@ -31,7 +31,9 @@ import org.apache.druid.query.aggregation.AggregatorUtil;
|
||||
import org.apache.druid.query.aggregation.BufferAggregator;
|
||||
import org.apache.druid.query.aggregation.VectorAggregator;
|
||||
import org.apache.druid.query.cache.CacheKeyBuilder;
|
||||
import org.apache.druid.segment.ColumnInspector;
|
||||
import org.apache.druid.segment.ColumnSelectorFactory;
|
||||
import org.apache.druid.segment.column.ColumnCapabilities;
|
||||
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
@ -106,9 +108,10 @@ public class DoubleMeanAggregatorFactory extends AggregatorFactory
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canVectorize()
|
||||
public boolean canVectorize(ColumnInspector columnInspector)
|
||||
{
|
||||
return true;
|
||||
final ColumnCapabilities capabilities = columnInspector.getColumnCapabilities(fieldName);
|
||||
return capabilities == null || capabilities.getType().isNumeric();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -28,7 +28,6 @@ import org.apache.druid.java.util.common.guava.Sequence;
|
||||
import org.apache.druid.java.util.common.parsers.CloseableIterator;
|
||||
import org.apache.druid.query.QueryConfig;
|
||||
import org.apache.druid.query.aggregation.AggregatorAdapters;
|
||||
import org.apache.druid.query.aggregation.AggregatorFactory;
|
||||
import org.apache.druid.query.dimension.DimensionSpec;
|
||||
import org.apache.druid.query.filter.Filter;
|
||||
import org.apache.druid.query.groupby.GroupByQuery;
|
||||
@ -85,7 +84,7 @@ public class VectorGroupByEngine
|
||||
|
||||
return GroupByQueryEngineV2.isAllSingleValueDims(adapter::getColumnCapabilities, query.getDimensions(), true)
|
||||
&& query.getDimensions().stream().allMatch(DimensionSpec::canVectorize)
|
||||
&& query.getAggregatorSpecs().stream().allMatch(AggregatorFactory::canVectorize)
|
||||
&& query.getAggregatorSpecs().stream().allMatch(aggregatorFactory -> aggregatorFactory.canVectorize(adapter))
|
||||
&& adapter.canVectorize(filter, query.getVirtualColumns(), false);
|
||||
}
|
||||
|
||||
|
@ -102,7 +102,7 @@ public class TimeseriesQueryEngine
|
||||
|
||||
final boolean doVectorize = queryConfigToUse.getVectorize().shouldVectorize(
|
||||
adapter.canVectorize(filter, query.getVirtualColumns(), descending)
|
||||
&& query.getAggregatorSpecs().stream().allMatch(AggregatorFactory::canVectorize)
|
||||
&& query.getAggregatorSpecs().stream().allMatch(aggregatorFactory -> aggregatorFactory.canVectorize(adapter))
|
||||
);
|
||||
|
||||
final Sequence<Result<TimeseriesResultValue>> result;
|
||||
|
@ -0,0 +1,37 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.segment;
|
||||
|
||||
import org.apache.druid.segment.column.ColumnCapabilities;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
public interface ColumnInspector
|
||||
{
|
||||
/**
|
||||
* Returns capabilities of a particular column.
|
||||
*
|
||||
* @param column column name
|
||||
*
|
||||
* @return capabilities, or null
|
||||
*/
|
||||
@Nullable
|
||||
ColumnCapabilities getColumnCapabilities(String column);
|
||||
}
|
@ -31,7 +31,7 @@ import javax.annotation.Nullable;
|
||||
* @see org.apache.druid.segment.vector.VectorColumnSelectorFactory, the vectorized version
|
||||
*/
|
||||
@PublicApi
|
||||
public interface ColumnSelectorFactory
|
||||
public interface ColumnSelectorFactory extends ColumnInspector
|
||||
{
|
||||
DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec);
|
||||
|
||||
@ -50,6 +50,7 @@ public interface ColumnSelectorFactory
|
||||
*
|
||||
* @return capabilities, or null
|
||||
*/
|
||||
@Override
|
||||
@Nullable
|
||||
ColumnCapabilities getColumnCapabilities(String column);
|
||||
}
|
||||
|
@ -30,7 +30,7 @@ import javax.annotation.Nullable;
|
||||
/**
|
||||
*/
|
||||
@PublicApi
|
||||
public interface StorageAdapter extends CursorFactory
|
||||
public interface StorageAdapter extends CursorFactory, ColumnInspector
|
||||
{
|
||||
Interval getInterval();
|
||||
Indexed<String> getAvailableDimensions();
|
||||
@ -62,6 +62,7 @@ public interface StorageAdapter extends CursorFactory
|
||||
*
|
||||
* @return capabilities, or null
|
||||
*/
|
||||
@Override
|
||||
@Nullable
|
||||
ColumnCapabilities getColumnCapabilities(String column);
|
||||
|
||||
|
@ -20,6 +20,7 @@
|
||||
package org.apache.druid.segment.vector;
|
||||
|
||||
import org.apache.druid.query.dimension.DimensionSpec;
|
||||
import org.apache.druid.segment.ColumnInspector;
|
||||
import org.apache.druid.segment.column.ColumnCapabilities;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
@ -29,7 +30,7 @@ import javax.annotation.Nullable;
|
||||
*
|
||||
* @see org.apache.druid.segment.ColumnSelectorFactory, the non-vectorized version.
|
||||
*/
|
||||
public interface VectorColumnSelectorFactory
|
||||
public interface VectorColumnSelectorFactory extends ColumnInspector
|
||||
{
|
||||
/**
|
||||
* Returns a {@link VectorSizeInspector} for the {@link VectorCursor} that generated this object.
|
||||
@ -72,6 +73,7 @@ public interface VectorColumnSelectorFactory
|
||||
*
|
||||
* @return capabilities, or null if the column doesn't exist.
|
||||
*/
|
||||
@Override
|
||||
@Nullable
|
||||
ColumnCapabilities getColumnCapabilities(String column);
|
||||
}
|
||||
|
@ -21,7 +21,10 @@ package org.apache.druid.query;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.io.Closeables;
|
||||
import junitparams.JUnitParamsRunner;
|
||||
import junitparams.Parameters;
|
||||
import org.apache.druid.common.config.NullHandling;
|
||||
import org.apache.druid.data.input.InputRow;
|
||||
import org.apache.druid.data.input.impl.DimensionsSpec;
|
||||
@ -35,6 +38,7 @@ import org.apache.druid.java.util.common.guava.FunctionalIterable;
|
||||
import org.apache.druid.java.util.common.guava.Sequence;
|
||||
import org.apache.druid.query.aggregation.CountAggregatorFactory;
|
||||
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
|
||||
import org.apache.druid.query.aggregation.FloatSumAggregatorFactory;
|
||||
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
|
||||
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
|
||||
import org.apache.druid.query.expression.TestExprMacroTable;
|
||||
@ -55,6 +59,7 @@ import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
import org.junit.runner.RunWith;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
@ -64,12 +69,18 @@ import java.util.Map;
|
||||
/**
|
||||
* Tests designed to exercise changing column types, adding columns, removing columns, etc.
|
||||
*/
|
||||
@RunWith(JUnitParamsRunner.class)
|
||||
public class SchemaEvolutionTest
|
||||
{
|
||||
private static final String DATA_SOURCE = "foo";
|
||||
private static final String TIMESTAMP_COLUMN = "t";
|
||||
private static final double THIRTY_ONE_POINT_ONE = 31.1d;
|
||||
|
||||
public Object[] doVectorize()
|
||||
{
|
||||
return Lists.newArrayList(true, false).toArray();
|
||||
}
|
||||
|
||||
public static List<Result<TimeseriesResultValue>> timeseriesResult(final Map<String, ?> map)
|
||||
{
|
||||
return ImmutableList.of(new Result<>(DateTimes.of("2000"), new TimeseriesResultValue((Map<String, Object>) map)));
|
||||
@ -137,6 +148,8 @@ public class SchemaEvolutionTest
|
||||
@Before
|
||||
public void setUp() throws IOException
|
||||
{
|
||||
NullHandling.initializeForTests();
|
||||
|
||||
// Index1: c1 is a string, c2 nonexistent, "uniques" nonexistent
|
||||
index1 = IndexBuilder.create()
|
||||
.tmpDir(temporaryFolder.newFolder())
|
||||
@ -209,7 +222,8 @@ public class SchemaEvolutionTest
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHyperUniqueEvolutionTimeseries()
|
||||
@Parameters(method = "doVectorize")
|
||||
public void testHyperUniqueEvolutionTimeseries(boolean doVectorize)
|
||||
{
|
||||
final TimeseriesQueryRunnerFactory factory = QueryRunnerTestHelper.newTimeseriesQueryRunnerFactory();
|
||||
|
||||
@ -222,6 +236,7 @@ public class SchemaEvolutionTest
|
||||
new HyperUniquesAggregatorFactory("uniques", "uniques")
|
||||
)
|
||||
)
|
||||
.context(ImmutableMap.of(QueryContexts.VECTORIZE_KEY, doVectorize))
|
||||
.build();
|
||||
|
||||
// index1 has no "uniques" column
|
||||
@ -238,7 +253,8 @@ public class SchemaEvolutionTest
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNumericEvolutionTimeseriesAggregation()
|
||||
@Parameters(method = "doVectorize")
|
||||
public void testNumericEvolutionTimeseriesAggregation(boolean doVectorize)
|
||||
{
|
||||
final TimeseriesQueryRunnerFactory factory = QueryRunnerTestHelper.newTimeseriesQueryRunnerFactory();
|
||||
|
||||
@ -256,6 +272,7 @@ public class SchemaEvolutionTest
|
||||
new DoubleSumAggregatorFactory("d", null, "c1 * 1", TestExprMacroTable.INSTANCE)
|
||||
)
|
||||
)
|
||||
.context(ImmutableMap.of(QueryContexts.VECTORIZE_KEY, doVectorize))
|
||||
.build();
|
||||
|
||||
// Only string(1)
|
||||
@ -313,7 +330,8 @@ public class SchemaEvolutionTest
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNumericEvolutionFiltering()
|
||||
@Parameters(method = "doVectorize")
|
||||
public void testNumericEvolutionFiltering(boolean doVectorize)
|
||||
{
|
||||
final TimeseriesQueryRunnerFactory factory = QueryRunnerTestHelper.newTimeseriesQueryRunnerFactory();
|
||||
|
||||
@ -328,26 +346,28 @@ public class SchemaEvolutionTest
|
||||
ImmutableList.of(
|
||||
new LongSumAggregatorFactory("a", "c1"),
|
||||
new DoubleSumAggregatorFactory("b", "c1"),
|
||||
new FloatSumAggregatorFactory("d", "c1"),
|
||||
new CountAggregatorFactory("c")
|
||||
)
|
||||
)
|
||||
.context(ImmutableMap.of(QueryContexts.VECTORIZE_KEY, doVectorize))
|
||||
.build();
|
||||
|
||||
// Only string(1) -- which we can filter but not aggregate
|
||||
Assert.assertEquals(
|
||||
timeseriesResult(ImmutableMap.of("a", 19L, "b", 19.1, "c", 2L)),
|
||||
timeseriesResult(ImmutableMap.of("a", 19L, "b", 19.1, "c", 2L, "d", 19.1f)),
|
||||
runQuery(query, factory, ImmutableList.of(index1))
|
||||
);
|
||||
|
||||
// Only long(2) -- which we can filter and aggregate
|
||||
Assert.assertEquals(
|
||||
timeseriesResult(ImmutableMap.of("a", 19L, "b", 19.0, "c", 2L)),
|
||||
timeseriesResult(ImmutableMap.of("a", 19L, "b", 19.0, "c", 2L, "d", 19.0f)),
|
||||
runQuery(query, factory, ImmutableList.of(index2))
|
||||
);
|
||||
|
||||
// Only float(3) -- which we can't filter, but can aggregate
|
||||
Assert.assertEquals(
|
||||
timeseriesResult(ImmutableMap.of("a", 19L, "b", 19.1, "c", 2L)),
|
||||
timeseriesResult(ImmutableMap.of("a", 19L, "b", 19.1, "c", 2L, "d", 19.1f)),
|
||||
runQuery(query, factory, ImmutableList.of(index3))
|
||||
);
|
||||
|
||||
@ -359,7 +379,9 @@ public class SchemaEvolutionTest
|
||||
"b",
|
||||
NullHandling.defaultDoubleValue(),
|
||||
"c",
|
||||
0L
|
||||
0L,
|
||||
"d",
|
||||
NullHandling.defaultFloatValue()
|
||||
)),
|
||||
runQuery(query, factory, ImmutableList.of(index4))
|
||||
);
|
||||
@ -369,7 +391,8 @@ public class SchemaEvolutionTest
|
||||
timeseriesResult(ImmutableMap.of(
|
||||
"a", 57L,
|
||||
"b", 57.2,
|
||||
"c", 6L
|
||||
"c", 6L,
|
||||
"d", 57.20000076293945
|
||||
)),
|
||||
runQuery(query, factory, ImmutableList.of(index1, index2, index3, index4))
|
||||
);
|
||||
|
@ -21,12 +21,17 @@ package org.apache.druid.query.aggregation.mean;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
import junitparams.JUnitParamsRunner;
|
||||
import junitparams.Parameters;
|
||||
import org.apache.druid.data.input.Row;
|
||||
import org.apache.druid.java.util.common.granularity.Granularities;
|
||||
import org.apache.druid.java.util.common.guava.Sequence;
|
||||
import org.apache.druid.query.Druids;
|
||||
import org.apache.druid.query.Query;
|
||||
import org.apache.druid.query.QueryContexts;
|
||||
import org.apache.druid.query.Result;
|
||||
import org.apache.druid.query.aggregation.AggregationTestHelper;
|
||||
import org.apache.druid.query.groupby.GroupByQuery;
|
||||
@ -42,12 +47,19 @@ import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
import org.junit.runner.RunWith;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
@RunWith(JUnitParamsRunner.class)
|
||||
public class DoubleMeanAggregationTest
|
||||
{
|
||||
public Object[] doVectorize()
|
||||
{
|
||||
return Lists.newArrayList(true, false).toArray();
|
||||
}
|
||||
|
||||
@Rule
|
||||
public final TemporaryFolder tempFolder = new TemporaryFolder();
|
||||
|
||||
@ -77,7 +89,8 @@ public class DoubleMeanAggregationTest
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBufferAggretatorUsingGroupByQuery() throws Exception
|
||||
@Parameters(method = "doVectorize")
|
||||
public void testBufferAggretatorUsingGroupByQuery(boolean doVectorize) throws Exception
|
||||
{
|
||||
GroupByQuery query = new GroupByQuery.Builder()
|
||||
.setDataSource("test")
|
||||
@ -88,6 +101,7 @@ public class DoubleMeanAggregationTest
|
||||
new DoubleMeanAggregatorFactory("meanOnString", SimpleTestIndex.SINGLE_VALUE_DOUBLE_AS_STRING_DIM),
|
||||
new DoubleMeanAggregatorFactory("meanOnMultiValue", SimpleTestIndex.MULTI_VALUE_DOUBLE_AS_STRING_DIM)
|
||||
)
|
||||
.setContext(ImmutableMap.of(QueryContexts.VECTORIZE_KEY, doVectorize))
|
||||
.build();
|
||||
|
||||
// do json serialization and deserialization of query to ensure there are no serde issues
|
||||
@ -103,7 +117,8 @@ public class DoubleMeanAggregationTest
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testVectorAggretatorUsingGroupByQueryOnDoubleColumn() throws Exception
|
||||
@Parameters(method = "doVectorize")
|
||||
public void testVectorAggretatorUsingGroupByQueryOnDoubleColumn(boolean doVectorize) throws Exception
|
||||
{
|
||||
GroupByQuery query = new GroupByQuery.Builder()
|
||||
.setDataSource("test")
|
||||
@ -112,7 +127,7 @@ public class DoubleMeanAggregationTest
|
||||
.setAggregatorSpecs(
|
||||
new DoubleMeanAggregatorFactory("meanOnDouble", SimpleTestIndex.DOUBLE_COL)
|
||||
)
|
||||
.setContext(Collections.singletonMap(GroupByQueryConfig.CTX_KEY_VECTORIZE, true))
|
||||
.setContext(Collections.singletonMap(GroupByQueryConfig.CTX_KEY_VECTORIZE, doVectorize))
|
||||
.build();
|
||||
|
||||
// do json serialization and deserialization of query to ensure there are no serde issues
|
||||
@ -126,7 +141,8 @@ public class DoubleMeanAggregationTest
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAggretatorUsingTimeseriesQuery() throws Exception
|
||||
@Parameters(method = "doVectorize")
|
||||
public void testAggretatorUsingTimeseriesQuery(boolean doVectorize) throws Exception
|
||||
{
|
||||
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
|
||||
.dataSource("test")
|
||||
@ -143,6 +159,7 @@ public class DoubleMeanAggregationTest
|
||||
SimpleTestIndex.MULTI_VALUE_DOUBLE_AS_STRING_DIM
|
||||
)
|
||||
)
|
||||
.context(ImmutableMap.of(QueryContexts.VECTORIZE_KEY, doVectorize))
|
||||
.build();
|
||||
|
||||
// do json serialization and deserialization of query to ensure there are no serde issues
|
||||
|
Loading…
x
Reference in New Issue
Block a user