Fix bugs in query builders and in TimeBoundaryQuery.getFilter() (#4131)

* Add queryMetrics property to Query interface; fix bugs and remove unused code in Druids (see the builder-copy sketch below)

* Fix a bug in TimeBoundaryQuery.getFilter() and remove TimeBoundaryQuery.getDimensionsFilter()

* Don't reassign query's queryMetrics if already present in CPUTimeMetricQueryRunner and MetricsEmittingQueryRunner

* Add compatibility constructor to BaseQuery

* Remove Query.queryMetrics property

* Move nullToNoopLimitSpec() method to LimitSpec interface

* Rename GroupByQuery.applyLimit() to postProcess(); fix inconsistencies in GroupByQuery.Builder
Authored by Roman Leventov on 2017-04-26 00:32:02 +03:00; committed by Himanshu
parent a2419654ea, commit ee9b5a619a
21 changed files with 317 additions and 798 deletions
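The common thread in the diffs below: each hand-written with*() method used to re-list the whole constructor argument list, and each copy() overload lived on a builder instance and could read fields from the wrong object. The commit converges on one shape per query type. A minimal, self-contained sketch of that shape (ExampleQuery and ExampleQueryBuilder are hypothetical stand-ins, not Druid API):

import java.util.HashMap;
import java.util.Map;

class ExampleQuery
{
  private final String dataSource;
  private final Map<String, Object> context;

  ExampleQuery(String dataSource, Map<String, Object> context)
  {
    this.dataSource = dataSource;
    this.context = context;
  }

  String getDataSource()
  {
    return dataSource;
  }

  Map<String, Object> getContext()
  {
    return context;
  }

  // New style: copy everything, override one field. There is no argument
  // list to fall out of sync with the constructor.
  ExampleQuery withDataSource(String newDataSource)
  {
    return ExampleQueryBuilder.copy(this).dataSource(newDataSource).build();
  }
}

class ExampleQueryBuilder
{
  private String dataSource;
  private Map<String, Object> context = new HashMap<>();

  // Static, and reads every field from the source query, so there is no
  // enclosing builder instance to leak state from by accident.
  static ExampleQueryBuilder copy(ExampleQuery query)
  {
    return new ExampleQueryBuilder()
        .dataSource(query.getDataSource())
        .context(query.getContext());
  }

  ExampleQueryBuilder dataSource(String ds)
  {
    this.dataSource = ds;
    return this;
  }

  ExampleQueryBuilder context(Map<String, Object> c)
  {
    this.context = c;
    return this;
  }

  ExampleQuery build()
  {
    return new ExampleQuery(dataSource, context);
  }
}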

File: ScanQuery.java

@ -125,60 +125,24 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
@Override
public Query<ScanResultValue> withQuerySegmentSpec(QuerySegmentSpec querySegmentSpec)
{
return new ScanQuery(
getDataSource(),
querySegmentSpec,
resultFormat,
batchSize,
limit,
dimFilter,
columns,
getContext()
);
return ScanQueryBuilder.copy(this).intervals(querySegmentSpec).build();
}
@Override
public Query<ScanResultValue> withDataSource(DataSource dataSource)
{
return new ScanQuery(
dataSource,
getQuerySegmentSpec(),
resultFormat,
batchSize,
limit,
dimFilter,
columns,
getContext()
);
return ScanQueryBuilder.copy(this).dataSource(dataSource).build();
}
@Override
public Query<ScanResultValue> withOverriddenContext(Map<String, Object> contextOverrides)
{
return new ScanQuery(
getDataSource(),
getQuerySegmentSpec(),
resultFormat,
batchSize,
limit,
dimFilter,
columns,
computeOverridenContext(contextOverrides)
);
return ScanQueryBuilder.copy(this).context(computeOverriddenContext(getContext(), contextOverrides)).build();
}
public ScanQuery withDimFilter(DimFilter dimFilter)
{
return new ScanQuery(
getDataSource(),
getQuerySegmentSpec(),
resultFormat,
batchSize,
limit,
dimFilter,
columns,
getContext()
);
return ScanQueryBuilder.copy(this).filters(dimFilter).build();
}
@Override
@ -290,12 +254,17 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
);
}
public ScanQueryBuilder copy(ScanQueryBuilder builder)
public static ScanQueryBuilder copy(ScanQuery query)
{
return new ScanQueryBuilder()
.dataSource(builder.dataSource)
.intervals(builder.querySegmentSpec)
.context(builder.context);
.dataSource(query.getDataSource())
.intervals(query.getQuerySegmentSpec())
.resultFormat(query.getResultFormat())
.batchSize(query.getBatchSize())
.limit(query.getLimit())
.filters(query.getFilter())
.columns(query.getColumns())
.context(query.getContext());
}
public ScanQueryBuilder dataSource(String ds)

File: BaseQuery.java

@ -145,10 +145,12 @@ public abstract class BaseQuery<T extends Comparable<T>> implements Query<T>
return QueryContexts.parseBoolean(this, key, defaultValue);
}
protected Map<String, Object> computeOverridenContext(Map<String, Object> overrides)
protected static Map<String, Object> computeOverriddenContext(
final Map<String, Object> context,
final Map<String, Object> overrides
)
{
Map<String, Object> overridden = Maps.newTreeMap();
final Map<String, Object> context = getContext();
if (context != null) {
overridden.putAll(context);
}
@ -173,7 +175,7 @@ public abstract class BaseQuery<T extends Comparable<T>> implements Query<T>
@Override
public Query withId(String id)
{
return withOverriddenContext(ImmutableMap.<String, Object>of(QUERYID, id));
return withOverriddenContext(ImmutableMap.of(QUERYID, id));
}
@Override
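The hunk above shows only the head of the method; the merge semantics, assuming overrides are applied after the base context as the surviving lines suggest, come down to two putAll calls. A self-contained sketch:

import java.util.Map;
import java.util.TreeMap;

final class ContextMergeSketch
{
  // Mirrors the shape of the now-static computeOverriddenContext: later
  // puts win, so entries in `overrides` replace same-key entries in
  // `context`. The null checks are assumed to match the cut-off lines.
  static Map<String, Object> computeOverriddenContext(
      final Map<String, Object> context,
      final Map<String, Object> overrides
  )
  {
    final Map<String, Object> overridden = new TreeMap<>();
    if (context != null) {
      overridden.putAll(context);
    }
    if (overrides != null) {
      overridden.putAll(overrides);
    }
    return overridden;
  }

  public static void main(String[] args)
  {
    Map<String, Object> base = Map.of("timeout", 1000, "finalize", true);
    // Prints {finalize=false, timeout=1000}: the override wins.
    System.out.println(computeOverriddenContext(base, Map.of("finalize", false)));
  }
}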

File: CPUTimeMetricQueryRunner.java

@ -58,9 +58,7 @@ public class CPUTimeMetricQueryRunner<T> implements QueryRunner<T>
@Override
public Sequence<T> run(
final Query<T> query, final Map<String, Object> responseContext
)
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
{
final Sequence<T> baseSequence = delegate.run(query, responseContext);
return Sequences.wrap(
@ -91,7 +89,6 @@ public class CPUTimeMetricQueryRunner<T> implements QueryRunner<T>
}
);
}
public static <T> QueryRunner<T> safeBuild(
QueryRunner<T> delegate,
QueryToolChest<?, ? super Query<T>> queryToolChest,

File: Druids.java

@ -367,32 +367,20 @@ public class Druids
);
}
public TimeseriesQueryBuilder copy(TimeseriesQuery query)
public static TimeseriesQueryBuilder copy(TimeseriesQuery query)
{
return new TimeseriesQueryBuilder()
.dataSource(query.getDataSource())
.intervals(query.getIntervals())
.filters(query.getDimensionsFilter())
.intervals(query.getQuerySegmentSpec())
.descending(query.isDescending())
.virtualColumns(query.getVirtualColumns())
.filters(query.getDimensionsFilter())
.granularity(query.getGranularity())
.aggregators(query.getAggregatorSpecs())
.postAggregators(query.getPostAggregatorSpecs())
.context(query.getContext());
}
public TimeseriesQueryBuilder copy(TimeseriesQueryBuilder builder)
{
return new TimeseriesQueryBuilder()
.dataSource(builder.dataSource)
.intervals(builder.querySegmentSpec)
.filters(builder.dimFilter)
.descending(builder.descending)
.granularity(builder.granularity)
.aggregators(builder.aggregatorSpecs)
.postAggregators(builder.postAggregatorSpecs)
.context(builder.context);
}
public DataSource getDataSource()
{
return dataSource;
@ -579,6 +567,7 @@ public class Druids
querySegmentSpec = null;
dimensions = null;
querySpec = null;
sortSpec = null;
context = null;
}
@ -597,32 +586,20 @@ public class Druids
);
}
public SearchQueryBuilder copy(SearchQuery query)
public static SearchQueryBuilder copy(SearchQuery query)
{
return new SearchQueryBuilder()
.dataSource(query.getDataSource())
.intervals(query.getQuerySegmentSpec())
.filters(query.getDimensionsFilter())
.granularity(query.getGranularity())
.limit(query.getLimit())
.intervals(query.getQuerySegmentSpec())
.dimensions(query.getDimensions())
.query(query.getQuery())
.sortSpec(query.getSort())
.context(query.getContext());
}
public SearchQueryBuilder copy(SearchQueryBuilder builder)
{
return new SearchQueryBuilder()
.dataSource(builder.dataSource)
.intervals(builder.querySegmentSpec)
.filters(builder.dimFilter)
.granularity(builder.granularity)
.limit(builder.limit)
.dimensions(builder.dimensions)
.query(builder.querySpec)
.context(builder.context);
}
public SearchQueryBuilder dataSource(String d)
{
dataSource = new TableDataSource(d);
@ -819,14 +796,14 @@ public class Druids
);
}
public TimeBoundaryQueryBuilder copy(TimeBoundaryQueryBuilder builder)
public static TimeBoundaryQueryBuilder copy(TimeBoundaryQuery query)
{
return new TimeBoundaryQueryBuilder()
.dataSource(builder.dataSource)
.intervals(builder.querySegmentSpec)
.bound(builder.bound)
.filters(builder.dimFilter)
.context(builder.context);
.dataSource(query.getDataSource())
.intervals(query.getQuerySegmentSpec())
.bound(query.getBound())
.filters(query.getFilter())
.context(query.getContext());
}
public TimeBoundaryQueryBuilder dataSource(String ds)
@ -993,8 +970,8 @@ public class Druids
toInclude = null;
analysisTypes = null;
merge = null;
context = null;
lenientAggregatorMerge = null;
context = null;
}
public SegmentMetadataQuery build()
@ -1011,20 +988,16 @@ public class Druids
);
}
public SegmentMetadataQueryBuilder copy(SegmentMetadataQueryBuilder builder)
public static SegmentMetadataQueryBuilder copy(SegmentMetadataQuery query)
{
final SegmentMetadataQuery.AnalysisType[] analysisTypesArray =
analysisTypes != null
? analysisTypes.toArray(new SegmentMetadataQuery.AnalysisType[analysisTypes.size()])
: null;
return new SegmentMetadataQueryBuilder()
.dataSource(builder.dataSource)
.intervals(builder.querySegmentSpec)
.toInclude(toInclude)
.analysisTypes(analysisTypesArray)
.merge(merge)
.lenientAggregatorMerge(lenientAggregatorMerge)
.context(builder.context);
.dataSource(query.getDataSource())
.intervals(query.getQuerySegmentSpec())
.toInclude(query.getToInclude())
.analysisTypes(query.getAnalysisTypes())
.merge(query.isMerge())
.lenientAggregatorMerge(query.isLenientAggregatorMerge())
.context(query.getContext());
}
public SegmentMetadataQueryBuilder dataSource(String ds)
@ -1075,6 +1048,12 @@ public class Druids
return this;
}
public SegmentMetadataQueryBuilder analysisTypes(EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes)
{
this.analysisTypes = analysisTypes;
return this;
}
public SegmentMetadataQueryBuilder merge(boolean merge)
{
this.merge = merge;
@ -1131,11 +1110,13 @@ public class Druids
{
dataSource = null;
querySegmentSpec = null;
descending = false;
context = null;
dimFilter = null;
granularity = Granularities.ALL;
dimensions = Lists.newArrayList();
metrics = Lists.newArrayList();
virtualColumns = null;
pagingSpec = null;
}
@ -1155,12 +1136,19 @@ public class Druids
);
}
public SelectQueryBuilder copy(SelectQueryBuilder builder)
public static SelectQueryBuilder copy(SelectQuery query)
{
return new SelectQueryBuilder()
.dataSource(builder.dataSource)
.intervals(builder.querySegmentSpec)
.context(builder.context);
.dataSource(query.getDataSource())
.intervals(query.getQuerySegmentSpec())
.descending(query.isDescending())
.filters(query.getFilter())
.granularity(query.getGranularity())
.dimensionSpecs(query.getDimensions())
.metrics(query.getMetrics())
.virtualColumns(query.getVirtualColumns())
.pagingSpec(query.getPagingSpec())
.context(query.getContext());
}
public SelectQueryBuilder dataSource(String ds)
@ -1317,12 +1305,12 @@ public class Druids
);
}
public DataSourceMetadataQueryBuilder copy(DataSourceMetadataQueryBuilder builder)
public static DataSourceMetadataQueryBuilder copy(DataSourceMetadataQuery query)
{
return new DataSourceMetadataQueryBuilder()
.dataSource(builder.dataSource)
.intervals(builder.querySegmentSpec)
.context(builder.context);
.dataSource(query.getDataSource())
.intervals(query.getQuerySegmentSpec())
.context(query.getContext());
}
public DataSourceMetadataQueryBuilder dataSource(String ds)
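The sharpest of the Druids bugs is visible in the SegmentMetadataQueryBuilder hunk above: the old copy(builder) read dataSource, querySegmentSpec, and context from its argument, but toInclude, analysisTypes, merge, and lenientAggregatorMerge from the enclosing builder instance. Reduced to a two-field toy (MiniBuilder is hypothetical, not Druid code):

class MiniBuilder
{
  String a;
  String b;

  // Old, buggy shape: `a` comes from the argument, `b` from `this` -
  // two different builders mixed into one copy.
  MiniBuilder buggyCopy(MiniBuilder other)
  {
    MiniBuilder copy = new MiniBuilder();
    copy.a = other.a;
    copy.b = this.b; // wrong object
    return copy;
  }

  // Fixed shape: static, so there is no `this` to read from by accident.
  static MiniBuilder copy(MiniBuilder source)
  {
    MiniBuilder copy = new MiniBuilder();
    copy.a = source.a;
    copy.b = source.b;
    return copy;
  }
}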

File: MetricsEmittingQueryRunner.java

@ -19,7 +19,6 @@
package io.druid.query;
import com.google.common.base.Supplier;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.java.util.common.guava.LazySequence;
import io.druid.java.util.common.guava.Sequence;
@ -91,15 +90,8 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
return Sequences.wrap(
// Use LazySequence because we want to count the execution time of queryRunner.run() (it prepares the underlying
// Sequence) as part of the reported query time, i.e. we want to execute queryRunner.run() after
// `startTime = System.currentTimeMillis();` (see below).
new LazySequence<>(new Supplier<Sequence<T>>()
{
@Override
public Sequence<T> get()
{
return queryRunner.run(query, responseContext);
}
}),
// `startTime = System.nanoTime();` (see below).
new LazySequence<>(() -> queryRunner.run(query, responseContext)),
new SequenceWrapper()
{
private long startTimeNs;
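The rewritten comment also corrects a stale unit (currentTimeMillis vs. nanoTime). The LazySequence wrapper exists so that queryRunner.run(), which builds the underlying Sequence, executes after the timer starts and is therefore included in the reported time. The ordering, reduced to a stand-alone sketch with illustrative names:

import java.util.function.Supplier;

final class LazyTimingSketch
{
  static <T> T timed(Supplier<T> lazyWork)
  {
    final long startTimeNs = System.nanoTime(); // timer starts first
    final T result = lazyWork.get();            // deferred work runs inside the window
    System.out.println("elapsed ns: " + (System.nanoTime() - startTimeNs));
    return result;
  }

  public static void main(String[] args)
  {
    // Were the argument evaluated eagerly, its cost would land outside
    // the measured window; the Supplier defers it, like LazySequence does.
    timed(() -> "pretend this built the underlying Sequence");
  }
}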

File: DataSourceMetadataQuery.java

@ -25,6 +25,7 @@ import com.google.common.collect.Lists;
import io.druid.common.utils.JodaUtils;
import io.druid.query.BaseQuery;
import io.druid.query.DataSource;
import io.druid.query.Druids;
import io.druid.query.Query;
import io.druid.query.Result;
import io.druid.query.filter.DimFilter;
@ -34,6 +35,7 @@ import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -54,7 +56,7 @@ public class DataSourceMetadataQuery extends BaseQuery<Result<DataSourceMetadata
{
super(
dataSource,
(querySegmentSpec == null) ? new MultipleIntervalSegmentSpec(Arrays.asList(MY_Y2K_INTERVAL))
(querySegmentSpec == null) ? new MultipleIntervalSegmentSpec(Collections.singletonList(MY_Y2K_INTERVAL))
: querySegmentSpec,
false,
context
@ -82,31 +84,20 @@ public class DataSourceMetadataQuery extends BaseQuery<Result<DataSourceMetadata
@Override
public DataSourceMetadataQuery withOverriddenContext(Map<String, Object> contextOverrides)
{
return new DataSourceMetadataQuery(
getDataSource(),
getQuerySegmentSpec(),
computeOverridenContext(contextOverrides)
);
Map<String, Object> newContext = computeOverriddenContext(getContext(), contextOverrides);
return Druids.DataSourceMetadataQueryBuilder.copy(this).context(newContext).build();
}
@Override
public DataSourceMetadataQuery withQuerySegmentSpec(QuerySegmentSpec spec)
{
return new DataSourceMetadataQuery(
getDataSource(),
spec,
getContext()
);
return Druids.DataSourceMetadataQueryBuilder.copy(this).intervals(spec).build();
}
@Override
public Query<Result<DataSourceMetadataResultValue>> withDataSource(DataSource dataSource)
{
return new DataSourceMetadataQuery(
dataSource,
getQuerySegmentSpec(),
getContext()
);
return Druids.DataSourceMetadataQueryBuilder.copy(this).dataSource(dataSource).build();
}
public Iterable<Result<DataSourceMetadataResultValue>> buildResult(DateTime timestamp, DateTime maxIngestedEventTime)

File: GroupByQuery.java

@ -25,7 +25,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
@ -61,6 +60,8 @@ import io.druid.segment.VirtualColumns;
import io.druid.segment.column.Column;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
@ -76,17 +77,10 @@ public class GroupByQuery extends BaseQuery<Row>
private final static Comparator NATURAL_NULLS_FIRST = Ordering.natural().nullsFirst();
private final static Comparator<Row> NON_GRANULAR_TIME_COMP = new Comparator<Row>()
{
@Override
public int compare(Row lhs, Row rhs)
{
return Longs.compare(
private final static Comparator<Row> NON_GRANULAR_TIME_COMP = (Row lhs, Row rhs) -> Longs.compare(
lhs.getTimestampFromEpoch(),
rhs.getTimestampFromEpoch()
);
}
};
public static Builder builder()
{
@ -102,7 +96,7 @@ public class GroupByQuery extends BaseQuery<Row>
private final List<AggregatorFactory> aggregatorSpecs;
private final List<PostAggregator> postAggregatorSpecs;
private final Function<Sequence<Row>, Sequence<Row>> limitFn;
private final Function<Sequence<Row>, Sequence<Row>> postProcessingFn;
@JsonCreator
public GroupByQuery(
@ -118,12 +112,64 @@ public class GroupByQuery extends BaseQuery<Row>
@JsonProperty("limitSpec") LimitSpec limitSpec,
@JsonProperty("context") Map<String, Object> context
)
{
this(
dataSource,
querySegmentSpec,
virtualColumns,
dimFilter,
granularity,
dimensions,
aggregatorSpecs,
postAggregatorSpecs,
havingSpec,
limitSpec,
null,
context
);
}
private Function<Sequence<Row>, Sequence<Row>> makePostProcessingFn()
{
Function<Sequence<Row>, Sequence<Row>> postProcessingFn =
limitSpec.build(dimensions, aggregatorSpecs, postAggregatorSpecs);
if (havingSpec != null) {
postProcessingFn = Functions.compose(
postProcessingFn,
(Sequence<Row> input) -> {
havingSpec.setRowSignature(GroupByQueryHelper.rowSignatureFor(GroupByQuery.this));
return Sequences.filter(input, havingSpec::eval);
}
);
}
return postProcessingFn;
}
/**
* A private constructor that avoids recomputing postProcessingFn.
*/
private GroupByQuery(
final DataSource dataSource,
final QuerySegmentSpec querySegmentSpec,
final VirtualColumns virtualColumns,
final DimFilter dimFilter,
final Granularity granularity,
final List<DimensionSpec> dimensions,
final List<AggregatorFactory> aggregatorSpecs,
final List<PostAggregator> postAggregatorSpecs,
final HavingSpec havingSpec,
final LimitSpec limitSpec,
final @Nullable Function<Sequence<Row>, Sequence<Row>> postProcessingFn,
final Map<String, Object> context
)
{
super(dataSource, querySegmentSpec, false, context);
this.virtualColumns = VirtualColumns.nullToEmpty(virtualColumns);
this.dimFilter = dimFilter;
this.granularity = granularity;
this.dimensions = dimensions == null ? ImmutableList.<DimensionSpec>of() : dimensions;
this.dimensions = dimensions == null ? ImmutableList.of() : dimensions;
for (DimensionSpec spec : this.dimensions) {
Preconditions.checkArgument(spec != null, "dimensions has null DimensionSpec");
}
@ -133,77 +179,16 @@ public class GroupByQuery extends BaseQuery<Row>
postAggregatorSpecs == null ? ImmutableList.<PostAggregator>of() : postAggregatorSpecs
);
this.havingSpec = havingSpec;
this.limitSpec = (limitSpec == null) ? new NoopLimitSpec() : limitSpec;
this.limitSpec = LimitSpec.nullToNoopLimitSpec(limitSpec);
Preconditions.checkNotNull(this.granularity, "Must specify a granularity");
// Verify no duplicate names between dimensions, aggregators, and postAggregators.
// They will all end up in the same namespace in the returned Rows and we can't have them clobbering each other.
// We're not counting __time, even though that name is problematic. See: https://github.com/druid-io/druid/pull/3684
verifyOutputNames(this.dimensions, this.aggregatorSpecs, this.postAggregatorSpecs);
Function<Sequence<Row>, Sequence<Row>> postProcFn =
this.limitSpec.build(this.dimensions, this.aggregatorSpecs, this.postAggregatorSpecs);
if (havingSpec != null) {
postProcFn = Functions.compose(
postProcFn,
new Function<Sequence<Row>, Sequence<Row>>()
{
@Override
public Sequence<Row> apply(Sequence<Row> input)
{
GroupByQuery.this.havingSpec.setRowSignature(GroupByQueryHelper.rowSignatureFor(GroupByQuery.this));
return Sequences.filter(
input,
new Predicate<Row>()
{
@Override
public boolean apply(Row input)
{
return GroupByQuery.this.havingSpec.eval(input);
}
}
);
}
}
);
}
limitFn = postProcFn;
}
/**
* A private constructor that avoids all of the various state checks. Used by the with*() methods where the checks
* have already passed in order for the object to exist.
*/
private GroupByQuery(
DataSource dataSource,
QuerySegmentSpec querySegmentSpec,
VirtualColumns virtualColumns,
DimFilter dimFilter,
Granularity granularity,
List<DimensionSpec> dimensions,
List<AggregatorFactory> aggregatorSpecs,
List<PostAggregator> postAggregatorSpecs,
HavingSpec havingSpec,
LimitSpec orderBySpec,
Function<Sequence<Row>, Sequence<Row>> limitFn,
Map<String, Object> context
)
{
super(dataSource, querySegmentSpec, false, context);
this.virtualColumns = virtualColumns;
this.dimFilter = dimFilter;
this.granularity = granularity;
this.dimensions = dimensions;
this.aggregatorSpecs = aggregatorSpecs;
this.postAggregatorSpecs = postAggregatorSpecs;
this.havingSpec = havingSpec;
this.limitSpec = orderBySpec;
this.limitFn = limitFn;
this.postProcessingFn = postProcessingFn != null ? postProcessingFn : makePostProcessingFn();
}
@JsonProperty
@ -284,11 +269,7 @@ public class GroupByQuery extends BaseQuery<Row>
final Ordering<Row> rowOrdering = getRowOrdering(false);
return Ordering.from(
new Comparator<Object>()
{
@Override
public int compare(Object lhs, Object rhs)
{
(lhs, rhs) -> {
if (lhs instanceof Row) {
return rowOrdering.compare((Row) lhs, (Row) rhs);
} else {
@ -296,7 +277,6 @@ public class GroupByQuery extends BaseQuery<Row>
return NATURAL_NULLS_FIRST.compare(lhs, rhs);
}
}
}
);
}
@ -307,23 +287,10 @@ public class GroupByQuery extends BaseQuery<Row>
final Comparator<Row> timeComparator = getTimeComparator(granular);
if (timeComparator == null) {
return Ordering.from(
new Comparator<Row>()
{
@Override
public int compare(Row lhs, Row rhs)
{
return compareDims(dimensions, lhs, rhs);
}
}
);
return Ordering.from((lhs, rhs) -> compareDims(dimensions, lhs, rhs));
} else if (sortByDimsFirst) {
return Ordering.from(
new Comparator<Row>()
{
@Override
public int compare(Row lhs, Row rhs)
{
(lhs, rhs) -> {
final int cmp = compareDims(dimensions, lhs, rhs);
if (cmp != 0) {
return cmp;
@ -331,15 +298,10 @@ public class GroupByQuery extends BaseQuery<Row>
return timeComparator.compare(lhs, rhs);
}
}
);
} else {
return Ordering.from(
new Comparator<Row>()
{
@Override
public int compare(Row lhs, Row rhs)
{
(lhs, rhs) -> {
final int timeCompare = timeComparator.compare(lhs, rhs);
if (timeCompare != 0) {
@ -348,7 +310,6 @@ public class GroupByQuery extends BaseQuery<Row>
return compareDims(dimensions, lhs, rhs);
}
}
);
}
}
@ -358,17 +319,10 @@ public class GroupByQuery extends BaseQuery<Row>
if (Granularities.ALL.equals(granularity)) {
return null;
} else if (granular) {
return new Comparator<Row>()
{
@Override
public int compare(Row lhs, Row rhs)
{
return Longs.compare(
return (lhs, rhs) -> Longs.compare(
granularity.bucketStart(lhs.getTimestamp()).getMillis(),
granularity.bucketStart(rhs.getTimestamp()).getMillis()
);
}
};
} else {
return NON_GRANULAR_TIME_COMP;
}
@ -398,155 +352,52 @@ public class GroupByQuery extends BaseQuery<Row>
*
* @return sequence of rows after applying havingSpec and limitSpec
*/
public Sequence<Row> applyLimit(Sequence<Row> results)
public Sequence<Row> postProcess(Sequence<Row> results)
{
return limitFn.apply(results);
return postProcessingFn.apply(results);
}
@Override
public GroupByQuery withOverriddenContext(Map<String, Object> contextOverride)
{
return new GroupByQuery(
getDataSource(),
getQuerySegmentSpec(),
virtualColumns,
dimFilter,
granularity,
dimensions,
aggregatorSpecs,
postAggregatorSpecs,
havingSpec,
limitSpec,
limitFn,
computeOverridenContext(contextOverride)
);
return new Builder(this).overrideContext(contextOverride).build();
}
@Override
public GroupByQuery withQuerySegmentSpec(QuerySegmentSpec spec)
{
return new GroupByQuery(
getDataSource(),
spec,
virtualColumns,
dimFilter,
granularity,
dimensions,
aggregatorSpecs,
postAggregatorSpecs,
havingSpec,
limitSpec,
limitFn,
getContext()
);
return new Builder(this).setQuerySegmentSpec(spec).build();
}
public GroupByQuery withDimFilter(final DimFilter dimFilter)
{
return new GroupByQuery(
getDataSource(),
getQuerySegmentSpec(),
virtualColumns,
dimFilter,
getGranularity(),
getDimensions(),
getAggregatorSpecs(),
getPostAggregatorSpecs(),
getHavingSpec(),
getLimitSpec(),
limitFn,
getContext()
);
return new Builder(this).setDimFilter(dimFilter).build();
}
@Override
public Query<Row> withDataSource(DataSource dataSource)
{
return new GroupByQuery(
dataSource,
getQuerySegmentSpec(),
virtualColumns,
dimFilter,
granularity,
dimensions,
aggregatorSpecs,
postAggregatorSpecs,
havingSpec,
limitSpec,
limitFn,
getContext()
);
return new Builder(this).setDataSource(dataSource).build();
}
public GroupByQuery withDimensionSpecs(final List<DimensionSpec> dimensionSpecs)
{
return new GroupByQuery(
getDataSource(),
getQuerySegmentSpec(),
virtualColumns,
getDimFilter(),
getGranularity(),
dimensionSpecs,
getAggregatorSpecs(),
getPostAggregatorSpecs(),
getHavingSpec(),
getLimitSpec(),
limitFn,
getContext()
);
return new Builder(this).setDimensions(dimensionSpecs).build();
}
public GroupByQuery withLimitSpec(final LimitSpec limitSpec)
public GroupByQuery withLimitSpec(LimitSpec limitSpec)
{
return new GroupByQuery(
getDataSource(),
getQuerySegmentSpec(),
virtualColumns,
getDimFilter(),
getGranularity(),
getDimensions(),
getAggregatorSpecs(),
getPostAggregatorSpecs(),
getHavingSpec(),
limitSpec,
getContext()
);
return new Builder(this).setLimitSpec(limitSpec).build();
}
public GroupByQuery withAggregatorSpecs(final List<AggregatorFactory> aggregatorSpecs)
{
return new GroupByQuery(
getDataSource(),
getQuerySegmentSpec(),
virtualColumns,
getDimFilter(),
getGranularity(),
getDimensions(),
aggregatorSpecs,
getPostAggregatorSpecs(),
getHavingSpec(),
getLimitSpec(),
limitFn,
getContext()
);
return new Builder(this).setAggregatorSpecs(aggregatorSpecs).build();
}
public GroupByQuery withPostAggregatorSpecs(final List<PostAggregator> postAggregatorSpecs)
{
return new GroupByQuery(
getDataSource(),
getQuerySegmentSpec(),
virtualColumns,
getDimFilter(),
getGranularity(),
getDimensions(),
getAggregatorSpecs(),
postAggregatorSpecs,
getHavingSpec(),
getLimitSpec(),
limitFn,
getContext()
);
return new Builder(this).setPostAggregatorSpecs(postAggregatorSpecs).build();
}
private static void verifyOutputNames(
@ -597,6 +448,7 @@ public class GroupByQuery extends BaseQuery<Row>
private Map<String, Object> context;
private LimitSpec limitSpec = null;
private Function<Sequence<Row>, Sequence<Row>> postProcessingFn;
private List<OrderByColumnSpec> orderByColumnSpecs = Lists.newArrayList();
private int limit = Integer.MAX_VALUE;
@ -609,13 +461,14 @@ public class GroupByQuery extends BaseQuery<Row>
dataSource = query.getDataSource();
querySegmentSpec = query.getQuerySegmentSpec();
virtualColumns = query.getVirtualColumns();
limitSpec = query.getLimitSpec();
dimFilter = query.getDimFilter();
granularity = query.getGranularity();
dimensions = query.getDimensions();
aggregatorSpecs = query.getAggregatorSpecs();
postAggregatorSpecs = query.getPostAggregatorSpecs();
havingSpec = query.getHavingSpec();
limitSpec = query.getLimitSpec();
postProcessingFn = query.postProcessingFn;
context = query.getContext();
}
@ -624,15 +477,16 @@ public class GroupByQuery extends BaseQuery<Row>
dataSource = builder.dataSource;
querySegmentSpec = builder.querySegmentSpec;
virtualColumns = builder.virtualColumns;
limitSpec = builder.limitSpec;
dimFilter = builder.dimFilter;
granularity = builder.granularity;
dimensions = builder.dimensions;
aggregatorSpecs = builder.aggregatorSpecs;
postAggregatorSpecs = builder.postAggregatorSpecs;
havingSpec = builder.havingSpec;
limitSpec = builder.limitSpec;
postProcessingFn = builder.postProcessingFn;
limit = builder.limit;
orderByColumnSpecs = new ArrayList<>(builder.orderByColumnSpecs);
context = builder.context;
}
@ -692,16 +546,17 @@ public class GroupByQuery extends BaseQuery<Row>
return this;
}
public Builder limit(int limit)
public Builder setLimit(int limit)
{
ensureExplicitLimitNotSet();
ensureExplicitLimitSpecNotSet();
this.limit = limit;
this.postProcessingFn = null;
return this;
}
public Builder addOrderByColumn(String dimension)
{
return addOrderByColumn(dimension, (OrderByColumnSpec.Direction) null);
return addOrderByColumn(dimension, null);
}
public Builder addOrderByColumn(String dimension, OrderByColumnSpec.Direction direction)
@ -711,19 +566,22 @@ public class GroupByQuery extends BaseQuery<Row>
public Builder addOrderByColumn(OrderByColumnSpec columnSpec)
{
ensureExplicitLimitNotSet();
ensureExplicitLimitSpecNotSet();
this.orderByColumnSpecs.add(columnSpec);
this.postProcessingFn = null;
return this;
}
public Builder setLimitSpec(LimitSpec limitSpec)
{
Preconditions.checkNotNull(limitSpec);
ensureFluentLimitsNotSet();
this.limitSpec = limitSpec;
this.postProcessingFn = null;
return this;
}
private void ensureExplicitLimitNotSet()
private void ensureExplicitLimitSpecNotSet()
{
if (limitSpec != null) {
throw new ISE("Ambiguous build, limitSpec[%s] already set", limitSpec);
@ -772,12 +630,14 @@ public class GroupByQuery extends BaseQuery<Row>
}
dimensions.add(dimension);
this.postProcessingFn = null;
return this;
}
public Builder setDimensions(List<DimensionSpec> dimensions)
{
this.dimensions = Lists.newArrayList(dimensions);
this.postProcessingFn = null;
return this;
}
@ -788,12 +648,14 @@ public class GroupByQuery extends BaseQuery<Row>
}
aggregatorSpecs.add(aggregator);
this.postProcessingFn = null;
return this;
}
public Builder setAggregatorSpecs(List<AggregatorFactory> aggregatorSpecs)
{
this.aggregatorSpecs = Lists.newArrayList(aggregatorSpecs);
this.postProcessingFn = null;
return this;
}
@ -804,12 +666,14 @@ public class GroupByQuery extends BaseQuery<Row>
}
postAggregatorSpecs.add(postAgg);
this.postProcessingFn = null;
return this;
}
public Builder setPostAggregatorSpecs(List<PostAggregator> postAggregatorSpecs)
{
this.postAggregatorSpecs = Lists.newArrayList(postAggregatorSpecs);
this.postProcessingFn = null;
return this;
}
@ -819,17 +683,16 @@ public class GroupByQuery extends BaseQuery<Row>
return this;
}
public Builder setHavingSpec(HavingSpec havingSpec)
public Builder overrideContext(Map<String, Object> contextOverride)
{
this.havingSpec = havingSpec;
this.context = computeOverriddenContext(context, contextOverride);
return this;
}
public Builder setLimit(Integer limit)
public Builder setHavingSpec(HavingSpec havingSpec)
{
this.limit = limit;
this.havingSpec = havingSpec;
this.postProcessingFn = null;
return this;
}
@ -843,7 +706,7 @@ public class GroupByQuery extends BaseQuery<Row>
final LimitSpec theLimitSpec;
if (limitSpec == null) {
if (orderByColumnSpecs.isEmpty() && limit == Integer.MAX_VALUE) {
theLimitSpec = new NoopLimitSpec();
theLimitSpec = NoopLimitSpec.instance();
} else {
theLimitSpec = new DefaultLimitSpec(orderByColumnSpecs, limit);
}
@ -862,6 +725,7 @@ public class GroupByQuery extends BaseQuery<Row>
postAggregatorSpecs,
havingSpec,
theLimitSpec,
postProcessingFn,
context
);
}
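Note how the Builder threads query.postProcessingFn through, and how every setter that can affect the result (setLimit, setDimensions, setAggregatorSpecs, setHavingSpec, and the rest) nulls it out, so build() only recomputes the function when something actually changed. The invalidation idea, reduced to a toy with hypothetical names:

import java.util.function.UnaryOperator;

class PostProcessSketch
{
  private int limit = Integer.MAX_VALUE;
  private UnaryOperator<String> postProcessingFn; // cached; null means stale

  PostProcessSketch setLimit(int limit)
  {
    this.limit = limit;
    this.postProcessingFn = null; // any relevant change invalidates the cache
    return this;
  }

  UnaryOperator<String> build()
  {
    // Reuse the cached function when nothing changed; rebuild otherwise.
    if (postProcessingFn == null) {
      postProcessingFn = makePostProcessingFn();
    }
    return postProcessingFn;
  }

  private UnaryOperator<String> makePostProcessingFn()
  {
    final int theLimit = limit;
    return s -> s.length() > theLimit ? s.substring(0, theLimit) : s;
  }
}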

File: LimitSpec.java

@ -29,6 +29,7 @@ import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.dimension.DimensionSpec;
import javax.annotation.Nullable;
import java.util.List;
/**
@ -39,6 +40,11 @@ import java.util.List;
})
public interface LimitSpec extends Cacheable
{
static LimitSpec nullToNoopLimitSpec(@Nullable LimitSpec limitSpec)
{
return (limitSpec == null) ? NoopLimitSpec.instance() : limitSpec;
}
/**
* Returns a function that applies a limit to an input sequence that is assumed to be sorted on dimensions.
*
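Moving the helper onto the interface as a static method (the Java 8 `static ... nullToNoopLimitSpec` above) gives every call site, such as the GroupByQuery constructor, one place to normalize null. The shape in a compressed stand-alone form (Spec and NoopSpec are illustrative, not the Druid types):

interface Spec
{
  static Spec nullToNoop(Spec spec)
  {
    return (spec == null) ? NoopSpec.INSTANCE : spec;
  }

  String describe();
}

final class NoopSpec implements Spec
{
  static final NoopSpec INSTANCE = new NoopSpec();

  private NoopSpec()
  {
  }

  @Override
  public String describe()
  {
    return "no-op";
  }
}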

File: NoopLimitSpec.java

@ -19,6 +19,7 @@
package io.druid.query.groupby.orderby;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import io.druid.data.input.Row;
@ -31,10 +32,22 @@ import java.util.List;
/**
*/
public class NoopLimitSpec implements LimitSpec
public final class NoopLimitSpec implements LimitSpec
{
private static final byte CACHE_KEY = 0x0;
public static final NoopLimitSpec INSTANCE = new NoopLimitSpec();
@JsonCreator
public static NoopLimitSpec instance()
{
return INSTANCE;
}
private NoopLimitSpec()
{
}
@Override
public Function<Sequence<Row>, Sequence<Row>> build(
List<DimensionSpec> dimensions, List<AggregatorFactory> aggs, List<PostAggregator> postAggs

File: GroupByStrategyV1.java

@ -47,6 +47,7 @@ import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.GroupByQueryEngine;
import io.druid.query.groupby.GroupByQueryHelper;
import io.druid.query.groupby.GroupByQueryQueryToolChest;
import io.druid.query.groupby.orderby.NoopLimitSpec;
import io.druid.query.groupby.resource.GroupByQueryResource;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.segment.StorageAdapter;
@ -119,38 +120,31 @@ public class GroupByStrategyV1 implements GroupByStrategy
configSupplier.get(),
bufferPool,
baseRunner.run(
new GroupByQuery(
query.getDataSource(),
query.getQuerySegmentSpec(),
query.getVirtualColumns(),
query.getDimFilter(),
query.getGranularity(),
query.getDimensions(),
query.getAggregatorSpecs(),
new GroupByQuery.Builder(query)
// Don't do post aggs until the end of this method.
ImmutableList.<PostAggregator>of(),
.setPostAggregatorSpecs(ImmutableList.of())
// Don't do "having" clause until the end of this method.
null,
null,
query.getContext()
).withOverriddenContext(
ImmutableMap.<String, Object>of(
.setHavingSpec(null)
.setLimitSpec(NoopLimitSpec.instance())
.overrideContext(
ImmutableMap.of(
"finalize", false,
//setting sort to false avoids unnecessary sorting while merging results. we only need to sort
//in the end when returning results to user. (note this is only respected by groupBy v1)
GroupByQueryHelper.CTX_KEY_SORT_RESULTS, false,
//no merging needed at historicals because GroupByQueryRunnerFactory.mergeRunners(..) would return
//merged results. (note this is only respected by groupBy v1)
//no merging needed at historicals because GroupByQueryRunnerFactory.mergeRunners(..) would
//return merged results. (note this is only respected by groupBy v1)
GroupByQueryQueryToolChest.GROUP_BY_MERGE_KEY, false,
GroupByQueryConfig.CTX_KEY_STRATEGY, GroupByStrategySelector.STRATEGY_V1
)
),
)
.build(),
responseContext
),
true
);
return Sequences.withBaggage(query.applyLimit(GroupByQueryHelper.postAggregate(query, index)), index);
return Sequences.withBaggage(query.postProcess(GroupByQueryHelper.postAggregate(query, index)), index);
}
@Override
@ -253,7 +247,7 @@ public class GroupByStrategyV1 implements GroupByStrategy
innerQueryResultIndex.close();
return Sequences.withBaggage(
outerQuery.applyLimit(GroupByQueryHelper.postAggregate(query, outerQueryResultIndex)),
outerQuery.postProcess(GroupByQueryHelper.postAggregate(query, outerQueryResultIndex)),
outerQueryResultIndex
);
}

File: GroupByStrategyV2.java

@ -61,6 +61,7 @@ import io.druid.query.groupby.epinephelinae.GroupByBinaryFnV2;
import io.druid.query.groupby.epinephelinae.GroupByMergingQueryRunnerV2;
import io.druid.query.groupby.epinephelinae.GroupByQueryEngineV2;
import io.druid.query.groupby.epinephelinae.GroupByRowProcessor;
import io.druid.query.groupby.orderby.NoopLimitSpec;
import io.druid.query.groupby.resource.GroupByQueryResource;
import io.druid.segment.StorageAdapter;
import org.joda.time.DateTime;
@ -229,31 +230,24 @@ public class GroupByStrategyV2 implements GroupByStrategy
// Fudge timestamp, maybe.
final DateTime fudgeTimestamp = getUniversalTimestamp(query);
return query.applyLimit(
return query.postProcess(
Sequences.map(
mergingQueryRunner.run(
new GroupByQuery(
query.getDataSource(),
query.getQuerySegmentSpec(),
query.getVirtualColumns(),
query.getDimFilter(),
query.getGranularity(),
query.getDimensions(),
query.getAggregatorSpecs(),
new GroupByQuery.Builder(query)
// Don't do post aggs until the end of this method.
ImmutableList.<PostAggregator>of(),
.setPostAggregatorSpecs(ImmutableList.of())
// Don't do "having" clause until the end of this method.
null,
null,
query.getContext()
).withOverriddenContext(
ImmutableMap.<String, Object>of(
.setHavingSpec(null)
.setLimitSpec(NoopLimitSpec.instance())
.overrideContext(
ImmutableMap.of(
"finalize", false,
GroupByQueryConfig.CTX_KEY_STRATEGY, GroupByStrategySelector.STRATEGY_V2,
CTX_KEY_FUDGE_TIMESTAMP, fudgeTimestamp == null ? "" : String.valueOf(fudgeTimestamp.getMillis()),
CTX_KEY_OUTERMOST, false
)
),
)
.build(),
responseContext
),
new Function<Row, Row>()

File: SegmentMetadataQuery.java

@ -27,6 +27,7 @@ import com.google.common.collect.Lists;
import io.druid.common.utils.JodaUtils;
import io.druid.query.BaseQuery;
import io.druid.query.DataSource;
import io.druid.query.Druids;
import io.druid.query.Query;
import io.druid.query.TableDataSource;
import io.druid.query.UnionDataSource;
@ -232,60 +233,25 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
@Override
public Query<SegmentAnalysis> withOverriddenContext(Map<String, Object> contextOverride)
{
return new SegmentMetadataQuery(
getDataSource(),
getQuerySegmentSpec(),
toInclude,
merge,
computeOverridenContext(contextOverride),
analysisTypes,
usingDefaultInterval,
lenientAggregatorMerge
);
Map<String, Object> newContext = computeOverriddenContext(getContext(), contextOverride);
return Druids.SegmentMetadataQueryBuilder.copy(this).context(newContext).build();
}
@Override
public Query<SegmentAnalysis> withQuerySegmentSpec(QuerySegmentSpec spec)
{
return new SegmentMetadataQuery(
getDataSource(),
spec,
toInclude,
merge,
getContext(),
analysisTypes,
usingDefaultInterval,
lenientAggregatorMerge
);
return Druids.SegmentMetadataQueryBuilder.copy(this).intervals(spec).build();
}
@Override
public Query<SegmentAnalysis> withDataSource(DataSource dataSource)
{
return new SegmentMetadataQuery(
dataSource,
getQuerySegmentSpec(),
toInclude,
merge,
getContext(),
analysisTypes,
usingDefaultInterval,
lenientAggregatorMerge
);
return Druids.SegmentMetadataQueryBuilder.copy(this).dataSource(dataSource).build();
}
public Query<SegmentAnalysis> withColumns(ColumnIncluderator includerator)
{
return new SegmentMetadataQuery(
getDataSource(),
getQuerySegmentSpec(),
includerator,
merge,
getContext(),
analysisTypes,
usingDefaultInterval,
lenientAggregatorMerge
);
return Druids.SegmentMetadataQueryBuilder.copy(this).toInclude(includerator).build();
}
@Override

File: SearchQuery.java

@ -26,6 +26,7 @@ import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.granularity.Granularity;
import io.druid.query.BaseQuery;
import io.druid.query.DataSource;
import io.druid.query.Druids;
import io.druid.query.Query;
import io.druid.query.Result;
import io.druid.query.dimension.DimensionSpec;
@ -95,64 +96,25 @@ public class SearchQuery extends BaseQuery<Result<SearchResultValue>>
@Override
public SearchQuery withQuerySegmentSpec(QuerySegmentSpec spec)
{
return new SearchQuery(
getDataSource(),
dimFilter,
granularity,
limit,
spec,
dimensions,
querySpec,
sortSpec,
getContext()
);
return Druids.SearchQueryBuilder.copy(this).intervals(spec).build();
}
@Override
public Query<Result<SearchResultValue>> withDataSource(DataSource dataSource)
{
return new SearchQuery(
dataSource,
dimFilter,
granularity,
limit,
getQuerySegmentSpec(),
dimensions,
querySpec,
sortSpec,
getContext()
);
return Druids.SearchQueryBuilder.copy(this).dataSource(dataSource).build();
}
@Override
public SearchQuery withOverriddenContext(Map<String, Object> contextOverrides)
{
return new SearchQuery(
getDataSource(),
dimFilter,
granularity,
limit,
getQuerySegmentSpec(),
dimensions,
querySpec,
sortSpec,
computeOverridenContext(contextOverrides)
);
Map<String, Object> newContext = computeOverriddenContext(getContext(), contextOverrides);
return Druids.SearchQueryBuilder.copy(this).context(newContext).build();
}
public SearchQuery withDimFilter(DimFilter dimFilter)
{
return new SearchQuery(
getDataSource(),
dimFilter,
granularity,
limit,
getQuerySegmentSpec(),
dimensions,
querySpec,
sortSpec,
getContext()
);
return Druids.SearchQueryBuilder.copy(this).filters(dimFilter).build();
}
@JsonProperty("filter")
@ -193,17 +155,7 @@ public class SearchQuery extends BaseQuery<Result<SearchResultValue>>
public SearchQuery withLimit(int newLimit)
{
return new SearchQuery(
getDataSource(),
dimFilter,
granularity,
newLimit,
getQuerySegmentSpec(),
dimensions,
querySpec,
sortSpec,
getContext()
);
return Druids.SearchQueryBuilder.copy(this).limit(newLimit).build();
}
@Override

File: SelectQuery.java

@ -26,6 +26,7 @@ import com.google.common.base.Preconditions;
import io.druid.java.util.common.granularity.Granularity;
import io.druid.query.BaseQuery;
import io.druid.query.DataSource;
import io.druid.query.Druids;
import io.druid.query.Query;
import io.druid.query.Result;
import io.druid.query.dimension.DimensionSpec;
@ -146,83 +147,29 @@ public class SelectQuery extends BaseQuery<Result<SelectResultValue>>
public SelectQuery withQuerySegmentSpec(QuerySegmentSpec querySegmentSpec)
{
return new SelectQuery(
getDataSource(),
querySegmentSpec,
isDescending(),
dimFilter,
granularity,
dimensions,
metrics,
virtualColumns,
pagingSpec,
getContext()
);
return Druids.SelectQueryBuilder.copy(this).intervals(querySegmentSpec).build();
}
@Override
public Query<Result<SelectResultValue>> withDataSource(DataSource dataSource)
{
return new SelectQuery(
dataSource,
getQuerySegmentSpec(),
isDescending(),
dimFilter,
granularity,
dimensions,
metrics,
virtualColumns,
pagingSpec,
getContext()
);
return Druids.SelectQueryBuilder.copy(this).dataSource(dataSource).build();
}
public SelectQuery withOverriddenContext(Map<String, Object> contextOverrides)
{
return new SelectQuery(
getDataSource(),
getQuerySegmentSpec(),
isDescending(),
dimFilter,
granularity,
dimensions,
metrics,
virtualColumns,
pagingSpec,
computeOverridenContext(contextOverrides)
);
Map<String, Object> newContext = computeOverriddenContext(getContext(), contextOverrides);
return Druids.SelectQueryBuilder.copy(this).context(newContext).build();
}
public SelectQuery withPagingSpec(PagingSpec pagingSpec)
{
return new SelectQuery(
getDataSource(),
getQuerySegmentSpec(),
isDescending(),
dimFilter,
granularity,
dimensions,
metrics,
virtualColumns,
pagingSpec,
getContext()
);
return Druids.SelectQueryBuilder.copy(this).pagingSpec(pagingSpec).build();
}
public SelectQuery withDimFilter(DimFilter dimFilter)
{
return new SelectQuery(
getDataSource(),
getQuerySegmentSpec(),
isDescending(),
dimFilter,
granularity,
dimensions,
metrics,
virtualColumns,
pagingSpec,
getContext()
);
return Druids.SelectQueryBuilder.copy(this).filters(dimFilter).build();
}
@Override

File: TimeBoundaryQuery.java

@ -27,6 +27,7 @@ import io.druid.common.utils.JodaUtils;
import io.druid.java.util.common.StringUtils;
import io.druid.query.BaseQuery;
import io.druid.query.DataSource;
import io.druid.query.Druids;
import io.druid.query.Query;
import io.druid.query.Result;
import io.druid.query.filter.DimFilter;
@ -82,10 +83,11 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>
return dimFilter != null;
}
@JsonProperty("filter")
@Override
public DimFilter getFilter()
{
return null;
return dimFilter;
}
@Override
@ -94,12 +96,6 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>
return Query.TIME_BOUNDARY;
}
@JsonProperty("filter")
public DimFilter getDimensionsFilter()
{
return dimFilter;
}
@JsonProperty
public String getBound()
{
@ -109,37 +105,20 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>
@Override
public TimeBoundaryQuery withOverriddenContext(Map<String, Object> contextOverrides)
{
return new TimeBoundaryQuery(
getDataSource(),
getQuerySegmentSpec(),
bound,
dimFilter,
computeOverridenContext(contextOverrides)
);
Map<String, Object> newContext = computeOverriddenContext(getContext(), contextOverrides);
return Druids.TimeBoundaryQueryBuilder.copy(this).context(newContext).build();
}
@Override
public TimeBoundaryQuery withQuerySegmentSpec(QuerySegmentSpec spec)
{
return new TimeBoundaryQuery(
getDataSource(),
spec,
bound,
dimFilter,
getContext()
);
return Druids.TimeBoundaryQueryBuilder.copy(this).intervals(spec).build();
}
@Override
public Query<Result<TimeBoundaryResultValue>> withDataSource(DataSource dataSource)
{
return new TimeBoundaryQuery(
dataSource,
getQuerySegmentSpec(),
bound,
dimFilter,
getContext()
);
return Druids.TimeBoundaryQueryBuilder.copy(this).dataSource(dataSource).build();
}
public byte[] getCacheKey()
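This is the getFilter() bug named in the title: code that sees only the Query interface, like the runner factory in the next file, reads the filter through getFilter(), which TimeBoundaryQuery hardcoded to null while parking the real filter on getDimensionsFilter(). The @JsonProperty("filter") annotation moves along with it, so the wire format is unchanged. Schematically, with stand-in types rather than the Druid interfaces:

interface QuerySketch
{
  String getFilter(); // stand-in for DimFilter; may be null
}

final class TimeBoundarySketch implements QuerySketch
{
  private final String dimFilter;

  TimeBoundarySketch(String dimFilter)
  {
    this.dimFilter = dimFilter;
  }

  @Override
  public String getFilter()
  {
    return dimFilter; // before the fix: return null;
  }
}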

File: TimeBoundaryQueryRunnerFactory.java

@ -112,7 +112,8 @@ public class TimeBoundaryQueryRunnerFactory
final Sequence<Result<DateTime>> resultSequence = QueryRunnerHelper.makeCursorBasedQuery(
adapter,
legacyQuery.getQuerySegmentSpec().getIntervals(),
Filters.toFilter(legacyQuery.getDimensionsFilter()), VirtualColumns.EMPTY,
Filters.toFilter(legacyQuery.getFilter()),
VirtualColumns.EMPTY,
descending,
Granularities.ALL,
this.skipToFirstMatching
@ -154,7 +155,7 @@ public class TimeBoundaryQueryRunnerFactory
final DateTime minTime;
final DateTime maxTime;
if (legacyQuery.getDimensionsFilter() != null) {
if (legacyQuery.getFilter() != null) {
minTime = getTimeBoundary(adapter, legacyQuery, false);
if (minTime == null) {
maxTime = null;

File: TimeseriesQuery.java

@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableList;
import io.druid.java.util.common.granularity.Granularity;
import io.druid.query.BaseQuery;
import io.druid.query.DataSource;
import io.druid.query.Druids;
import io.druid.query.Queries;
import io.druid.query.Query;
import io.druid.query.Result;
@ -64,15 +65,14 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
)
{
super(dataSource, querySegmentSpec, descending, context);
this.virtualColumns = VirtualColumns.nullToEmpty(virtualColumns);
this.dimFilter = dimFilter;
this.granularity = granularity;
this.aggregatorSpecs = aggregatorSpecs == null ? ImmutableList.<AggregatorFactory>of() : aggregatorSpecs;
this.aggregatorSpecs = aggregatorSpecs == null ? ImmutableList.of() : aggregatorSpecs;
this.postAggregatorSpecs = Queries.prepareAggregations(
this.aggregatorSpecs,
postAggregatorSpecs == null
? ImmutableList.<PostAggregator>of()
: postAggregatorSpecs
postAggregatorSpecs == null ? ImmutableList.of() : postAggregatorSpecs
);
}
@ -131,78 +131,29 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
public TimeseriesQuery withQuerySegmentSpec(QuerySegmentSpec querySegmentSpec)
{
return new TimeseriesQuery(
getDataSource(),
querySegmentSpec,
isDescending(),
virtualColumns,
dimFilter,
granularity,
aggregatorSpecs,
postAggregatorSpecs,
getContext()
);
return Druids.TimeseriesQueryBuilder.copy(this).intervals(querySegmentSpec).build();
}
@Override
public Query<Result<TimeseriesResultValue>> withDataSource(DataSource dataSource)
{
return new TimeseriesQuery(
dataSource,
getQuerySegmentSpec(),
isDescending(),
virtualColumns,
dimFilter,
granularity,
aggregatorSpecs,
postAggregatorSpecs,
getContext()
);
return Druids.TimeseriesQueryBuilder.copy(this).dataSource(dataSource).build();
}
public TimeseriesQuery withOverriddenContext(Map<String, Object> contextOverrides)
{
return new TimeseriesQuery(
getDataSource(),
getQuerySegmentSpec(),
isDescending(),
virtualColumns,
dimFilter,
granularity,
aggregatorSpecs,
postAggregatorSpecs,
computeOverridenContext(contextOverrides)
);
Map<String, Object> newContext = computeOverriddenContext(getContext(), contextOverrides);
return Druids.TimeseriesQueryBuilder.copy(this).context(newContext).build();
}
public TimeseriesQuery withDimFilter(DimFilter dimFilter)
{
return new TimeseriesQuery(
getDataSource(),
getQuerySegmentSpec(),
isDescending(),
virtualColumns,
dimFilter,
granularity,
aggregatorSpecs,
postAggregatorSpecs,
getContext()
);
return Druids.TimeseriesQueryBuilder.copy(this).filters(dimFilter).build();
}
public TimeseriesQuery withPostAggregatorSpecs(final List<PostAggregator> postAggregatorSpecs)
{
return new TimeseriesQuery(
getDataSource(),
getQuerySegmentSpec(),
isDescending(),
virtualColumns,
dimFilter,
granularity,
aggregatorSpecs,
postAggregatorSpecs,
getContext()
);
return Druids.TimeseriesQueryBuilder.copy(this).postAggregators(postAggregatorSpecs).build();
}
@Override

File: TopNQuery.java

@ -71,6 +71,7 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
)
{
super(dataSource, querySegmentSpec, false, context);
this.virtualColumns = VirtualColumns.nullToEmpty(virtualColumns);
this.dimensionSpec = dimensionSpec;
this.topNMetricSpec = topNMetricSpec;
@ -169,139 +170,43 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
public TopNQuery withQuerySegmentSpec(QuerySegmentSpec querySegmentSpec)
{
return new TopNQuery(
getDataSource(),
virtualColumns,
dimensionSpec,
topNMetricSpec,
threshold,
querySegmentSpec,
dimFilter,
granularity,
aggregatorSpecs,
postAggregatorSpecs,
getContext()
);
return new TopNQueryBuilder(this).intervals(querySegmentSpec).build();
}
public TopNQuery withDimensionSpec(DimensionSpec spec)
{
return new TopNQuery(
getDataSource(),
virtualColumns,
spec,
topNMetricSpec,
threshold,
getQuerySegmentSpec(),
dimFilter,
granularity,
aggregatorSpecs,
postAggregatorSpecs,
getContext()
);
return new TopNQueryBuilder(this).dimension(spec).build();
}
public TopNQuery withAggregatorSpecs(List<AggregatorFactory> aggregatorSpecs)
{
return new TopNQuery(
getDataSource(),
virtualColumns,
getDimensionSpec(),
topNMetricSpec,
threshold,
getQuerySegmentSpec(),
dimFilter,
granularity,
aggregatorSpecs,
postAggregatorSpecs,
getContext()
);
return new TopNQueryBuilder(this).aggregators(aggregatorSpecs).build();
}
public TopNQuery withPostAggregatorSpecs(List<PostAggregator> postAggregatorSpecs)
{
return new TopNQuery(
getDataSource(),
virtualColumns,
getDimensionSpec(),
topNMetricSpec,
threshold,
getQuerySegmentSpec(),
dimFilter,
granularity,
aggregatorSpecs,
postAggregatorSpecs,
getContext()
);
return new TopNQueryBuilder(this).postAggregators(postAggregatorSpecs).build();
}
@Override
public Query<Result<TopNResultValue>> withDataSource(DataSource dataSource)
{
return new TopNQuery(
dataSource,
virtualColumns,
dimensionSpec,
topNMetricSpec,
threshold,
getQuerySegmentSpec(),
dimFilter,
granularity,
aggregatorSpecs,
postAggregatorSpecs,
getContext()
);
return new TopNQueryBuilder(this).dataSource(dataSource).build();
}
public TopNQuery withThreshold(int threshold)
{
return new TopNQuery(
getDataSource(),
virtualColumns,
dimensionSpec,
topNMetricSpec,
threshold,
getQuerySegmentSpec(),
dimFilter,
granularity,
aggregatorSpecs,
postAggregatorSpecs,
getContext()
);
return new TopNQueryBuilder(this).threshold(threshold).build();
}
public TopNQuery withOverriddenContext(Map<String, Object> contextOverrides)
{
return new TopNQuery(
getDataSource(),
virtualColumns,
dimensionSpec,
topNMetricSpec,
threshold,
getQuerySegmentSpec(),
dimFilter,
granularity,
aggregatorSpecs,
postAggregatorSpecs,
computeOverridenContext(contextOverrides)
);
return new TopNQueryBuilder(this).context(computeOverriddenContext(getContext(), contextOverrides)).build();
}
public TopNQuery withDimFilter(DimFilter dimFilter)
{
return new TopNQuery(
getDataSource(),
virtualColumns,
getDimensionSpec(),
topNMetricSpec,
threshold,
getQuerySegmentSpec(),
dimFilter,
granularity,
aggregatorSpecs,
postAggregatorSpecs,
getContext()
);
return new TopNQueryBuilder(this).filters(dimFilter).build();
}
@Override

File: TopNQueryBuilder.java

@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.granularity.Granularity;
import io.druid.query.DataSource;
import io.druid.query.QueryMetrics;
import io.druid.query.TableDataSource;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
@ -75,6 +76,7 @@ public class TopNQueryBuilder
private List<AggregatorFactory> aggregatorSpecs;
private List<PostAggregator> postAggregatorSpecs;
private Map<String, Object> context;
private QueryMetrics<?> queryMetrics;
public TopNQueryBuilder()
{
@ -328,4 +330,10 @@ public class TopNQueryBuilder
context = c;
return this;
}
public TopNQueryBuilder queryMetrics(QueryMetrics<?> m)
{
queryMetrics = m;
return this;
}
}

File: QueryContextsTest.java

@ -78,7 +78,7 @@ public class QueryContextsTest
getDataSource(),
getQuerySegmentSpec(),
isDescending(),
computeOverridenContext(contextOverride)
BaseQuery.computeOverriddenContext(getContext(), contextOverride)
);
}
}

File: GroupByQueryRunnerTest.java

@ -2710,11 +2710,11 @@ public class GroupByQueryRunnerTest
TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query, context), "no-limit");
TestHelper.assertExpectedObjects(
Iterables.limit(expectedResults, 5), mergeRunner.run(builder.limit(5).build(), context), "limited"
Iterables.limit(expectedResults, 5), mergeRunner.run(builder.setLimit(5).build(), context), "limited"
);
// Now try it with an expression based aggregator.
builder.limit(Integer.MAX_VALUE)
builder.setLimit(Integer.MAX_VALUE)
.setAggregatorSpecs(
Arrays.asList(
QueryRunnerTestHelper.rowsCount,
@ -2737,11 +2737,11 @@ public class GroupByQueryRunnerTest
TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(builder.build(), context), "no-limit");
TestHelper.assertExpectedObjects(
Iterables.limit(expectedResults, 5), mergeRunner.run(builder.limit(5).build(), context), "limited"
Iterables.limit(expectedResults, 5), mergeRunner.run(builder.setLimit(5).build(), context), "limited"
);
// Now try it with an expression virtual column.
builder.limit(Integer.MAX_VALUE)
builder.setLimit(Integer.MAX_VALUE)
.setVirtualColumns(
new ExpressionVirtualColumn("expr", "index / 2 + indexMin")
)
@ -2754,7 +2754,7 @@ public class GroupByQueryRunnerTest
TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(builder.build(), context), "no-limit");
TestHelper.assertExpectedObjects(
Iterables.limit(expectedResults, 5), mergeRunner.run(builder.limit(5).build(), context), "limited"
Iterables.limit(expectedResults, 5), mergeRunner.run(builder.setLimit(5).build(), context), "limited"
);
}
@ -2794,7 +2794,7 @@ public class GroupByQueryRunnerTest
QueryRunner<Row> mergeRunner = factory.getToolchest().mergeResults(runner);
TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query, context), "no-limit");
TestHelper.assertExpectedObjects(
Iterables.limit(expectedResults, 5), mergeRunner.run(builder.limit(5).build(), context), "limited"
Iterables.limit(expectedResults, 5), mergeRunner.run(builder.setLimit(5).build(), context), "limited"
);
}
@ -2835,7 +2835,7 @@ public class GroupByQueryRunnerTest
QueryRunner<Row> mergeRunner = factory.getToolchest().mergeResults(runner);
TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query, context), "no-limit");
TestHelper.assertExpectedObjects(
Iterables.limit(expectedResults, 5), mergeRunner.run(builder.limit(5).build(), context), "limited"
Iterables.limit(expectedResults, 5), mergeRunner.run(builder.setLimit(5).build(), context), "limited"
);
}
@ -2875,7 +2875,7 @@ public class GroupByQueryRunnerTest
QueryRunner<Row> mergeRunner = factory.getToolchest().mergeResults(runner);
TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query, context), "no-limit");
TestHelper.assertExpectedObjects(
Iterables.limit(expectedResults, 5), mergeRunner.run(builder.limit(5).build(), context), "limited"
Iterables.limit(expectedResults, 5), mergeRunner.run(builder.setLimit(5).build(), context), "limited"
);
}