Add virtual columns to timeseries, topN, and groupBy. (#3941)

* Add virtual columns to timeseries, topN, and groupBy.

* Fix GroupByTimeseriesQueryRunnerTest.

* Updates from review comments.
Gian Merlino 2017-02-22 13:16:48 -08:00 committed by Fangjin Yang
parent 7200dce112
commit 372b84991c
27 changed files with 478 additions and 186 deletions
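
For orientation before the per-file diffs: a minimal sketch of the query-time virtual column usage this commit enables, modeled on the new timeseries test near the end of the diff. The data source name, interval, and expression are illustrative, not taken from the commit.

import com.google.common.collect.ImmutableList;
import io.druid.granularity.QueryGranularities;
import io.druid.query.Druids;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.timeseries.TimeseriesQuery;
import io.druid.segment.virtual.ExpressionVirtualColumn;

// "expr" is computed at query time from the physical "index" column and is
// then aggregated like any other column.
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
    .dataSource("wikipedia")                                           // illustrative
    .intervals("2011-04-01/2011-04-03")                                // illustrative
    .granularity(QueryGranularities.DAY)
    .virtualColumns(new ExpressionVirtualColumn("expr", "index * 2"))  // new in this commit
    .aggregators(ImmutableList.<AggregatorFactory>of(new LongSumAggregatorFactory("idx", "expr")))
    .build();

The same three overloads (VirtualColumns, List, varargs) are added to the groupBy and topN builders below.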

View File

@ -331,18 +331,20 @@ public class Druids
{
private DataSource dataSource;
private QuerySegmentSpec querySegmentSpec;
private boolean descending;
private VirtualColumns virtualColumns;
private DimFilter dimFilter;
private QueryGranularity granularity;
private List<AggregatorFactory> aggregatorSpecs;
private List<PostAggregator> postAggregatorSpecs;
private Map<String, Object> context;
private boolean descending;
private TimeseriesQueryBuilder()
{
dataSource = null;
querySegmentSpec = null;
descending = false;
virtualColumns = null;
dimFilter = null;
granularity = QueryGranularities.ALL;
aggregatorSpecs = Lists.newArrayList();
@ -356,6 +358,7 @@ public class Druids
dataSource,
querySegmentSpec,
descending,
virtualColumns,
dimFilter,
granularity,
aggregatorSpecs,
@ -460,6 +463,22 @@ public class Druids
return this;
}
public TimeseriesQueryBuilder virtualColumns(VirtualColumns virtualColumns)
{
this.virtualColumns = virtualColumns;
return this;
}
public TimeseriesQueryBuilder virtualColumns(List<VirtualColumn> virtualColumns)
{
return virtualColumns(VirtualColumns.create(virtualColumns));
}
public TimeseriesQueryBuilder virtualColumns(VirtualColumn... virtualColumns)
{
return virtualColumns(VirtualColumns.create(Arrays.asList(virtualColumns)));
}
public TimeseriesQueryBuilder filters(String dimensionName, String value)
{
dimFilter = new SelectorDimFilter(dimensionName, value, null);

View File

@ -56,11 +56,15 @@ import io.druid.query.groupby.orderby.NoopLimitSpec;
import io.druid.query.groupby.orderby.OrderByColumnSpec;
import io.druid.query.spec.LegacySegmentSpec;
import io.druid.query.spec.QuerySegmentSpec;
import io.druid.segment.VirtualColumn;
import io.druid.segment.VirtualColumns;
import org.joda.time.Interval;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
/**
@ -88,6 +92,7 @@ public class GroupByQuery extends BaseQuery<Row>
return new Builder();
}
private final VirtualColumns virtualColumns;
private final LimitSpec limitSpec;
private final HavingSpec havingSpec;
private final DimFilter dimFilter;
@ -102,6 +107,7 @@ public class GroupByQuery extends BaseQuery<Row>
public GroupByQuery(
@JsonProperty("dataSource") DataSource dataSource,
@JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
@JsonProperty("virtualColumns") VirtualColumns virtualColumns,
@JsonProperty("filter") DimFilter dimFilter,
@JsonProperty("granularity") QueryGranularity granularity,
@JsonProperty("dimensions") List<DimensionSpec> dimensions,
@ -113,6 +119,7 @@ public class GroupByQuery extends BaseQuery<Row>
)
{
super(dataSource, querySegmentSpec, false, context);
this.virtualColumns = VirtualColumns.nullToEmpty(virtualColumns);
this.dimFilter = dimFilter;
this.granularity = granularity;
this.dimensions = dimensions == null ? ImmutableList.<DimensionSpec>of() : dimensions;
@ -173,6 +180,7 @@ public class GroupByQuery extends BaseQuery<Row>
private GroupByQuery(
DataSource dataSource,
QuerySegmentSpec querySegmentSpec,
VirtualColumns virtualColumns,
DimFilter dimFilter,
QueryGranularity granularity,
List<DimensionSpec> dimensions,
@ -186,6 +194,7 @@ public class GroupByQuery extends BaseQuery<Row>
{
super(dataSource, querySegmentSpec, false, context);
this.virtualColumns = virtualColumns;
this.dimFilter = dimFilter;
this.granularity = granularity;
this.dimensions = dimensions;
@ -196,6 +205,12 @@ public class GroupByQuery extends BaseQuery<Row>
this.limitFn = limitFn;
}
@JsonProperty
public VirtualColumns getVirtualColumns()
{
return virtualColumns;
}
@JsonProperty("filter")
public DimFilter getDimFilter()
{
@ -393,6 +408,7 @@ public class GroupByQuery extends BaseQuery<Row>
return new GroupByQuery(
getDataSource(),
getQuerySegmentSpec(),
virtualColumns,
dimFilter,
granularity,
dimensions,
@ -411,6 +427,7 @@ public class GroupByQuery extends BaseQuery<Row>
return new GroupByQuery(
getDataSource(),
spec,
virtualColumns,
dimFilter,
granularity,
dimensions,
@ -428,6 +445,7 @@ public class GroupByQuery extends BaseQuery<Row>
return new GroupByQuery(
getDataSource(),
getQuerySegmentSpec(),
virtualColumns,
dimFilter,
getGranularity(),
getDimensions(),
@ -446,6 +464,7 @@ public class GroupByQuery extends BaseQuery<Row>
return new GroupByQuery(
dataSource,
getQuerySegmentSpec(),
virtualColumns,
dimFilter,
granularity,
dimensions,
@ -463,6 +482,7 @@ public class GroupByQuery extends BaseQuery<Row>
return new GroupByQuery(
getDataSource(),
getQuerySegmentSpec(),
virtualColumns,
getDimFilter(),
getGranularity(),
dimensionSpecs,
@ -480,6 +500,7 @@ public class GroupByQuery extends BaseQuery<Row>
return new GroupByQuery(
getDataSource(),
getQuerySegmentSpec(),
virtualColumns,
getDimFilter(),
getGranularity(),
getDimensions(),
@ -496,6 +517,7 @@ public class GroupByQuery extends BaseQuery<Row>
return new GroupByQuery(
getDataSource(),
getQuerySegmentSpec(),
virtualColumns,
getDimFilter(),
getGranularity(),
getDimensions(),
@ -513,6 +535,7 @@ public class GroupByQuery extends BaseQuery<Row>
return new GroupByQuery(
getDataSource(),
getQuerySegmentSpec(),
virtualColumns,
getDimFilter(),
getGranularity(),
getDimensions(),
@ -555,6 +578,7 @@ public class GroupByQuery extends BaseQuery<Row>
{
private DataSource dataSource;
private QuerySegmentSpec querySegmentSpec;
private VirtualColumns virtualColumns;
private DimFilter dimFilter;
private QueryGranularity granularity;
private List<DimensionSpec> dimensions;
@ -576,6 +600,7 @@ public class GroupByQuery extends BaseQuery<Row>
{
dataSource = query.getDataSource();
querySegmentSpec = query.getQuerySegmentSpec();
virtualColumns = query.getVirtualColumns();
limitSpec = query.getLimitSpec();
dimFilter = query.getDimFilter();
granularity = query.getGranularity();
@ -590,6 +615,7 @@ public class GroupByQuery extends BaseQuery<Row>
{
dataSource = builder.dataSource;
querySegmentSpec = builder.querySegmentSpec;
virtualColumns = builder.virtualColumns;
limitSpec = builder.limitSpec;
dimFilter = builder.dimFilter;
granularity = builder.granularity;
@ -640,6 +666,24 @@ public class GroupByQuery extends BaseQuery<Row>
return setQuerySegmentSpec(new LegacySegmentSpec(interval));
}
public Builder setVirtualColumns(VirtualColumns virtualColumns)
{
this.virtualColumns = Preconditions.checkNotNull(virtualColumns, "virtualColumns");
return this;
}
public Builder setVirtualColumns(List<VirtualColumn> virtualColumns)
{
this.virtualColumns = VirtualColumns.create(virtualColumns);
return this;
}
public Builder setVirtualColumns(VirtualColumn... virtualColumns)
{
this.virtualColumns = VirtualColumns.create(Arrays.asList(virtualColumns));
return this;
}
public Builder limit(int limit)
{
ensureExplicitLimitNotSet();
@ -802,6 +846,7 @@ public class GroupByQuery extends BaseQuery<Row>
return new GroupByQuery(
dataSource,
querySegmentSpec,
virtualColumns,
dimFilter,
granularity,
dimensions,
@ -820,6 +865,7 @@ public class GroupByQuery extends BaseQuery<Row>
return "GroupByQuery{" +
"dataSource='" + getDataSource() + '\'' +
", querySegmentSpec=" + getQuerySegmentSpec() +
", virtualColumns=" + virtualColumns +
", limitSpec=" + limitSpec +
", dimFilter=" + dimFilter +
", granularity=" + granularity +
@ -831,7 +877,7 @@ public class GroupByQuery extends BaseQuery<Row>
}
@Override
public boolean equals(Object o)
public boolean equals(final Object o)
{
if (this == o) {
return true;
@ -842,47 +888,30 @@ public class GroupByQuery extends BaseQuery<Row>
if (!super.equals(o)) {
return false;
}
GroupByQuery that = (GroupByQuery) o;
if (aggregatorSpecs != null ? !aggregatorSpecs.equals(that.aggregatorSpecs) : that.aggregatorSpecs != null) {
return false;
}
if (dimFilter != null ? !dimFilter.equals(that.dimFilter) : that.dimFilter != null) {
return false;
}
if (dimensions != null ? !dimensions.equals(that.dimensions) : that.dimensions != null) {
return false;
}
if (granularity != null ? !granularity.equals(that.granularity) : that.granularity != null) {
return false;
}
if (havingSpec != null ? !havingSpec.equals(that.havingSpec) : that.havingSpec != null) {
return false;
}
if (limitSpec != null ? !limitSpec.equals(that.limitSpec) : that.limitSpec != null) {
return false;
}
if (postAggregatorSpecs != null
? !postAggregatorSpecs.equals(that.postAggregatorSpecs)
: that.postAggregatorSpecs != null) {
return false;
}
return true;
final GroupByQuery that = (GroupByQuery) o;
return Objects.equals(virtualColumns, that.virtualColumns) &&
Objects.equals(limitSpec, that.limitSpec) &&
Objects.equals(havingSpec, that.havingSpec) &&
Objects.equals(dimFilter, that.dimFilter) &&
Objects.equals(granularity, that.granularity) &&
Objects.equals(dimensions, that.dimensions) &&
Objects.equals(aggregatorSpecs, that.aggregatorSpecs) &&
Objects.equals(postAggregatorSpecs, that.postAggregatorSpecs);
}
@Override
public int hashCode()
{
int result = super.hashCode();
result = 31 * result + (limitSpec != null ? limitSpec.hashCode() : 0);
result = 31 * result + (havingSpec != null ? havingSpec.hashCode() : 0);
result = 31 * result + (dimFilter != null ? dimFilter.hashCode() : 0);
result = 31 * result + (granularity != null ? granularity.hashCode() : 0);
result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0);
result = 31 * result + (aggregatorSpecs != null ? aggregatorSpecs.hashCode() : 0);
result = 31 * result + (postAggregatorSpecs != null ? postAggregatorSpecs.hashCode() : 0);
return result;
return Objects.hash(
super.hashCode(),
virtualColumns,
limitSpec,
havingSpec,
dimFilter,
granularity,
dimensions,
aggregatorSpecs,
postAggregatorSpecs
);
}
}
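
The new Builder methods above are exercised by testMergeResultsAcrossMultipleDaysWithLimitAndOrderByUsingMathExpressions in the test changes below; condensed here as a sketch (data source, interval, and expression are illustrative, not from the commit):

import java.util.Arrays;
import com.google.common.collect.Lists;
import io.druid.granularity.QueryGranularities;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.groupby.GroupByQuery;
import io.druid.segment.virtual.ExpressionVirtualColumn;

GroupByQuery query = GroupByQuery
    .builder()
    .setDataSource("wikipedia")                                          // illustrative
    .setInterval("2011-04-01/2011-04-03")                                // illustrative
    .setVirtualColumns(new ExpressionVirtualColumn("expr", "index * 2")) // new in this commit
    .setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
    .setAggregatorSpecs(Arrays.<AggregatorFactory>asList(new LongSumAggregatorFactory("idx", "expr")))
    .setGranularity(QueryGranularities.DAY)
    .build();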

View File

@ -48,7 +48,6 @@ import io.druid.query.filter.Filter;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import io.druid.segment.StorageAdapter;
import io.druid.segment.VirtualColumns;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.filter.Filters;
@ -102,7 +101,7 @@ public class GroupByQueryEngine
final Sequence<Cursor> cursors = storageAdapter.makeCursors(
filter,
intervals.get(0),
VirtualColumns.EMPTY,
query.getVirtualColumns(),
query.getGranularity(),
false
);

View File

@ -384,6 +384,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
.appendCacheable(query.getDimFilter())
.appendCacheablesIgnoringOrder(query.getAggregatorSpecs())
.appendCacheablesIgnoringOrder(query.getDimensions())
.appendCacheable(query.getVirtualColumns())
.build();
}

View File

@ -52,7 +52,6 @@ import io.druid.segment.DimensionSelector;
import io.druid.segment.StorageAdapter;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ValueType;
import io.druid.segment.VirtualColumns;
import io.druid.segment.filter.Filters;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -106,7 +105,7 @@ public class GroupByQueryEngineV2
final Sequence<Cursor> cursors = storageAdapter.makeCursors(
Filters.toFilter(query.getDimFilter()),
intervals.get(0),
VirtualColumns.EMPTY,
query.getVirtualColumns(),
query.getGranularity(),
false
);

View File

@ -99,9 +99,11 @@ public class RowBasedGrouperHelper
valueTypes
);
final ThreadLocal<Row> columnSelectorRow = new ThreadLocal<>();
final ColumnSelectorFactory columnSelectorFactory = RowBasedColumnSelectorFactory.create(
columnSelectorRow,
rawInputRowSignature
final ColumnSelectorFactory columnSelectorFactory = query.getVirtualColumns().wrap(
RowBasedColumnSelectorFactory.create(
columnSelectorRow,
rawInputRowSignature
)
);
final Grouper<RowBasedKey> grouper;
if (concurrencyHint == -1) {

View File

@ -105,6 +105,7 @@ public class GroupByStrategyV1 implements GroupByStrategy
new GroupByQuery(
query.getDataSource(),
query.getQuerySegmentSpec(),
query.getVirtualColumns(),
query.getDimFilter(),
query.getGranularity(),
query.getDimensions(),

View File

@ -210,6 +210,7 @@ public class GroupByStrategyV2 implements GroupByStrategy
new GroupByQuery(
query.getDataSource(),
query.getQuerySegmentSpec(),
query.getVirtualColumns(),
query.getDimFilter(),
query.getGranularity(),
query.getDimensions(),

View File

@ -67,7 +67,7 @@ public class SelectQuery extends BaseQuery<Result<SelectResultValue>>
this.dimFilter = dimFilter;
this.granularity = granularity;
this.dimensions = dimensions;
this.virtualColumns = virtualColumns == null ? VirtualColumns.EMPTY : virtualColumns;
this.virtualColumns = VirtualColumns.nullToEmpty(virtualColumns);
this.metrics = metrics;
this.pagingSpec = pagingSpec;

View File

@ -33,15 +33,18 @@ import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.filter.DimFilter;
import io.druid.query.spec.QuerySegmentSpec;
import io.druid.segment.VirtualColumns;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
*/
@JsonTypeName("timeseries")
public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
{
private final VirtualColumns virtualColumns;
private final DimFilter dimFilter;
private final QueryGranularity granularity;
private final List<AggregatorFactory> aggregatorSpecs;
@ -52,6 +55,7 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
@JsonProperty("dataSource") DataSource dataSource,
@JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
@JsonProperty("descending") boolean descending,
@JsonProperty("virtualColumns") VirtualColumns virtualColumns,
@JsonProperty("filter") DimFilter dimFilter,
@JsonProperty("granularity") QueryGranularity granularity,
@JsonProperty("aggregations") List<AggregatorFactory> aggregatorSpecs,
@ -60,13 +64,15 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
)
{
super(dataSource, querySegmentSpec, descending, context);
this.virtualColumns = VirtualColumns.nullToEmpty(virtualColumns);
this.dimFilter = dimFilter;
this.granularity = granularity;
this.aggregatorSpecs = aggregatorSpecs == null ? ImmutableList.<AggregatorFactory>of() : aggregatorSpecs;
this.postAggregatorSpecs = Queries.prepareAggregations(this.aggregatorSpecs,
postAggregatorSpecs == null
? ImmutableList.<PostAggregator>of()
: postAggregatorSpecs
this.postAggregatorSpecs = Queries.prepareAggregations(
this.aggregatorSpecs,
postAggregatorSpecs == null
? ImmutableList.<PostAggregator>of()
: postAggregatorSpecs
);
}
@ -88,6 +94,12 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
return Query.TIMESERIES;
}
@JsonProperty
public VirtualColumns getVirtualColumns()
{
return virtualColumns;
}
@JsonProperty("filter")
public DimFilter getDimensionsFilter()
{
@ -123,6 +135,7 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
getDataSource(),
querySegmentSpec,
isDescending(),
virtualColumns,
dimFilter,
granularity,
aggregatorSpecs,
@ -138,6 +151,7 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
dataSource,
getQuerySegmentSpec(),
isDescending(),
virtualColumns,
dimFilter,
granularity,
aggregatorSpecs,
@ -152,6 +166,7 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
getDataSource(),
getQuerySegmentSpec(),
isDescending(),
virtualColumns,
dimFilter,
granularity,
aggregatorSpecs,
@ -166,6 +181,7 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
getDataSource(),
getQuerySegmentSpec(),
isDescending(),
virtualColumns,
dimFilter,
granularity,
aggregatorSpecs,
@ -181,6 +197,7 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
"dataSource='" + getDataSource() + '\'' +
", querySegmentSpec=" + getQuerySegmentSpec() +
", descending=" + isDescending() +
", virtualColumns=" + virtualColumns +
", dimFilter=" + dimFilter +
", granularity='" + granularity + '\'' +
", aggregatorSpecs=" + aggregatorSpecs +
@ -190,7 +207,7 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
}
@Override
public boolean equals(Object o)
public boolean equals(final Object o)
{
if (this == o) {
return true;
@ -201,35 +218,17 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
if (!super.equals(o)) {
return false;
}
TimeseriesQuery that = (TimeseriesQuery) o;
if (aggregatorSpecs != null ? !aggregatorSpecs.equals(that.aggregatorSpecs) : that.aggregatorSpecs != null) {
return false;
}
if (dimFilter != null ? !dimFilter.equals(that.dimFilter) : that.dimFilter != null) {
return false;
}
if (granularity != null ? !granularity.equals(that.granularity) : that.granularity != null) {
return false;
}
if (postAggregatorSpecs != null
? !postAggregatorSpecs.equals(that.postAggregatorSpecs)
: that.postAggregatorSpecs != null) {
return false;
}
return true;
final TimeseriesQuery that = (TimeseriesQuery) o;
return Objects.equals(virtualColumns, that.virtualColumns) &&
Objects.equals(dimFilter, that.dimFilter) &&
Objects.equals(granularity, that.granularity) &&
Objects.equals(aggregatorSpecs, that.aggregatorSpecs) &&
Objects.equals(postAggregatorSpecs, that.postAggregatorSpecs);
}
@Override
public int hashCode()
{
int result = super.hashCode();
result = 31 * result + (dimFilter != null ? dimFilter.hashCode() : 0);
result = 31 * result + (granularity != null ? granularity.hashCode() : 0);
result = 31 * result + (aggregatorSpecs != null ? aggregatorSpecs.hashCode() : 0);
result = 31 * result + (postAggregatorSpecs != null ? postAggregatorSpecs.hashCode() : 0);
return result;
return Objects.hash(super.hashCode(), virtualColumns, dimFilter, granularity, aggregatorSpecs, postAggregatorSpecs);
}
}

View File

@ -29,7 +29,6 @@ import io.druid.query.filter.Filter;
import io.druid.segment.Cursor;
import io.druid.segment.SegmentMissingException;
import io.druid.segment.StorageAdapter;
import io.druid.segment.VirtualColumns;
import io.druid.segment.filter.Filters;
import java.util.List;
@ -52,7 +51,7 @@ public class TimeseriesQueryEngine
adapter,
query.getQuerySegmentSpec().getIntervals(),
filter,
VirtualColumns.EMPTY,
query.getVirtualColumns(),
query.isDescending(),
query.getGranularity(),
new Function<Cursor, Result<TimeseriesResultValue>>()

View File

@ -142,6 +142,7 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
.appendCacheable(query.getGranularity())
.appendCacheable(query.getDimensionsFilter())
.appendCacheablesIgnoringOrder(query.getAggregatorSpecs())
.appendCacheable(query.getVirtualColumns())
.build();
}

View File

@ -81,10 +81,10 @@ public class AggregateTopNMetricFirstAlgorithm implements TopNAlgorithm<int[], T
throw new ISE("WTF! Can't find the metric to do topN over?");
}
// Run topN for only a single metric
TopNQuery singleMetricQuery = new TopNQueryBuilder().copy(query)
.aggregators(condensedAggPostAggPair.lhs)
.postAggregators(condensedAggPostAggPair.rhs)
.build();
TopNQuery singleMetricQuery = new TopNQueryBuilder(query)
.aggregators(condensedAggPostAggPair.lhs)
.postAggregators(condensedAggPostAggPair.rhs)
.build();
final TopNResultBuilder singleMetricResultBuilder = BaseTopNAlgorithm.makeResultBuilder(params, singleMetricQuery);
PooledTopNAlgorithm singleMetricAlgo = new PooledTopNAlgorithm(capabilities, singleMetricQuery, bufferPool);

View File

@ -34,9 +34,11 @@ import io.druid.query.aggregation.PostAggregator;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.filter.DimFilter;
import io.druid.query.spec.QuerySegmentSpec;
import io.druid.segment.VirtualColumns;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
*/
@ -44,6 +46,7 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
{
public static final String TOPN = "topN";
private final VirtualColumns virtualColumns;
private final DimensionSpec dimensionSpec;
private final TopNMetricSpec topNMetricSpec;
private final int threshold;
@ -55,6 +58,7 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
@JsonCreator
public TopNQuery(
@JsonProperty("dataSource") DataSource dataSource,
@JsonProperty("virtualColumns") VirtualColumns virtualColumns,
@JsonProperty("dimension") DimensionSpec dimensionSpec,
@JsonProperty("metric") TopNMetricSpec topNMetricSpec,
@JsonProperty("threshold") int threshold,
@ -67,6 +71,7 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
)
{
super(dataSource, querySegmentSpec, false, context);
this.virtualColumns = VirtualColumns.nullToEmpty(virtualColumns);
this.dimensionSpec = dimensionSpec;
this.topNMetricSpec = topNMetricSpec;
this.threshold = threshold;
@ -74,10 +79,11 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
this.dimFilter = dimFilter;
this.granularity = granularity;
this.aggregatorSpecs = aggregatorSpecs == null ? ImmutableList.<AggregatorFactory>of() : aggregatorSpecs;
this.postAggregatorSpecs = Queries.prepareAggregations(this.aggregatorSpecs,
postAggregatorSpecs == null
? ImmutableList.<PostAggregator>of()
: postAggregatorSpecs
this.postAggregatorSpecs = Queries.prepareAggregations(
this.aggregatorSpecs,
postAggregatorSpecs == null
? ImmutableList.<PostAggregator>of()
: postAggregatorSpecs
);
Preconditions.checkNotNull(dimensionSpec, "dimensionSpec can't be null");
@ -105,6 +111,12 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
return TOPN;
}
@JsonProperty
public VirtualColumns getVirtualColumns()
{
return virtualColumns;
}
@JsonProperty("dimension")
public DimensionSpec getDimensionSpec()
{
@ -159,6 +171,7 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
{
return new TopNQuery(
getDataSource(),
virtualColumns,
dimensionSpec,
topNMetricSpec,
threshold,
@ -175,6 +188,7 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
{
return new TopNQuery(
getDataSource(),
virtualColumns,
spec,
topNMetricSpec,
threshold,
@ -191,6 +205,7 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
{
return new TopNQuery(
getDataSource(),
virtualColumns,
getDimensionSpec(),
topNMetricSpec,
threshold,
@ -207,6 +222,7 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
{
return new TopNQuery(
getDataSource(),
virtualColumns,
getDimensionSpec(),
topNMetricSpec,
threshold,
@ -224,6 +240,7 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
{
return new TopNQuery(
dataSource,
virtualColumns,
dimensionSpec,
topNMetricSpec,
threshold,
@ -240,6 +257,7 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
{
return new TopNQuery(
getDataSource(),
virtualColumns,
dimensionSpec,
topNMetricSpec,
threshold,
@ -256,6 +274,7 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
{
return new TopNQuery(
getDataSource(),
virtualColumns,
dimensionSpec,
topNMetricSpec,
threshold,
@ -272,6 +291,7 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
{
return new TopNQuery(
getDataSource(),
virtualColumns,
getDimensionSpec(),
topNMetricSpec,
threshold,
@ -293,6 +313,7 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
", topNMetricSpec=" + topNMetricSpec +
", threshold=" + threshold +
", querySegmentSpec=" + getQuerySegmentSpec() +
", virtualColumns=" + virtualColumns +
", dimFilter=" + dimFilter +
", granularity='" + granularity + '\'' +
", aggregatorSpecs=" + aggregatorSpecs +
@ -301,7 +322,7 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
}
@Override
public boolean equals(Object o)
public boolean equals(final Object o)
{
if (this == o) {
return true;
@ -312,49 +333,30 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
if (!super.equals(o)) {
return false;
}
TopNQuery topNQuery = (TopNQuery) o;
if (threshold != topNQuery.threshold) {
return false;
}
if (aggregatorSpecs != null
? !aggregatorSpecs.equals(topNQuery.aggregatorSpecs)
: topNQuery.aggregatorSpecs != null) {
return false;
}
if (dimFilter != null ? !dimFilter.equals(topNQuery.dimFilter) : topNQuery.dimFilter != null) {
return false;
}
if (dimensionSpec != null ? !dimensionSpec.equals(topNQuery.dimensionSpec) : topNQuery.dimensionSpec != null) {
return false;
}
if (granularity != null ? !granularity.equals(topNQuery.granularity) : topNQuery.granularity != null) {
return false;
}
if (postAggregatorSpecs != null
? !postAggregatorSpecs.equals(topNQuery.postAggregatorSpecs)
: topNQuery.postAggregatorSpecs != null) {
return false;
}
if (topNMetricSpec != null ? !topNMetricSpec.equals(topNQuery.topNMetricSpec) : topNQuery.topNMetricSpec != null) {
return false;
}
return true;
final TopNQuery topNQuery = (TopNQuery) o;
return threshold == topNQuery.threshold &&
Objects.equals(virtualColumns, topNQuery.virtualColumns) &&
Objects.equals(dimensionSpec, topNQuery.dimensionSpec) &&
Objects.equals(topNMetricSpec, topNQuery.topNMetricSpec) &&
Objects.equals(dimFilter, topNQuery.dimFilter) &&
Objects.equals(granularity, topNQuery.granularity) &&
Objects.equals(aggregatorSpecs, topNQuery.aggregatorSpecs) &&
Objects.equals(postAggregatorSpecs, topNQuery.postAggregatorSpecs);
}
@Override
public int hashCode()
{
int result = super.hashCode();
result = 31 * result + (dimensionSpec != null ? dimensionSpec.hashCode() : 0);
result = 31 * result + (topNMetricSpec != null ? topNMetricSpec.hashCode() : 0);
result = 31 * result + threshold;
result = 31 * result + (dimFilter != null ? dimFilter.hashCode() : 0);
result = 31 * result + (granularity != null ? granularity.hashCode() : 0);
result = 31 * result + (aggregatorSpecs != null ? aggregatorSpecs.hashCode() : 0);
result = 31 * result + (postAggregatorSpecs != null ? postAggregatorSpecs.hashCode() : 0);
return result;
return Objects.hash(
super.hashCode(),
virtualColumns,
dimensionSpec,
topNMetricSpec,
threshold,
dimFilter,
granularity,
aggregatorSpecs,
postAggregatorSpecs
);
}
}

View File

@ -33,8 +33,11 @@ import io.druid.query.filter.InDimFilter;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.query.spec.LegacySegmentSpec;
import io.druid.query.spec.QuerySegmentSpec;
import io.druid.segment.VirtualColumn;
import io.druid.segment.VirtualColumns;
import org.joda.time.Interval;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@ -62,6 +65,7 @@ import java.util.Map;
public class TopNQueryBuilder
{
private DataSource dataSource;
private VirtualColumns virtualColumns;
private DimensionSpec dimensionSpec;
private TopNMetricSpec topNMetricSpec;
private int threshold;
@ -75,6 +79,7 @@ public class TopNQueryBuilder
public TopNQueryBuilder()
{
dataSource = null;
virtualColumns = null;
dimensionSpec = null;
topNMetricSpec = null;
threshold = 0;
@ -86,11 +91,31 @@ public class TopNQueryBuilder
context = null;
}
public TopNQueryBuilder(final TopNQuery query)
{
this.dataSource = query.getDataSource();
this.virtualColumns = query.getVirtualColumns();
this.dimensionSpec = query.getDimensionSpec();
this.topNMetricSpec = query.getTopNMetricSpec();
this.threshold = query.getThreshold();
this.querySegmentSpec = query.getQuerySegmentSpec();
this.dimFilter = query.getDimensionsFilter();
this.granularity = query.getGranularity();
this.aggregatorSpecs = query.getAggregatorSpecs();
this.postAggregatorSpecs = query.getPostAggregatorSpecs();
this.context = query.getContext();
}
public DataSource getDataSource()
{
return dataSource;
}
public VirtualColumns getVirtualColumns()
{
return virtualColumns;
}
public DimensionSpec getDimensionSpec()
{
return dimensionSpec;
@ -140,6 +165,7 @@ public class TopNQueryBuilder
{
return new TopNQuery(
dataSource,
virtualColumns,
dimensionSpec,
topNMetricSpec,
threshold,
@ -152,25 +178,18 @@ public class TopNQueryBuilder
);
}
@Deprecated
public TopNQueryBuilder copy(TopNQuery query)
{
return new TopNQueryBuilder()
.dataSource(query.getDataSource().toString())
.dimension(query.getDimensionSpec())
.metric(query.getTopNMetricSpec())
.threshold(query.getThreshold())
.intervals(query.getIntervals())
.filters(query.getDimensionsFilter())
.granularity(query.getGranularity())
.aggregators(query.getAggregatorSpecs())
.postAggregators(query.getPostAggregatorSpecs())
.context(query.getContext());
return new TopNQueryBuilder(query);
}
@Deprecated
public TopNQueryBuilder copy(TopNQueryBuilder builder)
{
return new TopNQueryBuilder()
.dataSource(builder.dataSource)
.virtualColumns(builder.virtualColumns)
.dimension(builder.dimensionSpec)
.metric(builder.topNMetricSpec)
.threshold(builder.threshold)
@ -188,6 +207,22 @@ public class TopNQueryBuilder
return this;
}
public TopNQueryBuilder virtualColumns(VirtualColumns virtualColumns)
{
this.virtualColumns = virtualColumns;
return this;
}
public TopNQueryBuilder virtualColumns(List<VirtualColumn> virtualColumns)
{
return virtualColumns(VirtualColumns.create(virtualColumns));
}
public TopNQueryBuilder virtualColumns(VirtualColumn... virtualColumns)
{
return virtualColumns(VirtualColumns.create(Arrays.asList(virtualColumns)));
}
public TopNQueryBuilder dataSource(DataSource d)
{
dataSource = d;
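
The new virtualColumns builder methods are exercised by testFullOnTopNVirtualColumn in the test changes below; condensed here as a sketch (data source and interval are illustrative, not from the commit):

import com.google.common.collect.Lists;
import io.druid.granularity.QueryGranularities;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleMaxAggregatorFactory;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.topn.TopNQuery;
import io.druid.query.topn.TopNQueryBuilder;
import io.druid.segment.column.ValueType;
import io.druid.segment.virtual.ExpressionVirtualColumn;

// The virtual column "ql_expr" wraps the physical "qualityLong" column and is
// then used as the topN dimension.
TopNQuery query = new TopNQueryBuilder()
    .dataSource("wikipedia")                                               // illustrative
    .granularity(QueryGranularities.ALL)
    .virtualColumns(new ExpressionVirtualColumn("ql_expr", "qualityLong")) // new in this commit
    .dimension(new DefaultDimensionSpec("ql_expr", "ql_alias", ValueType.LONG))
    .metric("maxIndex")
    .threshold(4)
    .intervals("2011-01-12/2011-04-15")                                    // illustrative
    .aggregators(Lists.<AggregatorFactory>newArrayList(new DoubleMaxAggregatorFactory("maxIndex", "index")))
    .build();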

View File

@ -35,7 +35,6 @@ import io.druid.segment.Capabilities;
import io.druid.segment.Cursor;
import io.druid.segment.SegmentMissingException;
import io.druid.segment.StorageAdapter;
import io.druid.segment.VirtualColumns;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ValueType;
@ -77,7 +76,13 @@ public class TopNQueryEngine
return Sequences.filter(
Sequences.map(
adapter.makeCursors(filter, queryIntervals.get(0), VirtualColumns.EMPTY, granularity, query.isDescending()),
adapter.makeCursors(
filter,
queryIntervals.get(0),
query.getVirtualColumns(),
granularity,
query.isDescending()
),
new Function<Cursor, Result<TopNResultValue>>()
{
@Override
@ -106,7 +111,8 @@ public class TopNQueryEngine
final TopNAlgorithmSelector selector = new TopNAlgorithmSelector(cardinality, numBytesPerRecord);
query.initTopNAlgorithmSelector(selector);
final ColumnCapabilities columnCapabilities = adapter.getColumnCapabilities(dimension);
final ColumnCapabilities columnCapabilities = query.getVirtualColumns()
.getColumnCapabilitiesWithFallback(adapter, dimension);
final TopNAlgorithm topNAlgorithm;
if (

View File

@ -317,7 +317,8 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
.appendInt(query.getThreshold())
.appendCacheable(query.getGranularity())
.appendCacheable(query.getDimensionsFilter())
.appendCacheablesIgnoringOrder(query.getAggregatorSpecs());
.appendCacheablesIgnoringOrder(query.getAggregatorSpecs())
.appendCacheable(query.getVirtualColumns());
final List<PostAggregator> postAggregators = prunePostAggregators(query);
if (!postAggregators.isEmpty()) {

View File

@ -21,6 +21,7 @@ package io.druid.segment;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.druid.query.cache.Cacheable;
import io.druid.query.dimension.DimensionSpec;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.virtual.ExpressionVirtualColumn;
@ -39,7 +40,7 @@ import java.util.List;
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "expression", value = ExpressionVirtualColumn.class)
})
public interface VirtualColumn
public interface VirtualColumn extends Cacheable
{
/**
* Output name of this column.
@ -130,11 +131,4 @@ public interface VirtualColumn
* @return whether to use dot notation
*/
boolean usesDotNotation();
/**
* Returns cache key
*
* @return cache key
*/
byte[] getCacheKey();
}

View File

@ -26,15 +26,16 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.Pair;
import io.druid.query.cache.CacheKeyBuilder;
import io.druid.query.cache.Cacheable;
import io.druid.query.dimension.DimensionSpec;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.virtual.VirtualizedColumnSelectorFactory;
import java.nio.ByteBuffer;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -42,7 +43,7 @@ import java.util.Set;
/**
* Class allowing lookup and usage of virtual columns.
*/
public class VirtualColumns
public class VirtualColumns implements Cacheable
{
public static final VirtualColumns EMPTY = new VirtualColumns(
ImmutableList.<VirtualColumn>of(),
@ -94,6 +95,11 @@ public class VirtualColumns
return new VirtualColumns(ImmutableList.copyOf(virtualColumns), withDotSupport, withoutDotSupport);
}
public static VirtualColumns nullToEmpty(@Nullable VirtualColumns virtualColumns)
{
return virtualColumns == null ? EMPTY : virtualColumns;
}
private VirtualColumns(
List<VirtualColumn> virtualColumns,
Map<String, VirtualColumn> withDotSupport,
@ -193,6 +199,16 @@ public class VirtualColumns
}
}
public ColumnCapabilities getColumnCapabilitiesWithFallback(StorageAdapter adapter, String columnName)
{
final ColumnCapabilities virtualColumnCapabilities = getColumnCapabilities(columnName);
if (virtualColumnCapabilities != null) {
return virtualColumnCapabilities;
} else {
return adapter.getColumnCapabilities(columnName);
}
}
public boolean isEmpty()
{
return withDotSupport.isEmpty() && withoutDotSupport.isEmpty();
@ -212,18 +228,8 @@ public class VirtualColumns
public byte[] getCacheKey()
{
final byte[][] cacheKeys = new byte[virtualColumns.size()][];
int len = Ints.BYTES;
for (int i = 0; i < virtualColumns.size(); i++) {
cacheKeys[i] = virtualColumns.get(i).getCacheKey();
len += Ints.BYTES + cacheKeys[i].length;
}
final ByteBuffer buf = ByteBuffer.allocate(len).putInt(virtualColumns.size());
for (byte[] cacheKey : cacheKeys) {
buf.putInt(cacheKey.length);
buf.put(cacheKey);
}
return buf.array();
// id doesn't matter as there is only one kind of "VirtualColumns", so use 0.
return new CacheKeyBuilder((byte) 0).appendCacheablesIgnoringOrder(virtualColumns).build();
}
private void detectCycles(VirtualColumn virtualColumn, Set<String> columnNames)
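
How the engines consume the helpers added above, condensed from the GroupByQueryEngine, RowBasedGrouperHelper, and TopNQueryEngine hunks earlier in this diff. The wrapper class and method names are hypothetical; only the VirtualColumns calls come from the commit.

import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.StorageAdapter;
import io.druid.segment.VirtualColumns;
import io.druid.segment.column.ColumnCapabilities;

class VirtualColumnsUsageSketch  // hypothetical wrapper class
{
  // Normalize a possibly-null (absent-from-JSON) value, then decorate the base
  // selector factory: virtual columns resolve first, real columns fall through.
  static ColumnSelectorFactory decorate(VirtualColumns virtualColumns, ColumnSelectorFactory baseFactory)
  {
    return VirtualColumns.nullToEmpty(virtualColumns).wrap(baseFactory);
  }

  // Capabilities lookup as used by TopNQueryEngine: virtual-column capabilities
  // win; otherwise fall back to the storage adapter.
  static ColumnCapabilities capabilities(VirtualColumns virtualColumns, StorageAdapter adapter, String columnName)
  {
    return virtualColumns.getColumnCapabilitiesWithFallback(adapter, columnName);
  }
}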

View File

@ -53,7 +53,7 @@ public class IncrementalIndexSchema
this.minTimestamp = minTimestamp;
this.timestampSpec = timestampSpec;
this.gran = gran;
this.virtualColumns = virtualColumns == null ? VirtualColumns.EMPTY : virtualColumns;
this.virtualColumns = VirtualColumns.nullToEmpty(virtualColumns);
this.dimensionsSpec = dimensionsSpec;
this.metrics = metrics;
this.rollup = rollup;

View File

@ -117,6 +117,7 @@ import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.segment.TestHelper;
import io.druid.segment.column.Column;
import io.druid.segment.column.ValueType;
import io.druid.segment.virtual.ExpressionVirtualColumn;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Interval;
@ -2296,16 +2297,33 @@ public class GroupByQueryRunnerTest
TestHelper.assertExpectedObjects(
Iterables.limit(expectedResults, limit), mergeRunner.run(fullQuery, context), String.format("limit: %d", limit)
);
}
builder.setAggregatorSpecs(
Arrays.asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory("idx", null, "index * 2 + indexMin / 10")
@Test
public void testMergeResultsAcrossMultipleDaysWithLimitAndOrderByUsingMathExpressions()
{
final int limit = 14;
GroupByQuery.Builder builder = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setInterval(QueryRunnerTestHelper.firstToThird)
.setVirtualColumns(
new ExpressionVirtualColumn("expr", "index * 2 + indexMin / 10")
)
);
fullQuery = builder.build();
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
.setAggregatorSpecs(
Arrays.asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory("idx", "expr")
)
)
.setGranularity(QueryGranularities.DAY)
.setLimit(limit)
.addOrderByColumn("idx", OrderByColumnSpec.Direction.DESCENDING);
expectedResults = Arrays.asList(
GroupByQuery fullQuery = builder.build();
List<Row> expectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "premium", "rows", 3L, "idx", 6090L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 3L, "idx", 6030L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 1L, "idx", 333L),
@ -2323,8 +2341,9 @@ public class GroupByQueryRunnerTest
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "travel", "rows", 1L, "idx", 265L)
);
mergeRunner = factory.getToolchest().mergeResults(runner);
QueryRunner<Row> mergeRunner = factory.getToolchest().mergeResults(runner);
Map<String, Object> context = Maps.newHashMap();
TestHelper.assertExpectedObjects(
Iterables.limit(expectedResults, limit), mergeRunner.run(fullQuery, context), String.format("limit: %d", limit)
);
@ -2497,6 +2516,7 @@ public class GroupByQueryRunnerTest
Iterables.limit(expectedResults, 5), mergeRunner.run(builder.limit(5).build(), context), "limited"
);
// Now try it with an expression based aggregator.
builder.limit(Integer.MAX_VALUE)
.setAggregatorSpecs(
Arrays.asList(
@ -2522,6 +2542,23 @@ public class GroupByQueryRunnerTest
TestHelper.assertExpectedObjects(
Iterables.limit(expectedResults, 5), mergeRunner.run(builder.limit(5).build(), context), "limited"
);
// Now try it with an expression virtual column.
builder.limit(Integer.MAX_VALUE)
.setVirtualColumns(
new ExpressionVirtualColumn("expr", "index / 2 + indexMin")
)
.setAggregatorSpecs(
Arrays.asList(
QueryRunnerTestHelper.rowsCount,
new DoubleSumAggregatorFactory("idx", "expr")
)
);
TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(builder.build(), context), "no-limit");
TestHelper.assertExpectedObjects(
Iterables.limit(expectedResults, 5), mergeRunner.run(builder.limit(5).build(), context), "limited"
);
}
@Test
@ -4024,13 +4061,17 @@ public class GroupByQueryRunnerTest
GroupByQueryRunnerTestHelper.runQuery(factory, runner, query), ""
);
subquery = subquery.withAggregatorSpecs(
Arrays.asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory("idx", null, "-index + 100"),
new LongSumAggregatorFactory("indexMaxPlusTen", "indexMaxPlusTen")
subquery = new GroupByQuery.Builder(subquery)
.setVirtualColumns(
new ExpressionVirtualColumn("expr", "-index + 100")
)
);
.setAggregatorSpecs(
Arrays.asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory("idx", "expr"),
new LongSumAggregatorFactory("indexMaxPlusTen", "indexMaxPlusTen")
)
).build();
query = (GroupByQuery) query.withDataSource(new QueryDataSource(subquery));
expectedResults = GroupByQueryRunnerTestHelper.createExpectedRows(
@ -5138,6 +5179,34 @@ public class GroupByQueryRunnerTest
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
@Test
public void testSubqueryWithOuterVirtualColumns()
{
final GroupByQuery subquery = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.fullOnInterval)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
.setGranularity(QueryRunnerTestHelper.dayGran)
.build();
final GroupByQuery query = GroupByQuery
.builder()
.setDataSource(subquery)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setVirtualColumns(new ExpressionVirtualColumn("expr", "1"))
.setDimensions(Lists.<DimensionSpec>newArrayList())
.setAggregatorSpecs(ImmutableList.<AggregatorFactory>of(new LongSumAggregatorFactory("count", "expr")))
.setGranularity(QueryRunnerTestHelper.allGran)
.build();
List<Row> expectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "count", 18L)
);
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
@Test
public void testSubqueryWithOuterCardinalityAggregator()
{
@ -5145,8 +5214,10 @@ public class GroupByQueryRunnerTest
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.fullOnInterval)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("market", "market"),
new DefaultDimensionSpec("quality", "quality")))
.setDimensions(Lists.<DimensionSpec>newArrayList(
new DefaultDimensionSpec("market", "market"),
new DefaultDimensionSpec("quality", "quality")
))
.setAggregatorSpecs(
Arrays.asList(
QueryRunnerTestHelper.rowsCount,

View File

@ -98,6 +98,7 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
.setDimFilter(tsQuery.getDimensionsFilter())
.setAggregatorSpecs(tsQuery.getAggregatorSpecs())
.setPostAggregatorSpecs(tsQuery.getPostAggregatorSpecs())
.setVirtualColumns(tsQuery.getVirtualColumns())
.build(),
responseContext
),

View File

@ -31,6 +31,7 @@ import io.druid.query.TableDataSource;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.segment.VirtualColumns;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
@ -73,6 +74,7 @@ public class TimeseriesQueryQueryToolChestTest
)
),
descending,
VirtualColumns.EMPTY,
null,
QueryGranularities.ALL,
ImmutableList.<AggregatorFactory>of(new CountAggregatorFactory("metric1")),

View File

@ -52,6 +52,7 @@ import io.druid.query.lookup.LookupExtractionFn;
import io.druid.query.ordering.StringComparators;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.segment.TestHelper;
import io.druid.segment.virtual.ExpressionVirtualColumn;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Interval;
@ -398,6 +399,47 @@ public class TimeseriesQueryRunnerTest
assertExpectedResults(expectedResults, results);
}
@Test
public void testTimeseriesWithVirtualColumn()
{
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.granularity(QueryRunnerTestHelper.dayGran)
.intervals(QueryRunnerTestHelper.firstToThird)
.aggregators(
Arrays.<AggregatorFactory>asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory("idx", "expr"),
QueryRunnerTestHelper.qualityUniques
)
)
.descending(descending)
.virtualColumns(new ExpressionVirtualColumn("expr", "index"))
.build();
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
new Result<>(
new DateTime("2011-04-01"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of("rows", 13L, "idx", 6619L, "uniques", QueryRunnerTestHelper.UNIQUES_9)
)
),
new Result<>(
new DateTime("2011-04-02"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of("rows", 13L, "idx", 5827L, "uniques", QueryRunnerTestHelper.UNIQUES_9)
)
)
);
Iterable<Result<TimeseriesResultValue>> results = Sequences.toList(
runner.run(query, CONTEXT),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
assertExpectedResults(expectedResults, results);
}
@Test
public void testTimeseriesWithTimeZone()
{

View File

@ -44,6 +44,7 @@ import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.TestIndex;
import io.druid.segment.VirtualColumns;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
@ -64,6 +65,7 @@ public class TopNQueryQueryToolChestTest
new TopNQueryQueryToolChest(null, null).getCacheStrategy(
new TopNQuery(
new TableDataSource("dummy"),
VirtualColumns.EMPTY,
new DefaultDimensionSpec("test", "test"),
new NumericTopNMetricSpec("metric1"),
3,
@ -115,6 +117,7 @@ public class TopNQueryQueryToolChestTest
{
final TopNQuery query1 = new TopNQuery(
new TableDataSource("dummy"),
VirtualColumns.EMPTY,
new DefaultDimensionSpec("test", "test"),
new NumericTopNMetricSpec("post"),
3,
@ -134,6 +137,7 @@ public class TopNQueryQueryToolChestTest
final TopNQuery query2 = new TopNQuery(
new TableDataSource("dummy"),
VirtualColumns.EMPTY,
new DefaultDimensionSpec("test", "test"),
new NumericTopNMetricSpec("post"),
3,

View File

@ -77,6 +77,7 @@ import io.druid.query.timeseries.TimeseriesQuery;
import io.druid.segment.TestHelper;
import io.druid.segment.column.Column;
import io.druid.segment.column.ValueType;
import io.druid.segment.virtual.ExpressionVirtualColumn;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
@ -3962,6 +3963,79 @@ public class TopNQueryRunnerTest
assertExpectedResults(expectedResults, query);
}
@Test
public void testFullOnTopNVirtualColumn()
{
TopNQuery query = new TopNQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.granularity(QueryRunnerTestHelper.allGran)
.dimension(new DefaultDimensionSpec("ql_expr", "ql_alias", ValueType.LONG))
.metric("maxIndex")
.threshold(4)
.intervals(QueryRunnerTestHelper.fullOnInterval)
.aggregators(
Lists.<AggregatorFactory>newArrayList(
Iterables.concat(
QueryRunnerTestHelper.commonAggregators,
Lists.newArrayList(
new DoubleMaxAggregatorFactory("maxIndex", "index"),
new DoubleMinAggregatorFactory("minIndex", "index")
)
)
)
)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.virtualColumns(new ExpressionVirtualColumn("ql_expr", "qualityLong"))
.build();
List<Result<TopNResultValue>> expectedResults = Arrays.asList(
new Result<TopNResultValue>(
new DateTime("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("ql_alias", 1400L)
.put(QueryRunnerTestHelper.indexMetric, 217725.42022705078D)
.put("rows", 279L)
.put("addRowsIndexConstant", 218005.42022705078D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_1)
.put("maxIndex", 1870.06103515625D)
.put("minIndex", 91.27055358886719D)
.build(),
ImmutableMap.<String, Object>builder()
.put("ql_alias", 1600L)
.put(QueryRunnerTestHelper.indexMetric, 210865.67966461182D)
.put("rows", 279L)
.put("addRowsIndexConstant", 211145.67966461182D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_1)
.put("maxIndex", 1862.7379150390625D)
.put("minIndex", 99.2845230102539D)
.build(),
ImmutableMap.<String, Object>builder()
.put("ql_alias", 1000L)
.put(QueryRunnerTestHelper.indexMetric, 12270.807106018066D)
.put("rows", 93L)
.put("addRowsIndexConstant", 12364.807106018066D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_1)
.put("maxIndex", 277.2735290527344D)
.put("minIndex", 71.31593322753906D)
.build(),
ImmutableMap.<String, Object>builder()
.put("ql_alias", 1200L)
.put(QueryRunnerTestHelper.indexMetric, 12086.472755432129D)
.put("rows", 93L)
.put("addRowsIndexConstant", 12180.472755432129D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_1)
.put("maxIndex", 193.78756713867188D)
.put("minIndex", 84.71052551269531D)
.build()
)
)
)
);
assertExpectedResults(expectedResults, query);
}
@Test
public void testFullOnTopNLongColumnWithExFn()
{

View File

@ -43,6 +43,7 @@ import io.druid.query.topn.InvertedTopNMetricSpec;
import io.druid.query.topn.NumericTopNMetricSpec;
import io.druid.query.topn.TopNMetricSpec;
import io.druid.query.topn.TopNQuery;
import io.druid.segment.VirtualColumns;
import io.druid.segment.column.Column;
import io.druid.segment.column.ValueType;
import io.druid.sql.calcite.expression.ExtractionFns;
@ -341,6 +342,7 @@ public class DruidQueryBuilder
dataSource,
filtration.getQuerySegmentSpec(),
descending,
VirtualColumns.EMPTY,
filtration.getDimFilter(),
queryGranularity,
grouping.getAggregatorFactories(),
@ -414,6 +416,7 @@ public class DruidQueryBuilder
return new TopNQuery(
dataSource,
VirtualColumns.EMPTY,
Iterables.getOnlyElement(grouping.getDimensions()),
topNMetricSpec,
limitSpec.getLimit(),
@ -450,6 +453,7 @@ public class DruidQueryBuilder
return new GroupByQuery(
dataSource,
filtration.getQuerySegmentSpec(),
VirtualColumns.EMPTY,
filtration.getDimFilter(),
QueryGranularities.ALL,
grouping.getDimensions(),