mirror of https://github.com/apache/druid.git
Fix havingSpec on complex aggregators. (#5024)
* Fix havingSpec on complex aggregators. - Uses the technique from #4883 on DimFilterHavingSpec too. - Also uses Transformers from #4890, necessitating a move of that and other related classes from druid-server to druid-processing. They probably make more sense there anyway. - Adds a SQL query test. Fixes #4957. * Remove unused import.
This commit is contained in:
parent
e96daa2593
commit
6c725a7e06
|
@ -112,8 +112,8 @@ import io.druid.segment.QueryableIndex;
|
|||
import io.druid.segment.TestHelper;
|
||||
import io.druid.segment.column.DictionaryEncodedColumn;
|
||||
import io.druid.segment.indexing.DataSchema;
|
||||
import io.druid.segment.indexing.ExpressionTransform;
|
||||
import io.druid.segment.indexing.TransformSpec;
|
||||
import io.druid.segment.transform.ExpressionTransform;
|
||||
import io.druid.segment.transform.TransformSpec;
|
||||
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
|
||||
import io.druid.segment.loading.DataSegmentPusher;
|
||||
import io.druid.segment.loading.LocalDataSegmentPusher;
|
||||
|
|
|
@ -26,7 +26,7 @@ import com.google.common.collect.ImmutableList;
|
|||
import io.druid.java.util.common.JodaUtils;
|
||||
import io.druid.java.util.common.granularity.Granularity;
|
||||
import io.druid.query.filter.DimFilter;
|
||||
import io.druid.segment.indexing.TransformSpec;
|
||||
import io.druid.segment.transform.TransformSpec;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
|
|
|
@ -47,7 +47,7 @@ import io.druid.segment.QueryableIndex;
|
|||
import io.druid.segment.QueryableIndexStorageAdapter;
|
||||
import io.druid.segment.StorageAdapter;
|
||||
import io.druid.segment.indexing.DataSchema;
|
||||
import io.druid.segment.indexing.TransformSpec;
|
||||
import io.druid.segment.transform.TransformSpec;
|
||||
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
|
||||
import io.druid.segment.loading.LocalDataSegmentPuller;
|
||||
import io.druid.segment.realtime.firehose.IngestSegmentFirehose;
|
||||
|
|
|
@ -36,8 +36,8 @@ import io.druid.query.aggregation.CountAggregatorFactory;
|
|||
import io.druid.query.filter.SelectorDimFilter;
|
||||
import io.druid.segment.TestHelper;
|
||||
import io.druid.segment.indexing.DataSchema;
|
||||
import io.druid.segment.indexing.ExpressionTransform;
|
||||
import io.druid.segment.indexing.TransformSpec;
|
||||
import io.druid.segment.transform.ExpressionTransform;
|
||||
import io.druid.segment.transform.TransformSpec;
|
||||
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.NullWritable;
|
||||
|
|
|
@ -42,7 +42,7 @@ import io.druid.java.util.common.parsers.ParseException;
|
|||
import io.druid.query.filter.DimFilter;
|
||||
import io.druid.segment.IndexIO;
|
||||
import io.druid.segment.QueryableIndexStorageAdapter;
|
||||
import io.druid.segment.indexing.TransformSpec;
|
||||
import io.druid.segment.transform.TransformSpec;
|
||||
import io.druid.segment.loading.SegmentLoadingException;
|
||||
import io.druid.segment.realtime.firehose.IngestSegmentFirehose;
|
||||
import io.druid.segment.realtime.firehose.WindowedStorageAdapter;
|
||||
|
|
|
@ -57,8 +57,8 @@ import io.druid.segment.IndexIO;
|
|||
import io.druid.segment.IndexMergerV9;
|
||||
import io.druid.segment.IndexSpec;
|
||||
import io.druid.segment.indexing.DataSchema;
|
||||
import io.druid.segment.indexing.ExpressionTransform;
|
||||
import io.druid.segment.indexing.TransformSpec;
|
||||
import io.druid.segment.transform.ExpressionTransform;
|
||||
import io.druid.segment.transform.TransformSpec;
|
||||
import io.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
|
||||
import io.druid.segment.indexing.granularity.GranularitySpec;
|
||||
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
|
||||
|
|
|
@ -103,10 +103,10 @@ import io.druid.query.timeseries.TimeseriesQueryRunnerFactory;
|
|||
import io.druid.query.timeseries.TimeseriesResultValue;
|
||||
import io.druid.segment.TestHelper;
|
||||
import io.druid.segment.indexing.DataSchema;
|
||||
import io.druid.segment.indexing.ExpressionTransform;
|
||||
import io.druid.segment.transform.ExpressionTransform;
|
||||
import io.druid.segment.indexing.RealtimeIOConfig;
|
||||
import io.druid.segment.indexing.RealtimeTuningConfig;
|
||||
import io.druid.segment.indexing.TransformSpec;
|
||||
import io.druid.segment.transform.TransformSpec;
|
||||
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
|
||||
import io.druid.segment.loading.SegmentLoaderConfig;
|
||||
import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
|
||||
|
|
|
@ -71,8 +71,8 @@ import io.druid.segment.TestHelper;
|
|||
import io.druid.segment.column.Column;
|
||||
import io.druid.segment.incremental.IncrementalIndex;
|
||||
import io.druid.segment.incremental.IncrementalIndexSchema;
|
||||
import io.druid.segment.indexing.ExpressionTransform;
|
||||
import io.druid.segment.indexing.TransformSpec;
|
||||
import io.druid.segment.transform.ExpressionTransform;
|
||||
import io.druid.segment.transform.TransformSpec;
|
||||
import io.druid.segment.loading.DataSegmentArchiver;
|
||||
import io.druid.segment.loading.DataSegmentKiller;
|
||||
import io.druid.segment.loading.DataSegmentMover;
|
||||
|
|
|
@ -59,7 +59,7 @@ import io.druid.segment.IndexSpec;
|
|||
import io.druid.segment.incremental.IncrementalIndex;
|
||||
import io.druid.segment.incremental.IncrementalIndexSchema;
|
||||
import io.druid.segment.incremental.IndexSizeExceededException;
|
||||
import io.druid.segment.indexing.TransformSpec;
|
||||
import io.druid.segment.transform.TransformSpec;
|
||||
import io.druid.segment.loading.SegmentLoaderConfig;
|
||||
import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
|
||||
import io.druid.segment.loading.StorageLocationConfig;
|
||||
|
|
|
@ -22,30 +22,44 @@ package io.druid.query.groupby.having;
|
|||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.base.Preconditions;
|
||||
import io.druid.common.guava.SettableSupplier;
|
||||
import io.druid.data.input.InputRow;
|
||||
import io.druid.data.input.Row;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.filter.DimFilter;
|
||||
import io.druid.query.filter.ValueMatcher;
|
||||
import io.druid.query.groupby.RowBasedColumnSelectorFactory;
|
||||
import io.druid.segment.column.ValueType;
|
||||
import io.druid.segment.transform.RowFunction;
|
||||
import io.druid.segment.transform.Transform;
|
||||
import io.druid.segment.transform.TransformSpec;
|
||||
import io.druid.segment.transform.Transformer;
|
||||
import org.joda.time.DateTime;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
public class DimFilterHavingSpec extends BaseHavingSpec
|
||||
{
|
||||
private final DimFilter dimFilter;
|
||||
private final SettableSupplier<Row> rowSupplier;
|
||||
private static final boolean DEFAULT_FINALIZE = true;
|
||||
|
||||
private ValueMatcher valueMatcher;
|
||||
private final DimFilter dimFilter;
|
||||
private final boolean finalize;
|
||||
|
||||
private Map<String, ValueType> rowSignature = new HashMap<>();
|
||||
private Map<String, AggregatorFactory> aggregators = new HashMap<>();
|
||||
private Transformer transformer = null;
|
||||
private int evalCount;
|
||||
|
||||
@JsonCreator
|
||||
public DimFilterHavingSpec(
|
||||
@JsonProperty("filter") final DimFilter dimFilter
|
||||
@JsonProperty("filter") final DimFilter dimFilter,
|
||||
@JsonProperty("finalize") final Boolean finalize
|
||||
)
|
||||
{
|
||||
this.dimFilter = Preconditions.checkNotNull(dimFilter, "filter");
|
||||
this.rowSupplier = new SettableSupplier<>();
|
||||
this.finalize = finalize == null ? DEFAULT_FINALIZE : finalize;
|
||||
}
|
||||
|
||||
@JsonProperty("filter")
|
||||
|
@ -54,11 +68,22 @@ public class DimFilterHavingSpec extends BaseHavingSpec
|
|||
return dimFilter;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public boolean isFinalize()
|
||||
{
|
||||
return finalize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRowSignature(Map<String, ValueType> rowSignature)
|
||||
{
|
||||
this.valueMatcher = dimFilter.toFilter()
|
||||
.makeMatcher(RowBasedColumnSelectorFactory.create(rowSupplier, rowSignature));
|
||||
this.rowSignature = rowSignature;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAggregators(final Map<String, AggregatorFactory> aggregators)
|
||||
{
|
||||
this.aggregators = aggregators;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -66,17 +91,23 @@ public class DimFilterHavingSpec extends BaseHavingSpec
|
|||
{
|
||||
int oldEvalCount = evalCount;
|
||||
evalCount++;
|
||||
rowSupplier.set(row);
|
||||
final boolean retVal = valueMatcher.matches();
|
||||
|
||||
if (transformer == null) {
|
||||
transformer = createTransformer(dimFilter, rowSignature, aggregators, finalize);
|
||||
}
|
||||
|
||||
final boolean retVal = transformer.transform(new RowAsInputRow(row)) != null;
|
||||
|
||||
if (evalCount != oldEvalCount + 1) {
|
||||
// Oops, someone was using this from two different threads, bad caller.
|
||||
throw new IllegalStateException("concurrent 'eval' calls not permitted!");
|
||||
}
|
||||
|
||||
return retVal;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
public boolean equals(final Object o)
|
||||
{
|
||||
if (this == o) {
|
||||
return true;
|
||||
|
@ -84,16 +115,15 @@ public class DimFilterHavingSpec extends BaseHavingSpec
|
|||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
DimFilterHavingSpec that = (DimFilterHavingSpec) o;
|
||||
|
||||
return dimFilter.equals(that.dimFilter);
|
||||
final DimFilterHavingSpec that = (DimFilterHavingSpec) o;
|
||||
return finalize == that.finalize &&
|
||||
Objects.equals(dimFilter, that.dimFilter);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return dimFilter.hashCode();
|
||||
return Objects.hash(dimFilter, finalize);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -101,6 +131,121 @@ public class DimFilterHavingSpec extends BaseHavingSpec
|
|||
{
|
||||
return "DimFilterHavingSpec{" +
|
||||
"dimFilter=" + dimFilter +
|
||||
", finalize=" + finalize +
|
||||
'}';
|
||||
}
|
||||
|
||||
private static Transformer createTransformer(
|
||||
final DimFilter filter,
|
||||
final Map<String, ValueType> rowSignature,
|
||||
final Map<String, AggregatorFactory> aggregators,
|
||||
final boolean finalize
|
||||
)
|
||||
{
|
||||
final List<Transform> transforms = new ArrayList<>();
|
||||
|
||||
if (finalize) {
|
||||
for (AggregatorFactory aggregator : aggregators.values()) {
|
||||
final String name = aggregator.getName();
|
||||
|
||||
transforms.add(
|
||||
new Transform()
|
||||
{
|
||||
@Override
|
||||
public String getName()
|
||||
{
|
||||
return name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RowFunction getRowFunction()
|
||||
{
|
||||
return row -> aggregator.finalizeComputation(row.getRaw(name));
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
return new TransformSpec(filter, transforms).toTransformer(rowSignature);
|
||||
}
|
||||
|
||||
private static class RowAsInputRow implements InputRow
|
||||
{
|
||||
private final Row row;
|
||||
|
||||
public RowAsInputRow(final Row row)
|
||||
{
|
||||
this.row = row;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getDimensions()
|
||||
{
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getTimestampFromEpoch()
|
||||
{
|
||||
return row.getTimestampFromEpoch();
|
||||
}
|
||||
|
||||
@Override
|
||||
public DateTime getTimestamp()
|
||||
{
|
||||
return row.getTimestamp();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getDimension(final String dimension)
|
||||
{
|
||||
return row.getDimension(dimension);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getRaw(final String dimension)
|
||||
{
|
||||
return row.getRaw(dimension);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Number getMetric(final String metric)
|
||||
{
|
||||
return row.getMetric(metric);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(final Row o)
|
||||
{
|
||||
return row.compareTo(o);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(final Object o)
|
||||
{
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
final RowAsInputRow that = (RowAsInputRow) o;
|
||||
return Objects.equals(row, that.row);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return Objects.hash(row);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "RowAsInputRow{" +
|
||||
"row=" + row +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.segment.indexing;
|
||||
package io.druid.segment.transform;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JacksonInject;
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
|
@ -17,7 +17,7 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.segment.indexing;
|
||||
package io.druid.segment.transform;
|
||||
|
||||
import io.druid.data.input.Row;
|
||||
|
|
@ -17,7 +17,7 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.segment.indexing;
|
||||
package io.druid.segment.transform;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
|
@ -17,7 +17,7 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.segment.indexing;
|
||||
package io.druid.segment.transform;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
@ -26,10 +26,12 @@ import io.druid.data.input.impl.InputRowParser;
|
|||
import io.druid.data.input.impl.StringInputRowParser;
|
||||
import io.druid.java.util.common.ISE;
|
||||
import io.druid.query.filter.DimFilter;
|
||||
import io.druid.segment.column.ValueType;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
|
||||
|
@ -111,9 +113,22 @@ public class TransformSpec
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a {@link Transformer} from this TransformSpec, when the rows to be transformed do not have a known
|
||||
* signature.
|
||||
*/
|
||||
public Transformer toTransformer()
|
||||
{
|
||||
return new Transformer(this);
|
||||
return new Transformer(this, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a {@link Transformer} from this TransformSpec, taking advantage of the known signature of the rows
|
||||
* to be transformed.
|
||||
*/
|
||||
public Transformer toTransformer(@Nullable final Map<String, ValueType> rowSignature)
|
||||
{
|
||||
return new Transformer(this, rowSignature);
|
||||
}
|
||||
|
||||
@Override
|
|
@ -17,7 +17,7 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.segment.indexing;
|
||||
package io.druid.segment.transform;
|
||||
|
||||
import io.druid.data.input.InputRow;
|
||||
import io.druid.data.input.Row;
|
||||
|
@ -26,6 +26,7 @@ import io.druid.java.util.common.DateTimes;
|
|||
import io.druid.query.filter.ValueMatcher;
|
||||
import io.druid.query.groupby.RowBasedColumnSelectorFactory;
|
||||
import io.druid.segment.column.Column;
|
||||
import io.druid.segment.column.ValueType;
|
||||
import org.joda.time.DateTime;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
@ -43,7 +44,7 @@ public class Transformer
|
|||
private final ThreadLocal<Row> rowSupplierForValueMatcher = new ThreadLocal<>();
|
||||
private final ValueMatcher valueMatcher;
|
||||
|
||||
Transformer(final TransformSpec transformSpec)
|
||||
Transformer(final TransformSpec transformSpec, final Map<String, ValueType> rowSignature)
|
||||
{
|
||||
for (final Transform transform : transformSpec.getTransforms()) {
|
||||
transforms.put(transform.getName(), transform.getRowFunction());
|
||||
|
@ -54,7 +55,7 @@ public class Transformer
|
|||
.makeMatcher(
|
||||
RowBasedColumnSelectorFactory.create(
|
||||
rowSupplierForValueMatcher,
|
||||
null
|
||||
rowSignature
|
||||
)
|
||||
);
|
||||
} else {
|
|
@ -17,7 +17,7 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.segment.indexing;
|
||||
package io.druid.segment.transform;
|
||||
|
||||
import io.druid.data.input.InputRow;
|
||||
import io.druid.data.input.impl.InputRowParser;
|
|
@ -17,7 +17,7 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.segment.indexing;
|
||||
package io.druid.segment.transform;
|
||||
|
||||
import io.druid.data.input.InputRow;
|
||||
import io.druid.data.input.impl.ParseSpec;
|
|
@ -3878,7 +3878,8 @@ public class GroupByQueryRunnerTest
|
|||
),
|
||||
new SelectorDimFilter("__time", String.valueOf(DateTimes.of("2011-04-01").getMillis()), null)
|
||||
)
|
||||
)
|
||||
),
|
||||
null
|
||||
);
|
||||
|
||||
GroupByQuery.Builder builder = GroupByQuery
|
||||
|
@ -3924,7 +3925,8 @@ public class GroupByQueryRunnerTest
|
|||
new BoundDimFilter("rows", "12", null, true, false, null, extractionFn2, StringComparators.NUMERIC),
|
||||
new SelectorDimFilter("idx", "super-217", extractionFn)
|
||||
)
|
||||
)
|
||||
),
|
||||
null
|
||||
);
|
||||
|
||||
GroupByQuery.Builder builder = GroupByQuery
|
||||
|
@ -7955,7 +7957,8 @@ public class GroupByQueryRunnerTest
|
|||
StringComparators.NUMERIC
|
||||
)
|
||||
)
|
||||
)
|
||||
),
|
||||
null
|
||||
)
|
||||
)
|
||||
.setGranularity(QueryRunnerTestHelper.allGran)
|
||||
|
|
|
@ -48,7 +48,7 @@ public class DimFilterHavingSpecTest
|
|||
@Test
|
||||
public void testSimple()
|
||||
{
|
||||
final DimFilterHavingSpec havingSpec = new DimFilterHavingSpec(new SelectorDimFilter("foo", "bar", null));
|
||||
final DimFilterHavingSpec havingSpec = new DimFilterHavingSpec(new SelectorDimFilter("foo", "bar", null), null);
|
||||
havingSpec.setRowSignature(null);
|
||||
|
||||
Assert.assertTrue(havingSpec.eval(new MapBasedRow(0, ImmutableMap.<String, Object>of("foo", "bar"))));
|
||||
|
@ -58,7 +58,7 @@ public class DimFilterHavingSpecTest
|
|||
@Test
|
||||
public void testRowSignature()
|
||||
{
|
||||
final DimFilterHavingSpec havingSpec = new DimFilterHavingSpec(new SelectorDimFilter("foo", "1", null));
|
||||
final DimFilterHavingSpec havingSpec = new DimFilterHavingSpec(new SelectorDimFilter("foo", "1", null), null);
|
||||
havingSpec.setRowSignature(ImmutableMap.of("foo", ValueType.LONG));
|
||||
|
||||
Assert.assertTrue(havingSpec.eval(new MapBasedRow(0, ImmutableMap.<String, Object>of("foo", 1L))));
|
||||
|
@ -70,7 +70,7 @@ public class DimFilterHavingSpecTest
|
|||
public void testConcurrentUsage() throws Exception
|
||||
{
|
||||
final ExecutorService exec = Executors.newFixedThreadPool(2);
|
||||
final DimFilterHavingSpec havingSpec = new DimFilterHavingSpec(new SelectorDimFilter("foo", "1", null));
|
||||
final DimFilterHavingSpec havingSpec = new DimFilterHavingSpec(new SelectorDimFilter("foo", "1", null), null);
|
||||
final List<Future<?>> futures = new ArrayList<>();
|
||||
|
||||
for (int i = 0; i < 2; i++) {
|
||||
|
@ -114,7 +114,7 @@ public class DimFilterHavingSpecTest
|
|||
@Test
|
||||
public void testSerde() throws Exception
|
||||
{
|
||||
final DimFilterHavingSpec havingSpec = new DimFilterHavingSpec(new SelectorDimFilter("foo", "1", null));
|
||||
final DimFilterHavingSpec havingSpec = new DimFilterHavingSpec(new SelectorDimFilter("foo", "1", null), false);
|
||||
final ObjectMapper objectMapper = new DefaultObjectMapper();
|
||||
Assert.assertEquals(
|
||||
havingSpec,
|
||||
|
|
|
@ -34,6 +34,7 @@ import io.druid.java.util.common.logger.Logger;
|
|||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.segment.indexing.granularity.GranularitySpec;
|
||||
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
|
||||
import io.druid.segment.transform.TransformSpec;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
|
|
|
@ -43,8 +43,8 @@ import io.druid.segment.VirtualColumns;
|
|||
import io.druid.segment.column.Column;
|
||||
import io.druid.segment.data.IndexedInts;
|
||||
import io.druid.segment.filter.Filters;
|
||||
import io.druid.segment.indexing.TransformSpec;
|
||||
import io.druid.segment.indexing.Transformer;
|
||||
import io.druid.segment.transform.TransformSpec;
|
||||
import io.druid.segment.transform.Transformer;
|
||||
import io.druid.utils.Runnables;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
|
|
@ -41,6 +41,8 @@ import io.druid.query.expression.TestExprMacroTable;
|
|||
import io.druid.query.filter.SelectorDimFilter;
|
||||
import io.druid.segment.TestHelper;
|
||||
import io.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
|
||||
import io.druid.segment.transform.ExpressionTransform;
|
||||
import io.druid.segment.transform.TransformSpec;
|
||||
import org.hamcrest.CoreMatchers;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
|
|
|
@ -33,6 +33,8 @@ import io.druid.query.expression.TestExprMacroTable;
|
|||
import io.druid.query.filter.AndDimFilter;
|
||||
import io.druid.query.filter.SelectorDimFilter;
|
||||
import io.druid.segment.TestHelper;
|
||||
import io.druid.segment.transform.ExpressionTransform;
|
||||
import io.druid.segment.transform.TransformSpec;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
|
|
|
@ -47,7 +47,7 @@ import io.druid.segment.TestHelper;
|
|||
import io.druid.segment.incremental.IncrementalIndex;
|
||||
import io.druid.segment.incremental.IncrementalIndexSchema;
|
||||
import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
|
||||
import io.druid.segment.indexing.TransformSpec;
|
||||
import io.druid.segment.transform.TransformSpec;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
|
|
|
@ -804,7 +804,7 @@ public class DruidQuery
|
|||
grouping.getDimensionSpecs(),
|
||||
grouping.getAggregatorFactories(),
|
||||
grouping.getPostAggregators(),
|
||||
grouping.getHavingFilter() != null ? new DimFilterHavingSpec(grouping.getHavingFilter()) : null,
|
||||
grouping.getHavingFilter() != null ? new DimFilterHavingSpec(grouping.getHavingFilter(), true) : null,
|
||||
limitSpec,
|
||||
ImmutableSortedMap.copyOf(plannerContext.getQueryContext())
|
||||
);
|
||||
|
|
|
@ -1058,7 +1058,7 @@ public class CalciteQueryTest
|
|||
.setInterval(QSS(Filtration.eternity()))
|
||||
.setGranularity(Granularities.ALL)
|
||||
.setAggregatorSpecs(AGGS(new DoubleSumAggregatorFactory("a0", "m1")))
|
||||
.setHavingSpec(new DimFilterHavingSpec(NUMERIC_SELECTOR("a0", "21", null)))
|
||||
.setHavingSpec(HAVING(NUMERIC_SELECTOR("a0", "21", null)))
|
||||
.setContext(QUERY_CONTEXT_DEFAULT)
|
||||
.build()
|
||||
),
|
||||
|
@ -1081,7 +1081,7 @@ public class CalciteQueryTest
|
|||
.setDimensions(DIMS(new DefaultDimensionSpec("dim1", "d0")))
|
||||
.setAggregatorSpecs(AGGS(new DoubleSumAggregatorFactory("a0", "m1")))
|
||||
.setHavingSpec(
|
||||
new DimFilterHavingSpec(
|
||||
HAVING(
|
||||
new BoundDimFilter(
|
||||
"a0",
|
||||
"1",
|
||||
|
@ -1107,6 +1107,105 @@ public class CalciteQueryTest
|
|||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHavingOnApproximateCountDistinct() throws Exception
|
||||
{
|
||||
testQuery(
|
||||
"SELECT dim2, COUNT(DISTINCT m1) FROM druid.foo GROUP BY dim2 HAVING COUNT(DISTINCT m1) > 1",
|
||||
ImmutableList.of(
|
||||
GroupByQuery.builder()
|
||||
.setDataSource(CalciteTests.DATASOURCE1)
|
||||
.setInterval(QSS(Filtration.eternity()))
|
||||
.setGranularity(Granularities.ALL)
|
||||
.setDimensions(DIMS(new DefaultDimensionSpec("dim2", "d0")))
|
||||
.setAggregatorSpecs(
|
||||
AGGS(
|
||||
new CardinalityAggregatorFactory(
|
||||
"a0",
|
||||
null,
|
||||
ImmutableList.of(
|
||||
new DefaultDimensionSpec("m1", "m1", ValueType.FLOAT)
|
||||
),
|
||||
false,
|
||||
true
|
||||
)
|
||||
)
|
||||
)
|
||||
.setHavingSpec(
|
||||
HAVING(
|
||||
BOUND(
|
||||
"a0",
|
||||
"1",
|
||||
null,
|
||||
true,
|
||||
false,
|
||||
null,
|
||||
StringComparators.NUMERIC
|
||||
)
|
||||
)
|
||||
)
|
||||
.setContext(QUERY_CONTEXT_DEFAULT)
|
||||
.build()
|
||||
),
|
||||
ImmutableList.of(
|
||||
new Object[]{"", 3L},
|
||||
new Object[]{"a", 2L}
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHavingOnExactCountDistinct() throws Exception
|
||||
{
|
||||
testQuery(
|
||||
PLANNER_CONFIG_NO_HLL,
|
||||
"SELECT dim2, COUNT(DISTINCT m1) FROM druid.foo GROUP BY dim2 HAVING COUNT(DISTINCT m1) > 1",
|
||||
CalciteTests.REGULAR_USER_AUTH_RESULT,
|
||||
ImmutableList.of(
|
||||
GroupByQuery.builder()
|
||||
.setDataSource(
|
||||
new QueryDataSource(
|
||||
GroupByQuery.builder()
|
||||
.setDataSource(CalciteTests.DATASOURCE1)
|
||||
.setInterval(QSS(Filtration.eternity()))
|
||||
.setGranularity(Granularities.ALL)
|
||||
.setDimensions(
|
||||
DIMS(
|
||||
new DefaultDimensionSpec("dim2", "d0", ValueType.STRING),
|
||||
new DefaultDimensionSpec("m1", "d1", ValueType.FLOAT)
|
||||
)
|
||||
)
|
||||
.setContext(QUERY_CONTEXT_DEFAULT)
|
||||
.build()
|
||||
)
|
||||
)
|
||||
.setInterval(QSS(Filtration.eternity()))
|
||||
.setGranularity(Granularities.ALL)
|
||||
.setDimensions(DIMS(new DefaultDimensionSpec("d0", "_d0", ValueType.STRING)))
|
||||
.setAggregatorSpecs(AGGS(new CountAggregatorFactory("a0")))
|
||||
.setHavingSpec(
|
||||
HAVING(
|
||||
BOUND(
|
||||
"a0",
|
||||
"1",
|
||||
null,
|
||||
true,
|
||||
false,
|
||||
null,
|
||||
StringComparators.NUMERIC
|
||||
)
|
||||
)
|
||||
)
|
||||
.setContext(QUERY_CONTEXT_DEFAULT)
|
||||
.build()
|
||||
),
|
||||
ImmutableList.of(
|
||||
new Object[]{"", 3L},
|
||||
new Object[]{"a", 2L}
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHavingOnFloatSum() throws Exception
|
||||
{
|
||||
|
@ -1122,7 +1221,7 @@ public class CalciteQueryTest
|
|||
.setDimensions(DIMS(new DefaultDimensionSpec("dim1", "d0")))
|
||||
.setAggregatorSpecs(AGGS(new DoubleSumAggregatorFactory("a0", "m1")))
|
||||
.setHavingSpec(
|
||||
new DimFilterHavingSpec(
|
||||
HAVING(
|
||||
new BoundDimFilter(
|
||||
"a0",
|
||||
"1",
|
||||
|
@ -1202,7 +1301,7 @@ public class CalciteQueryTest
|
|||
.setPostAggregatorSpecs(ImmutableList.of(
|
||||
EXPRESSION_POST_AGG("p0", "(\"a0\" / \"a1\")")
|
||||
))
|
||||
.setHavingSpec(new DimFilterHavingSpec(EXPRESSION_FILTER("((\"a0\" / \"a1\") == 1)")))
|
||||
.setHavingSpec(HAVING(EXPRESSION_FILTER("((\"a0\" / \"a1\") == 1)")))
|
||||
.setContext(QUERY_CONTEXT_DEFAULT)
|
||||
.build()
|
||||
),
|
||||
|
@ -4437,7 +4536,7 @@ public class CalciteQueryTest
|
|||
4
|
||||
)
|
||||
)
|
||||
.setHavingSpec(new DimFilterHavingSpec(NUMERIC_SELECTOR("a0", "1", null)))
|
||||
.setHavingSpec(HAVING(NUMERIC_SELECTOR("a0", "1", null)))
|
||||
.setContext(QUERY_CONTEXT_DEFAULT)
|
||||
.build()
|
||||
),
|
||||
|
@ -5847,7 +5946,7 @@ public class CalciteQueryTest
|
|||
new DefaultDimensionSpec("dim2", "d1")
|
||||
))
|
||||
.setAggregatorSpecs(AGGS(new CountAggregatorFactory("a0")))
|
||||
.setHavingSpec(new DimFilterHavingSpec(NUMERIC_SELECTOR("a0", "1", null)))
|
||||
.setHavingSpec(HAVING(NUMERIC_SELECTOR("a0", "1", null)))
|
||||
.setContext(QUERY_CONTEXT_DEFAULT)
|
||||
.build(),
|
||||
newScanQueryBuilder()
|
||||
|
@ -6223,6 +6322,11 @@ public class CalciteQueryTest
|
|||
return Arrays.asList(aggregators);
|
||||
}
|
||||
|
||||
private static DimFilterHavingSpec HAVING(final DimFilter filter)
|
||||
{
|
||||
return new DimFilterHavingSpec(filter, true);
|
||||
}
|
||||
|
||||
private static ExpressionVirtualColumn EXPRESSION_VIRTUAL_COLUMN(
|
||||
final String name,
|
||||
final String expression,
|
||||
|
|
Loading…
Reference in New Issue