From ee9b5a619a9c265a31112e68c2a5eeaf4ff6c6e7 Mon Sep 17 00:00:00 2001 From: Roman Leventov Date: Wed, 26 Apr 2017 00:32:02 +0300 Subject: [PATCH] Fix bugs in query builders and in TimeBoundaryQuery.getFilter() (#4131) * Add queryMetrics property to Query interface; Fix bugs and remove unused code in Druids * Fix a bug in TimeBoundaryQuery.getFilter() and remove TimeBoundaryQuery.getDimensionsFilter() * Don't reassign query's queryMetrics if already present in CPUTimeMetricQueryRunner and MetricsEmittingQueryRunner * Add compatibility constructor to BaseQuery * Remove Query.queryMetrics property * Move nullToNoopLimitSpec() method to LimitSpec interface * Rename GroupByQuery.applyLimit() to postProcess(); Fix inconsistencies in GroupByQuery.Builder --- .../java/io/druid/query/scan/ScanQuery.java | 57 +-- .../main/java/io/druid/query/BaseQuery.java | 8 +- .../druid/query/CPUTimeMetricQueryRunner.java | 5 +- .../src/main/java/io/druid/query/Druids.java | 104 +++-- .../query/MetricsEmittingQueryRunner.java | 12 +- .../DataSourceMetadataQuery.java | 23 +- .../io/druid/query/groupby/GroupByQuery.java | 384 ++++++------------ .../query/groupby/orderby/LimitSpec.java | 6 + .../query/groupby/orderby/NoopLimitSpec.java | 15 +- .../groupby/strategy/GroupByStrategyV1.java | 44 +- .../groupby/strategy/GroupByStrategyV2.java | 34 +- .../metadata/SegmentMetadataQuery.java | 46 +-- .../query/search/search/SearchQuery.java | 62 +-- .../io/druid/query/select/SelectQuery.java | 67 +-- .../query/timeboundary/TimeBoundaryQuery.java | 35 +- .../TimeBoundaryQueryRunnerFactory.java | 5 +- .../query/timeseries/TimeseriesQuery.java | 69 +--- .../java/io/druid/query/topn/TopNQuery.java | 113 +----- .../io/druid/query/topn/TopNQueryBuilder.java | 8 + .../io/druid/query/QueryContextsTest.java | 2 +- .../query/groupby/GroupByQueryRunnerTest.java | 16 +- 21 files changed, 317 insertions(+), 798 deletions(-) diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQuery.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQuery.java index df6a4079d5c..a526868c363 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQuery.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQuery.java @@ -125,60 +125,24 @@ public class ScanQuery extends BaseQuery @Override public Query withQuerySegmentSpec(QuerySegmentSpec querySegmentSpec) { - return new ScanQuery( - getDataSource(), - querySegmentSpec, - resultFormat, - batchSize, - limit, - dimFilter, - columns, - getContext() - ); + return ScanQueryBuilder.copy(this).intervals(querySegmentSpec).build(); } @Override public Query withDataSource(DataSource dataSource) { - return new ScanQuery( - dataSource, - getQuerySegmentSpec(), - resultFormat, - batchSize, - limit, - dimFilter, - columns, - getContext() - ); + return ScanQueryBuilder.copy(this).dataSource(dataSource).build(); } @Override public Query withOverriddenContext(Map contextOverrides) { - return new ScanQuery( - getDataSource(), - getQuerySegmentSpec(), - resultFormat, - batchSize, - limit, - dimFilter, - columns, - computeOverridenContext(contextOverrides) - ); + return ScanQueryBuilder.copy(this).context(computeOverriddenContext(getContext(), contextOverrides)).build(); } public ScanQuery withDimFilter(DimFilter dimFilter) { - return new ScanQuery( - getDataSource(), - getQuerySegmentSpec(), - resultFormat, - batchSize, - limit, - dimFilter, - columns, - getContext() - ); + return 
ScanQueryBuilder.copy(this).filters(dimFilter).build(); } @Override @@ -290,12 +254,17 @@ public class ScanQuery extends BaseQuery ); } - public ScanQueryBuilder copy(ScanQueryBuilder builder) + public static ScanQueryBuilder copy(ScanQuery query) { return new ScanQueryBuilder() - .dataSource(builder.dataSource) - .intervals(builder.querySegmentSpec) - .context(builder.context); + .dataSource(query.getDataSource()) + .intervals(query.getQuerySegmentSpec()) + .resultFormat(query.getResultFormat()) + .batchSize(query.getBatchSize()) + .limit(query.getLimit()) + .filters(query.getFilter()) + .columns(query.getColumns()) + .context(query.getContext()); } public ScanQueryBuilder dataSource(String ds) diff --git a/processing/src/main/java/io/druid/query/BaseQuery.java b/processing/src/main/java/io/druid/query/BaseQuery.java index 0186e00075a..7900474505f 100644 --- a/processing/src/main/java/io/druid/query/BaseQuery.java +++ b/processing/src/main/java/io/druid/query/BaseQuery.java @@ -145,10 +145,12 @@ public abstract class BaseQuery> implements Query return QueryContexts.parseBoolean(this, key, defaultValue); } - protected Map computeOverridenContext(Map overrides) + protected static Map computeOverriddenContext( + final Map context, + final Map overrides + ) { Map overridden = Maps.newTreeMap(); - final Map context = getContext(); if (context != null) { overridden.putAll(context); } @@ -173,7 +175,7 @@ public abstract class BaseQuery> implements Query @Override public Query withId(String id) { - return withOverriddenContext(ImmutableMap.of(QUERYID, id)); + return withOverriddenContext(ImmutableMap.of(QUERYID, id)); } @Override diff --git a/processing/src/main/java/io/druid/query/CPUTimeMetricQueryRunner.java b/processing/src/main/java/io/druid/query/CPUTimeMetricQueryRunner.java index 7aefbb7ad47..0d42ab983d4 100644 --- a/processing/src/main/java/io/druid/query/CPUTimeMetricQueryRunner.java +++ b/processing/src/main/java/io/druid/query/CPUTimeMetricQueryRunner.java @@ -58,9 +58,7 @@ public class CPUTimeMetricQueryRunner implements QueryRunner @Override - public Sequence run( - final Query query, final Map responseContext - ) + public Sequence run(final Query query, final Map responseContext) { final Sequence baseSequence = delegate.run(query, responseContext); return Sequences.wrap( @@ -91,7 +89,6 @@ public class CPUTimeMetricQueryRunner implements QueryRunner } ); } - public static QueryRunner safeBuild( QueryRunner delegate, QueryToolChest> queryToolChest, diff --git a/processing/src/main/java/io/druid/query/Druids.java b/processing/src/main/java/io/druid/query/Druids.java index a835394052f..da81fa9a80a 100644 --- a/processing/src/main/java/io/druid/query/Druids.java +++ b/processing/src/main/java/io/druid/query/Druids.java @@ -367,32 +367,20 @@ public class Druids ); } - public TimeseriesQueryBuilder copy(TimeseriesQuery query) + public static TimeseriesQueryBuilder copy(TimeseriesQuery query) { return new TimeseriesQueryBuilder() .dataSource(query.getDataSource()) - .intervals(query.getIntervals()) - .filters(query.getDimensionsFilter()) + .intervals(query.getQuerySegmentSpec()) .descending(query.isDescending()) + .virtualColumns(query.getVirtualColumns()) + .filters(query.getDimensionsFilter()) .granularity(query.getGranularity()) .aggregators(query.getAggregatorSpecs()) .postAggregators(query.getPostAggregatorSpecs()) .context(query.getContext()); } - public TimeseriesQueryBuilder copy(TimeseriesQueryBuilder builder) - { - return new TimeseriesQueryBuilder() - 
.dataSource(builder.dataSource) - .intervals(builder.querySegmentSpec) - .filters(builder.dimFilter) - .descending(builder.descending) - .granularity(builder.granularity) - .aggregators(builder.aggregatorSpecs) - .postAggregators(builder.postAggregatorSpecs) - .context(builder.context); - } - public DataSource getDataSource() { return dataSource; @@ -579,6 +567,7 @@ public class Druids querySegmentSpec = null; dimensions = null; querySpec = null; + sortSpec = null; context = null; } @@ -597,32 +586,20 @@ public class Druids ); } - public SearchQueryBuilder copy(SearchQuery query) + public static SearchQueryBuilder copy(SearchQuery query) { return new SearchQueryBuilder() .dataSource(query.getDataSource()) - .intervals(query.getQuerySegmentSpec()) .filters(query.getDimensionsFilter()) .granularity(query.getGranularity()) .limit(query.getLimit()) + .intervals(query.getQuerySegmentSpec()) .dimensions(query.getDimensions()) .query(query.getQuery()) + .sortSpec(query.getSort()) .context(query.getContext()); } - public SearchQueryBuilder copy(SearchQueryBuilder builder) - { - return new SearchQueryBuilder() - .dataSource(builder.dataSource) - .intervals(builder.querySegmentSpec) - .filters(builder.dimFilter) - .granularity(builder.granularity) - .limit(builder.limit) - .dimensions(builder.dimensions) - .query(builder.querySpec) - .context(builder.context); - } - public SearchQueryBuilder dataSource(String d) { dataSource = new TableDataSource(d); @@ -819,14 +796,14 @@ public class Druids ); } - public TimeBoundaryQueryBuilder copy(TimeBoundaryQueryBuilder builder) + public static TimeBoundaryQueryBuilder copy(TimeBoundaryQuery query) { return new TimeBoundaryQueryBuilder() - .dataSource(builder.dataSource) - .intervals(builder.querySegmentSpec) - .bound(builder.bound) - .filters(builder.dimFilter) - .context(builder.context); + .dataSource(query.getDataSource()) + .intervals(query.getQuerySegmentSpec()) + .bound(query.getBound()) + .filters(query.getFilter()) + .context(query.getContext()); } public TimeBoundaryQueryBuilder dataSource(String ds) @@ -993,8 +970,8 @@ public class Druids toInclude = null; analysisTypes = null; merge = null; - context = null; lenientAggregatorMerge = null; + context = null; } public SegmentMetadataQuery build() @@ -1011,20 +988,16 @@ public class Druids ); } - public SegmentMetadataQueryBuilder copy(SegmentMetadataQueryBuilder builder) + public static SegmentMetadataQueryBuilder copy(SegmentMetadataQuery query) { - final SegmentMetadataQuery.AnalysisType[] analysisTypesArray = - analysisTypes != null - ? 
analysisTypes.toArray(new SegmentMetadataQuery.AnalysisType[analysisTypes.size()]) - : null; return new SegmentMetadataQueryBuilder() - .dataSource(builder.dataSource) - .intervals(builder.querySegmentSpec) - .toInclude(toInclude) - .analysisTypes(analysisTypesArray) - .merge(merge) - .lenientAggregatorMerge(lenientAggregatorMerge) - .context(builder.context); + .dataSource(query.getDataSource()) + .intervals(query.getQuerySegmentSpec()) + .toInclude(query.getToInclude()) + .analysisTypes(query.getAnalysisTypes()) + .merge(query.isMerge()) + .lenientAggregatorMerge(query.isLenientAggregatorMerge()) + .context(query.getContext()); } public SegmentMetadataQueryBuilder dataSource(String ds) @@ -1075,6 +1048,12 @@ public class Druids return this; } + public SegmentMetadataQueryBuilder analysisTypes(EnumSet analysisTypes) + { + this.analysisTypes = analysisTypes; + return this; + } + public SegmentMetadataQueryBuilder merge(boolean merge) { this.merge = merge; @@ -1131,11 +1110,13 @@ public class Druids { dataSource = null; querySegmentSpec = null; + descending = false; context = null; dimFilter = null; granularity = Granularities.ALL; dimensions = Lists.newArrayList(); metrics = Lists.newArrayList(); + virtualColumns = null; pagingSpec = null; } @@ -1155,12 +1136,19 @@ public class Druids ); } - public SelectQueryBuilder copy(SelectQueryBuilder builder) + public static SelectQueryBuilder copy(SelectQuery query) { return new SelectQueryBuilder() - .dataSource(builder.dataSource) - .intervals(builder.querySegmentSpec) - .context(builder.context); + .dataSource(query.getDataSource()) + .intervals(query.getQuerySegmentSpec()) + .descending(query.isDescending()) + .filters(query.getFilter()) + .granularity(query.getGranularity()) + .dimensionSpecs(query.getDimensions()) + .metrics(query.getMetrics()) + .virtualColumns(query.getVirtualColumns()) + .pagingSpec(query.getPagingSpec()) + .context(query.getContext()); } public SelectQueryBuilder dataSource(String ds) @@ -1317,12 +1305,12 @@ public class Druids ); } - public DataSourceMetadataQueryBuilder copy(DataSourceMetadataQueryBuilder builder) + public static DataSourceMetadataQueryBuilder copy(DataSourceMetadataQuery query) { return new DataSourceMetadataQueryBuilder() - .dataSource(builder.dataSource) - .intervals(builder.querySegmentSpec) - .context(builder.context); + .dataSource(query.getDataSource()) + .intervals(query.getQuerySegmentSpec()) + .context(query.getContext()); } public DataSourceMetadataQueryBuilder dataSource(String ds) diff --git a/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java b/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java index 096fb39d45f..1f4041973b4 100644 --- a/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java +++ b/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java @@ -19,7 +19,6 @@ package io.druid.query; -import com.google.common.base.Supplier; import com.metamx.emitter.service.ServiceEmitter; import io.druid.java.util.common.guava.LazySequence; import io.druid.java.util.common.guava.Sequence; @@ -91,15 +90,8 @@ public class MetricsEmittingQueryRunner implements QueryRunner return Sequences.wrap( // Use LazySequence because want to account execution time of queryRunner.run() (it prepares the underlying // Sequence) as part of the reported query time, i. e. we want to execute queryRunner.run() after - // `startTime = System.currentTimeMillis();` (see below). 
- new LazySequence<>(new Supplier>() - { - @Override - public Sequence get() - { - return queryRunner.run(query, responseContext); - } - }), + // `startTime = System.nanoTime();` (see below). + new LazySequence<>(() -> queryRunner.run(query, responseContext)), new SequenceWrapper() { private long startTimeNs; diff --git a/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceMetadataQuery.java b/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceMetadataQuery.java index 186fcaf2f6e..e987fe2c4fb 100644 --- a/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceMetadataQuery.java +++ b/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceMetadataQuery.java @@ -25,6 +25,7 @@ import com.google.common.collect.Lists; import io.druid.common.utils.JodaUtils; import io.druid.query.BaseQuery; import io.druid.query.DataSource; +import io.druid.query.Druids; import io.druid.query.Query; import io.druid.query.Result; import io.druid.query.filter.DimFilter; @@ -34,6 +35,7 @@ import org.joda.time.DateTime; import org.joda.time.Interval; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -54,7 +56,7 @@ public class DataSourceMetadataQuery extends BaseQuery contextOverrides) { - return new DataSourceMetadataQuery( - getDataSource(), - getQuerySegmentSpec(), - computeOverridenContext(contextOverrides) - ); + Map newContext = computeOverriddenContext(getContext(), contextOverrides); + return Druids.DataSourceMetadataQueryBuilder.copy(this).context(newContext).build(); } @Override public DataSourceMetadataQuery withQuerySegmentSpec(QuerySegmentSpec spec) { - return new DataSourceMetadataQuery( - getDataSource(), - spec, - getContext() - ); + return Druids.DataSourceMetadataQueryBuilder.copy(this).intervals(spec).build(); } @Override public Query> withDataSource(DataSource dataSource) { - return new DataSourceMetadataQuery( - dataSource, - getQuerySegmentSpec(), - getContext() - ); + return Druids.DataSourceMetadataQueryBuilder.copy(this).dataSource(dataSource).build(); } public Iterable> buildResult(DateTime timestamp, DateTime maxIngestedEventTime) diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java index 9184ad38689..553e8a2a95c 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java @@ -25,7 +25,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Function; import com.google.common.base.Functions; import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; @@ -61,6 +60,8 @@ import io.druid.segment.VirtualColumns; import io.druid.segment.column.Column; import org.joda.time.Interval; +import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; import java.util.List; @@ -76,17 +77,10 @@ public class GroupByQuery extends BaseQuery private final static Comparator NATURAL_NULLS_FIRST = Ordering.natural().nullsFirst(); - private final static Comparator NON_GRANULAR_TIME_COMP = new Comparator() - { - @Override - public int compare(Row lhs, Row rhs) - { - return Longs.compare( - lhs.getTimestampFromEpoch(), - rhs.getTimestampFromEpoch() - ); - } - }; + private final 
static Comparator NON_GRANULAR_TIME_COMP = (Row lhs, Row rhs) -> Longs.compare( + lhs.getTimestampFromEpoch(), + rhs.getTimestampFromEpoch() + ); public static Builder builder() { @@ -102,7 +96,7 @@ public class GroupByQuery extends BaseQuery private final List aggregatorSpecs; private final List postAggregatorSpecs; - private final Function, Sequence> limitFn; + private final Function, Sequence> postProcessingFn; @JsonCreator public GroupByQuery( @@ -118,12 +112,64 @@ public class GroupByQuery extends BaseQuery @JsonProperty("limitSpec") LimitSpec limitSpec, @JsonProperty("context") Map context ) + { + this( + dataSource, + querySegmentSpec, + virtualColumns, + dimFilter, + granularity, + dimensions, + aggregatorSpecs, + postAggregatorSpecs, + havingSpec, + limitSpec, + null, + context + ); + } + + private Function, Sequence> makePostProcessingFn() + { + Function, Sequence> postProcessingFn = + limitSpec.build(dimensions, aggregatorSpecs, postAggregatorSpecs); + + if (havingSpec != null) { + postProcessingFn = Functions.compose( + postProcessingFn, + (Sequence input) -> { + havingSpec.setRowSignature(GroupByQueryHelper.rowSignatureFor(GroupByQuery.this)); + return Sequences.filter(input, havingSpec::eval); + } + ); + } + return postProcessingFn; + } + + /** + * A private constructor that avoids recomputing postProcessingFn. + */ + private GroupByQuery( + final DataSource dataSource, + final QuerySegmentSpec querySegmentSpec, + final VirtualColumns virtualColumns, + final DimFilter dimFilter, + final Granularity granularity, + final List dimensions, + final List aggregatorSpecs, + final List postAggregatorSpecs, + final HavingSpec havingSpec, + final LimitSpec limitSpec, + final @Nullable Function, Sequence> postProcessingFn, + final Map context + ) { super(dataSource, querySegmentSpec, false, context); + this.virtualColumns = VirtualColumns.nullToEmpty(virtualColumns); this.dimFilter = dimFilter; this.granularity = granularity; - this.dimensions = dimensions == null ? ImmutableList.of() : dimensions; + this.dimensions = dimensions == null ? ImmutableList.of() : dimensions; for (DimensionSpec spec : this.dimensions) { Preconditions.checkArgument(spec != null, "dimensions has null DimensionSpec"); } @@ -133,77 +179,16 @@ public class GroupByQuery extends BaseQuery postAggregatorSpecs == null ? ImmutableList.of() : postAggregatorSpecs ); this.havingSpec = havingSpec; - this.limitSpec = (limitSpec == null) ? new NoopLimitSpec() : limitSpec; + this.limitSpec = LimitSpec.nullToNoopLimitSpec(limitSpec); Preconditions.checkNotNull(this.granularity, "Must specify a granularity"); - // Verify no duplicate names between dimensions, aggregators, and postAggregators. // They will all end up in the same namespace in the returned Rows and we can't have them clobbering each other. // We're not counting __time, even though that name is problematic. 
See: https://github.com/druid-io/druid/pull/3684 verifyOutputNames(this.dimensions, this.aggregatorSpecs, this.postAggregatorSpecs); - Function, Sequence> postProcFn = - this.limitSpec.build(this.dimensions, this.aggregatorSpecs, this.postAggregatorSpecs); - - if (havingSpec != null) { - postProcFn = Functions.compose( - postProcFn, - new Function, Sequence>() - { - @Override - public Sequence apply(Sequence input) - { - GroupByQuery.this.havingSpec.setRowSignature(GroupByQueryHelper.rowSignatureFor(GroupByQuery.this)); - return Sequences.filter( - input, - new Predicate() - { - @Override - public boolean apply(Row input) - { - return GroupByQuery.this.havingSpec.eval(input); - } - } - ); - } - } - ); - } - - limitFn = postProcFn; - } - - /** - * A private constructor that avoids all of the various state checks. Used by the with*() methods where the checks - * have already passed in order for the object to exist. - */ - private GroupByQuery( - DataSource dataSource, - QuerySegmentSpec querySegmentSpec, - VirtualColumns virtualColumns, - DimFilter dimFilter, - Granularity granularity, - List dimensions, - List aggregatorSpecs, - List postAggregatorSpecs, - HavingSpec havingSpec, - LimitSpec orderBySpec, - Function, Sequence> limitFn, - Map context - ) - { - super(dataSource, querySegmentSpec, false, context); - - this.virtualColumns = virtualColumns; - this.dimFilter = dimFilter; - this.granularity = granularity; - this.dimensions = dimensions; - this.aggregatorSpecs = aggregatorSpecs; - this.postAggregatorSpecs = postAggregatorSpecs; - this.havingSpec = havingSpec; - this.limitSpec = orderBySpec; - this.limitFn = limitFn; + this.postProcessingFn = postProcessingFn != null ? postProcessingFn : makePostProcessingFn(); } @JsonProperty @@ -284,17 +269,12 @@ public class GroupByQuery extends BaseQuery final Ordering rowOrdering = getRowOrdering(false); return Ordering.from( - new Comparator() - { - @Override - public int compare(Object lhs, Object rhs) - { - if (lhs instanceof Row) { - return rowOrdering.compare((Row) lhs, (Row) rhs); - } else { - // Probably bySegment queries - return NATURAL_NULLS_FIRST.compare(lhs, rhs); - } + (lhs, rhs) -> { + if (lhs instanceof Row) { + return rowOrdering.compare((Row) lhs, (Row) rhs); + } else { + // Probably bySegment queries + return NATURAL_NULLS_FIRST.compare(lhs, rhs); } } ); @@ -307,47 +287,28 @@ public class GroupByQuery extends BaseQuery final Comparator timeComparator = getTimeComparator(granular); if (timeComparator == null) { - return Ordering.from( - new Comparator() - { - @Override - public int compare(Row lhs, Row rhs) - { - return compareDims(dimensions, lhs, rhs); - } - } - ); + return Ordering.from((lhs, rhs) -> compareDims(dimensions, lhs, rhs)); } else if (sortByDimsFirst) { return Ordering.from( - new Comparator() - { - @Override - public int compare(Row lhs, Row rhs) - { - final int cmp = compareDims(dimensions, lhs, rhs); - if (cmp != 0) { - return cmp; - } - - return timeComparator.compare(lhs, rhs); + (lhs, rhs) -> { + final int cmp = compareDims(dimensions, lhs, rhs); + if (cmp != 0) { + return cmp; } + + return timeComparator.compare(lhs, rhs); } ); } else { return Ordering.from( - new Comparator() - { - @Override - public int compare(Row lhs, Row rhs) - { - final int timeCompare = timeComparator.compare(lhs, rhs); + (lhs, rhs) -> { + final int timeCompare = timeComparator.compare(lhs, rhs); - if (timeCompare != 0) { - return timeCompare; - } - - return compareDims(dimensions, lhs, rhs); + if (timeCompare != 0) { + return 
timeCompare; } + + return compareDims(dimensions, lhs, rhs); } ); } @@ -358,17 +319,10 @@ public class GroupByQuery extends BaseQuery if (Granularities.ALL.equals(granularity)) { return null; } else if (granular) { - return new Comparator() - { - @Override - public int compare(Row lhs, Row rhs) - { - return Longs.compare( - granularity.bucketStart(lhs.getTimestamp()).getMillis(), - granularity.bucketStart(rhs.getTimestamp()).getMillis() - ); - } - }; + return (lhs, rhs) -> Longs.compare( + granularity.bucketStart(lhs.getTimestamp()).getMillis(), + granularity.bucketStart(rhs.getTimestamp()).getMillis() + ); } else { return NON_GRANULAR_TIME_COMP; } @@ -398,155 +352,52 @@ public class GroupByQuery extends BaseQuery * * @return sequence of rows after applying havingSpec and limitSpec */ - public Sequence applyLimit(Sequence results) + public Sequence postProcess(Sequence results) { - return limitFn.apply(results); + return postProcessingFn.apply(results); } @Override public GroupByQuery withOverriddenContext(Map contextOverride) { - return new GroupByQuery( - getDataSource(), - getQuerySegmentSpec(), - virtualColumns, - dimFilter, - granularity, - dimensions, - aggregatorSpecs, - postAggregatorSpecs, - havingSpec, - limitSpec, - limitFn, - computeOverridenContext(contextOverride) - ); + return new Builder(this).overrideContext(contextOverride).build(); } @Override public GroupByQuery withQuerySegmentSpec(QuerySegmentSpec spec) { - return new GroupByQuery( - getDataSource(), - spec, - virtualColumns, - dimFilter, - granularity, - dimensions, - aggregatorSpecs, - postAggregatorSpecs, - havingSpec, - limitSpec, - limitFn, - getContext() - ); + return new Builder(this).setQuerySegmentSpec(spec).build(); } public GroupByQuery withDimFilter(final DimFilter dimFilter) { - return new GroupByQuery( - getDataSource(), - getQuerySegmentSpec(), - virtualColumns, - dimFilter, - getGranularity(), - getDimensions(), - getAggregatorSpecs(), - getPostAggregatorSpecs(), - getHavingSpec(), - getLimitSpec(), - limitFn, - getContext() - ); + return new Builder(this).setDimFilter(dimFilter).build(); } @Override public Query withDataSource(DataSource dataSource) { - return new GroupByQuery( - dataSource, - getQuerySegmentSpec(), - virtualColumns, - dimFilter, - granularity, - dimensions, - aggregatorSpecs, - postAggregatorSpecs, - havingSpec, - limitSpec, - limitFn, - getContext() - ); + return new Builder(this).setDataSource(dataSource).build(); } public GroupByQuery withDimensionSpecs(final List dimensionSpecs) { - return new GroupByQuery( - getDataSource(), - getQuerySegmentSpec(), - virtualColumns, - getDimFilter(), - getGranularity(), - dimensionSpecs, - getAggregatorSpecs(), - getPostAggregatorSpecs(), - getHavingSpec(), - getLimitSpec(), - limitFn, - getContext() - ); + return new Builder(this).setDimensions(dimensionSpecs).build(); } - public GroupByQuery withLimitSpec(final LimitSpec limitSpec) + public GroupByQuery withLimitSpec(LimitSpec limitSpec) { - return new GroupByQuery( - getDataSource(), - getQuerySegmentSpec(), - virtualColumns, - getDimFilter(), - getGranularity(), - getDimensions(), - getAggregatorSpecs(), - getPostAggregatorSpecs(), - getHavingSpec(), - limitSpec, - getContext() - ); + return new Builder(this).setLimitSpec(limitSpec).build(); } public GroupByQuery withAggregatorSpecs(final List aggregatorSpecs) { - return new GroupByQuery( - getDataSource(), - getQuerySegmentSpec(), - virtualColumns, - getDimFilter(), - getGranularity(), - getDimensions(), - aggregatorSpecs, - 
getPostAggregatorSpecs(), - getHavingSpec(), - getLimitSpec(), - limitFn, - getContext() - ); + return new Builder(this).setAggregatorSpecs(aggregatorSpecs).build(); } public GroupByQuery withPostAggregatorSpecs(final List postAggregatorSpecs) { - return new GroupByQuery( - getDataSource(), - getQuerySegmentSpec(), - virtualColumns, - getDimFilter(), - getGranularity(), - getDimensions(), - getAggregatorSpecs(), - postAggregatorSpecs, - getHavingSpec(), - getLimitSpec(), - limitFn, - getContext() - ); + return new Builder(this).setPostAggregatorSpecs(postAggregatorSpecs).build(); } private static void verifyOutputNames( @@ -597,6 +448,7 @@ public class GroupByQuery extends BaseQuery private Map context; private LimitSpec limitSpec = null; + private Function, Sequence> postProcessingFn; private List orderByColumnSpecs = Lists.newArrayList(); private int limit = Integer.MAX_VALUE; @@ -609,13 +461,14 @@ public class GroupByQuery extends BaseQuery dataSource = query.getDataSource(); querySegmentSpec = query.getQuerySegmentSpec(); virtualColumns = query.getVirtualColumns(); - limitSpec = query.getLimitSpec(); dimFilter = query.getDimFilter(); granularity = query.getGranularity(); dimensions = query.getDimensions(); aggregatorSpecs = query.getAggregatorSpecs(); postAggregatorSpecs = query.getPostAggregatorSpecs(); havingSpec = query.getHavingSpec(); + limitSpec = query.getLimitSpec(); + postProcessingFn = query.postProcessingFn; context = query.getContext(); } @@ -624,15 +477,16 @@ public class GroupByQuery extends BaseQuery dataSource = builder.dataSource; querySegmentSpec = builder.querySegmentSpec; virtualColumns = builder.virtualColumns; - limitSpec = builder.limitSpec; dimFilter = builder.dimFilter; granularity = builder.granularity; dimensions = builder.dimensions; aggregatorSpecs = builder.aggregatorSpecs; postAggregatorSpecs = builder.postAggregatorSpecs; havingSpec = builder.havingSpec; + limitSpec = builder.limitSpec; + postProcessingFn = builder.postProcessingFn; limit = builder.limit; - + orderByColumnSpecs = new ArrayList<>(builder.orderByColumnSpecs); context = builder.context; } @@ -692,16 +546,17 @@ public class GroupByQuery extends BaseQuery return this; } - public Builder limit(int limit) + public Builder setLimit(int limit) { - ensureExplicitLimitNotSet(); + ensureExplicitLimitSpecNotSet(); this.limit = limit; + this.postProcessingFn = null; return this; } public Builder addOrderByColumn(String dimension) { - return addOrderByColumn(dimension, (OrderByColumnSpec.Direction) null); + return addOrderByColumn(dimension, null); } public Builder addOrderByColumn(String dimension, OrderByColumnSpec.Direction direction) @@ -711,19 +566,22 @@ public class GroupByQuery extends BaseQuery public Builder addOrderByColumn(OrderByColumnSpec columnSpec) { - ensureExplicitLimitNotSet(); + ensureExplicitLimitSpecNotSet(); this.orderByColumnSpecs.add(columnSpec); + this.postProcessingFn = null; return this; } public Builder setLimitSpec(LimitSpec limitSpec) { + Preconditions.checkNotNull(limitSpec); ensureFluentLimitsNotSet(); this.limitSpec = limitSpec; + this.postProcessingFn = null; return this; } - private void ensureExplicitLimitNotSet() + private void ensureExplicitLimitSpecNotSet() { if (limitSpec != null) { throw new ISE("Ambiguous build, limitSpec[%s] already set", limitSpec); @@ -772,12 +630,14 @@ public class GroupByQuery extends BaseQuery } dimensions.add(dimension); + this.postProcessingFn = null; return this; } public Builder setDimensions(List dimensions) { this.dimensions = 
Lists.newArrayList(dimensions); + this.postProcessingFn = null; return this; } @@ -788,12 +648,14 @@ public class GroupByQuery extends BaseQuery } aggregatorSpecs.add(aggregator); + this.postProcessingFn = null; return this; } public Builder setAggregatorSpecs(List aggregatorSpecs) { this.aggregatorSpecs = Lists.newArrayList(aggregatorSpecs); + this.postProcessingFn = null; return this; } @@ -804,12 +666,14 @@ public class GroupByQuery extends BaseQuery } postAggregatorSpecs.add(postAgg); + this.postProcessingFn = null; return this; } public Builder setPostAggregatorSpecs(List postAggregatorSpecs) { this.postAggregatorSpecs = Lists.newArrayList(postAggregatorSpecs); + this.postProcessingFn = null; return this; } @@ -819,17 +683,16 @@ public class GroupByQuery extends BaseQuery return this; } - public Builder setHavingSpec(HavingSpec havingSpec) + public Builder overrideContext(Map contextOverride) { - this.havingSpec = havingSpec; - + this.context = computeOverriddenContext(context, contextOverride); return this; } - public Builder setLimit(Integer limit) + public Builder setHavingSpec(HavingSpec havingSpec) { - this.limit = limit; - + this.havingSpec = havingSpec; + this.postProcessingFn = null; return this; } @@ -843,7 +706,7 @@ public class GroupByQuery extends BaseQuery final LimitSpec theLimitSpec; if (limitSpec == null) { if (orderByColumnSpecs.isEmpty() && limit == Integer.MAX_VALUE) { - theLimitSpec = new NoopLimitSpec(); + theLimitSpec = NoopLimitSpec.instance(); } else { theLimitSpec = new DefaultLimitSpec(orderByColumnSpecs, limit); } @@ -862,6 +725,7 @@ public class GroupByQuery extends BaseQuery postAggregatorSpecs, havingSpec, theLimitSpec, + postProcessingFn, context ); } diff --git a/processing/src/main/java/io/druid/query/groupby/orderby/LimitSpec.java b/processing/src/main/java/io/druid/query/groupby/orderby/LimitSpec.java index 9581dc35e7a..4638e6a49dc 100644 --- a/processing/src/main/java/io/druid/query/groupby/orderby/LimitSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/orderby/LimitSpec.java @@ -29,6 +29,7 @@ import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.PostAggregator; import io.druid.query.dimension.DimensionSpec; +import javax.annotation.Nullable; import java.util.List; /** @@ -39,6 +40,11 @@ import java.util.List; }) public interface LimitSpec extends Cacheable { + static LimitSpec nullToNoopLimitSpec(@Nullable LimitSpec limitSpec) + { + return (limitSpec == null) ? NoopLimitSpec.instance() : limitSpec; + } + /** * Returns a function that applies a limit to an input sequence that is assumed to be sorted on dimensions. 
* diff --git a/processing/src/main/java/io/druid/query/groupby/orderby/NoopLimitSpec.java b/processing/src/main/java/io/druid/query/groupby/orderby/NoopLimitSpec.java index c8d770e87d5..b14224d4cbb 100644 --- a/processing/src/main/java/io/druid/query/groupby/orderby/NoopLimitSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/orderby/NoopLimitSpec.java @@ -19,6 +19,7 @@ package io.druid.query.groupby.orderby; +import com.fasterxml.jackson.annotation.JsonCreator; import com.google.common.base.Function; import com.google.common.base.Functions; import io.druid.data.input.Row; @@ -31,10 +32,22 @@ import java.util.List; /** */ -public class NoopLimitSpec implements LimitSpec +public final class NoopLimitSpec implements LimitSpec { private static final byte CACHE_KEY = 0x0; + public static final NoopLimitSpec INSTANCE = new NoopLimitSpec(); + + @JsonCreator + public static NoopLimitSpec instance() + { + return INSTANCE; + } + + private NoopLimitSpec() + { + } + @Override public Function, Sequence> build( List dimensions, List aggs, List postAggs diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java index 9b2c9163818..4ba495356ba 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java @@ -47,6 +47,7 @@ import io.druid.query.groupby.GroupByQueryConfig; import io.druid.query.groupby.GroupByQueryEngine; import io.druid.query.groupby.GroupByQueryHelper; import io.druid.query.groupby.GroupByQueryQueryToolChest; +import io.druid.query.groupby.orderby.NoopLimitSpec; import io.druid.query.groupby.resource.GroupByQueryResource; import io.druid.query.spec.MultipleIntervalSegmentSpec; import io.druid.segment.StorageAdapter; @@ -119,38 +120,31 @@ public class GroupByStrategyV1 implements GroupByStrategy configSupplier.get(), bufferPool, baseRunner.run( - new GroupByQuery( - query.getDataSource(), - query.getQuerySegmentSpec(), - query.getVirtualColumns(), - query.getDimFilter(), - query.getGranularity(), - query.getDimensions(), - query.getAggregatorSpecs(), + new GroupByQuery.Builder(query) // Don't do post aggs until the end of this method. - ImmutableList.of(), + .setPostAggregatorSpecs(ImmutableList.of()) // Don't do "having" clause until the end of this method. - null, - null, - query.getContext() - ).withOverriddenContext( - ImmutableMap.of( - "finalize", false, - //setting sort to false avoids unnecessary sorting while merging results. we only need to sort - //in the end when returning results to user. (note this is only respected by groupBy v1) - GroupByQueryHelper.CTX_KEY_SORT_RESULTS, false, - //no merging needed at historicals because GroupByQueryRunnerFactory.mergeRunners(..) would return - //merged results. (note this is only respected by groupBy v1) - GroupByQueryQueryToolChest.GROUP_BY_MERGE_KEY, false, - GroupByQueryConfig.CTX_KEY_STRATEGY, GroupByStrategySelector.STRATEGY_V1 + .setHavingSpec(null) + .setLimitSpec(NoopLimitSpec.instance()) + .overrideContext( + ImmutableMap.of( + "finalize", false, + //setting sort to false avoids unnecessary sorting while merging results. we only need to sort + //in the end when returning results to user. (note this is only respected by groupBy v1) + GroupByQueryHelper.CTX_KEY_SORT_RESULTS, false, + //no merging needed at historicals because GroupByQueryRunnerFactory.mergeRunners(..) 
would + //return merged results. (note this is only respected by groupBy v1) + GroupByQueryQueryToolChest.GROUP_BY_MERGE_KEY, false, + GroupByQueryConfig.CTX_KEY_STRATEGY, GroupByStrategySelector.STRATEGY_V1 + ) ) - ), + .build(), responseContext ), true ); - return Sequences.withBaggage(query.applyLimit(GroupByQueryHelper.postAggregate(query, index)), index); + return Sequences.withBaggage(query.postProcess(GroupByQueryHelper.postAggregate(query, index)), index); } @Override @@ -253,7 +247,7 @@ public class GroupByStrategyV1 implements GroupByStrategy innerQueryResultIndex.close(); return Sequences.withBaggage( - outerQuery.applyLimit(GroupByQueryHelper.postAggregate(query, outerQueryResultIndex)), + outerQuery.postProcess(GroupByQueryHelper.postAggregate(query, outerQueryResultIndex)), outerQueryResultIndex ); } diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java index e0e7273f572..e13b4e7c6fb 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -61,6 +61,7 @@ import io.druid.query.groupby.epinephelinae.GroupByBinaryFnV2; import io.druid.query.groupby.epinephelinae.GroupByMergingQueryRunnerV2; import io.druid.query.groupby.epinephelinae.GroupByQueryEngineV2; import io.druid.query.groupby.epinephelinae.GroupByRowProcessor; +import io.druid.query.groupby.orderby.NoopLimitSpec; import io.druid.query.groupby.resource.GroupByQueryResource; import io.druid.segment.StorageAdapter; import org.joda.time.DateTime; @@ -229,31 +230,24 @@ public class GroupByStrategyV2 implements GroupByStrategy // Fudge timestamp, maybe. final DateTime fudgeTimestamp = getUniversalTimestamp(query); - return query.applyLimit( + return query.postProcess( Sequences.map( mergingQueryRunner.run( - new GroupByQuery( - query.getDataSource(), - query.getQuerySegmentSpec(), - query.getVirtualColumns(), - query.getDimFilter(), - query.getGranularity(), - query.getDimensions(), - query.getAggregatorSpecs(), + new GroupByQuery.Builder(query) // Don't do post aggs until the end of this method. - ImmutableList.of(), + .setPostAggregatorSpecs(ImmutableList.of()) // Don't do "having" clause until the end of this method. - null, - null, - query.getContext() - ).withOverriddenContext( - ImmutableMap.of( - "finalize", false, - GroupByQueryConfig.CTX_KEY_STRATEGY, GroupByStrategySelector.STRATEGY_V2, - CTX_KEY_FUDGE_TIMESTAMP, fudgeTimestamp == null ? "" : String.valueOf(fudgeTimestamp.getMillis()), - CTX_KEY_OUTERMOST, false + .setHavingSpec(null) + .setLimitSpec(NoopLimitSpec.instance()) + .overrideContext( + ImmutableMap.of( + "finalize", false, + GroupByQueryConfig.CTX_KEY_STRATEGY, GroupByStrategySelector.STRATEGY_V2, + CTX_KEY_FUDGE_TIMESTAMP, fudgeTimestamp == null ? 
"" : String.valueOf(fudgeTimestamp.getMillis()), + CTX_KEY_OUTERMOST, false + ) ) - ), + .build(), responseContext ), new Function() diff --git a/processing/src/main/java/io/druid/query/metadata/metadata/SegmentMetadataQuery.java b/processing/src/main/java/io/druid/query/metadata/metadata/SegmentMetadataQuery.java index 3d9ab5b117e..130025c6077 100644 --- a/processing/src/main/java/io/druid/query/metadata/metadata/SegmentMetadataQuery.java +++ b/processing/src/main/java/io/druid/query/metadata/metadata/SegmentMetadataQuery.java @@ -27,6 +27,7 @@ import com.google.common.collect.Lists; import io.druid.common.utils.JodaUtils; import io.druid.query.BaseQuery; import io.druid.query.DataSource; +import io.druid.query.Druids; import io.druid.query.Query; import io.druid.query.TableDataSource; import io.druid.query.UnionDataSource; @@ -232,60 +233,25 @@ public class SegmentMetadataQuery extends BaseQuery @Override public Query withOverriddenContext(Map contextOverride) { - return new SegmentMetadataQuery( - getDataSource(), - getQuerySegmentSpec(), - toInclude, - merge, - computeOverridenContext(contextOverride), - analysisTypes, - usingDefaultInterval, - lenientAggregatorMerge - ); + Map newContext = computeOverriddenContext(getContext(), contextOverride); + return Druids.SegmentMetadataQueryBuilder.copy(this).context(newContext).build(); } @Override public Query withQuerySegmentSpec(QuerySegmentSpec spec) { - return new SegmentMetadataQuery( - getDataSource(), - spec, - toInclude, - merge, - getContext(), - analysisTypes, - usingDefaultInterval, - lenientAggregatorMerge - ); + return Druids.SegmentMetadataQueryBuilder.copy(this).intervals(spec).build(); } @Override public Query withDataSource(DataSource dataSource) { - return new SegmentMetadataQuery( - dataSource, - getQuerySegmentSpec(), - toInclude, - merge, - getContext(), - analysisTypes, - usingDefaultInterval, - lenientAggregatorMerge - ); + return Druids.SegmentMetadataQueryBuilder.copy(this).dataSource(dataSource).build(); } public Query withColumns(ColumnIncluderator includerator) { - return new SegmentMetadataQuery( - getDataSource(), - getQuerySegmentSpec(), - includerator, - merge, - getContext(), - analysisTypes, - usingDefaultInterval, - lenientAggregatorMerge - ); + return Druids.SegmentMetadataQueryBuilder.copy(this).toInclude(includerator).build(); } @Override diff --git a/processing/src/main/java/io/druid/query/search/search/SearchQuery.java b/processing/src/main/java/io/druid/query/search/search/SearchQuery.java index c45a21cddb0..a9971907dd5 100644 --- a/processing/src/main/java/io/druid/query/search/search/SearchQuery.java +++ b/processing/src/main/java/io/druid/query/search/search/SearchQuery.java @@ -26,6 +26,7 @@ import io.druid.java.util.common.granularity.Granularities; import io.druid.java.util.common.granularity.Granularity; import io.druid.query.BaseQuery; import io.druid.query.DataSource; +import io.druid.query.Druids; import io.druid.query.Query; import io.druid.query.Result; import io.druid.query.dimension.DimensionSpec; @@ -95,64 +96,25 @@ public class SearchQuery extends BaseQuery> @Override public SearchQuery withQuerySegmentSpec(QuerySegmentSpec spec) { - return new SearchQuery( - getDataSource(), - dimFilter, - granularity, - limit, - spec, - dimensions, - querySpec, - sortSpec, - getContext() - ); + return Druids.SearchQueryBuilder.copy(this).intervals(spec).build(); } @Override public Query> withDataSource(DataSource dataSource) { - return new SearchQuery( - dataSource, - dimFilter, - granularity, - 
limit, - getQuerySegmentSpec(), - dimensions, - querySpec, - sortSpec, - getContext() - ); + return Druids.SearchQueryBuilder.copy(this).dataSource(dataSource).build(); } @Override public SearchQuery withOverriddenContext(Map contextOverrides) { - return new SearchQuery( - getDataSource(), - dimFilter, - granularity, - limit, - getQuerySegmentSpec(), - dimensions, - querySpec, - sortSpec, - computeOverridenContext(contextOverrides) - ); + Map newContext = computeOverriddenContext(getContext(), contextOverrides); + return Druids.SearchQueryBuilder.copy(this).context(newContext).build(); } public SearchQuery withDimFilter(DimFilter dimFilter) { - return new SearchQuery( - getDataSource(), - dimFilter, - granularity, - limit, - getQuerySegmentSpec(), - dimensions, - querySpec, - sortSpec, - getContext() - ); + return Druids.SearchQueryBuilder.copy(this).filters(dimFilter).build(); } @JsonProperty("filter") @@ -193,17 +155,7 @@ public class SearchQuery extends BaseQuery> public SearchQuery withLimit(int newLimit) { - return new SearchQuery( - getDataSource(), - dimFilter, - granularity, - newLimit, - getQuerySegmentSpec(), - dimensions, - querySpec, - sortSpec, - getContext() - ); + return Druids.SearchQueryBuilder.copy(this).limit(newLimit).build(); } @Override diff --git a/processing/src/main/java/io/druid/query/select/SelectQuery.java b/processing/src/main/java/io/druid/query/select/SelectQuery.java index 4c0154857c6..72059580c4f 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQuery.java +++ b/processing/src/main/java/io/druid/query/select/SelectQuery.java @@ -26,6 +26,7 @@ import com.google.common.base.Preconditions; import io.druid.java.util.common.granularity.Granularity; import io.druid.query.BaseQuery; import io.druid.query.DataSource; +import io.druid.query.Druids; import io.druid.query.Query; import io.druid.query.Result; import io.druid.query.dimension.DimensionSpec; @@ -146,83 +147,29 @@ public class SelectQuery extends BaseQuery> public SelectQuery withQuerySegmentSpec(QuerySegmentSpec querySegmentSpec) { - return new SelectQuery( - getDataSource(), - querySegmentSpec, - isDescending(), - dimFilter, - granularity, - dimensions, - metrics, - virtualColumns, - pagingSpec, - getContext() - ); + return Druids.SelectQueryBuilder.copy(this).intervals(querySegmentSpec).build(); } @Override public Query> withDataSource(DataSource dataSource) { - return new SelectQuery( - dataSource, - getQuerySegmentSpec(), - isDescending(), - dimFilter, - granularity, - dimensions, - metrics, - virtualColumns, - pagingSpec, - getContext() - ); + return Druids.SelectQueryBuilder.copy(this).dataSource(dataSource).build(); } public SelectQuery withOverriddenContext(Map contextOverrides) { - return new SelectQuery( - getDataSource(), - getQuerySegmentSpec(), - isDescending(), - dimFilter, - granularity, - dimensions, - metrics, - virtualColumns, - pagingSpec, - computeOverridenContext(contextOverrides) - ); + Map newContext = computeOverriddenContext(getContext(), contextOverrides); + return Druids.SelectQueryBuilder.copy(this).context(newContext).build(); } public SelectQuery withPagingSpec(PagingSpec pagingSpec) { - return new SelectQuery( - getDataSource(), - getQuerySegmentSpec(), - isDescending(), - dimFilter, - granularity, - dimensions, - metrics, - virtualColumns, - pagingSpec, - getContext() - ); + return Druids.SelectQueryBuilder.copy(this).pagingSpec(pagingSpec).build(); } public SelectQuery withDimFilter(DimFilter dimFilter) { - return new SelectQuery( - getDataSource(), - 
getQuerySegmentSpec(), - isDescending(), - dimFilter, - granularity, - dimensions, - metrics, - virtualColumns, - pagingSpec, - getContext() - ); + return Druids.SelectQueryBuilder.copy(this).filters(dimFilter).build(); } @Override diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java index 4501e40c560..d466cd21140 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java @@ -27,6 +27,7 @@ import io.druid.common.utils.JodaUtils; import io.druid.java.util.common.StringUtils; import io.druid.query.BaseQuery; import io.druid.query.DataSource; +import io.druid.query.Druids; import io.druid.query.Query; import io.druid.query.Result; import io.druid.query.filter.DimFilter; @@ -82,10 +83,11 @@ public class TimeBoundaryQuery extends BaseQuery return dimFilter != null; } + @JsonProperty("filter") @Override public DimFilter getFilter() { - return null; + return dimFilter; } @Override @@ -94,12 +96,6 @@ public class TimeBoundaryQuery extends BaseQuery return Query.TIME_BOUNDARY; } - @JsonProperty("filter") - public DimFilter getDimensionsFilter() - { - return dimFilter; - } - @JsonProperty public String getBound() { @@ -109,37 +105,20 @@ public class TimeBoundaryQuery extends BaseQuery @Override public TimeBoundaryQuery withOverriddenContext(Map contextOverrides) { - return new TimeBoundaryQuery( - getDataSource(), - getQuerySegmentSpec(), - bound, - dimFilter, - computeOverridenContext(contextOverrides) - ); + Map newContext = computeOverriddenContext(getContext(), contextOverrides); + return Druids.TimeBoundaryQueryBuilder.copy(this).context(newContext).build(); } @Override public TimeBoundaryQuery withQuerySegmentSpec(QuerySegmentSpec spec) { - return new TimeBoundaryQuery( - getDataSource(), - spec, - bound, - dimFilter, - getContext() - ); + return Druids.TimeBoundaryQueryBuilder.copy(this).intervals(spec).build(); } @Override public Query> withDataSource(DataSource dataSource) { - return new TimeBoundaryQuery( - dataSource, - getQuerySegmentSpec(), - bound, - dimFilter, - getContext() - ); + return Druids.TimeBoundaryQueryBuilder.copy(this).dataSource(dataSource).build(); } public byte[] getCacheKey() diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java index e3b383cc78e..913933ffd33 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java @@ -112,7 +112,8 @@ public class TimeBoundaryQueryRunnerFactory final Sequence> resultSequence = QueryRunnerHelper.makeCursorBasedQuery( adapter, legacyQuery.getQuerySegmentSpec().getIntervals(), - Filters.toFilter(legacyQuery.getDimensionsFilter()), VirtualColumns.EMPTY, + Filters.toFilter(legacyQuery.getFilter()), + VirtualColumns.EMPTY, descending, Granularities.ALL, this.skipToFirstMatching @@ -154,7 +155,7 @@ public class TimeBoundaryQueryRunnerFactory final DateTime minTime; final DateTime maxTime; - if (legacyQuery.getDimensionsFilter() != null) { + if (legacyQuery.getFilter() != null) { minTime = getTimeBoundary(adapter, legacyQuery, false); if (minTime == null) { maxTime = null; diff --git 
a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java index e8165273d3a..32840029134 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java @@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableList; import io.druid.java.util.common.granularity.Granularity; import io.druid.query.BaseQuery; import io.druid.query.DataSource; +import io.druid.query.Druids; import io.druid.query.Queries; import io.druid.query.Query; import io.druid.query.Result; @@ -64,15 +65,14 @@ public class TimeseriesQuery extends BaseQuery> ) { super(dataSource, querySegmentSpec, descending, context); + this.virtualColumns = VirtualColumns.nullToEmpty(virtualColumns); this.dimFilter = dimFilter; this.granularity = granularity; - this.aggregatorSpecs = aggregatorSpecs == null ? ImmutableList.of() : aggregatorSpecs; + this.aggregatorSpecs = aggregatorSpecs == null ? ImmutableList.of() : aggregatorSpecs; this.postAggregatorSpecs = Queries.prepareAggregations( this.aggregatorSpecs, - postAggregatorSpecs == null - ? ImmutableList.of() - : postAggregatorSpecs + postAggregatorSpecs == null ? ImmutableList.of() : postAggregatorSpecs ); } @@ -131,78 +131,29 @@ public class TimeseriesQuery extends BaseQuery> public TimeseriesQuery withQuerySegmentSpec(QuerySegmentSpec querySegmentSpec) { - return new TimeseriesQuery( - getDataSource(), - querySegmentSpec, - isDescending(), - virtualColumns, - dimFilter, - granularity, - aggregatorSpecs, - postAggregatorSpecs, - getContext() - ); + return Druids.TimeseriesQueryBuilder.copy(this).intervals(querySegmentSpec).build(); } @Override public Query> withDataSource(DataSource dataSource) { - return new TimeseriesQuery( - dataSource, - getQuerySegmentSpec(), - isDescending(), - virtualColumns, - dimFilter, - granularity, - aggregatorSpecs, - postAggregatorSpecs, - getContext() - ); + return Druids.TimeseriesQueryBuilder.copy(this).dataSource(dataSource).build(); } public TimeseriesQuery withOverriddenContext(Map contextOverrides) { - return new TimeseriesQuery( - getDataSource(), - getQuerySegmentSpec(), - isDescending(), - virtualColumns, - dimFilter, - granularity, - aggregatorSpecs, - postAggregatorSpecs, - computeOverridenContext(contextOverrides) - ); + Map newContext = computeOverriddenContext(getContext(), contextOverrides); + return Druids.TimeseriesQueryBuilder.copy(this).context(newContext).build(); } public TimeseriesQuery withDimFilter(DimFilter dimFilter) { - return new TimeseriesQuery( - getDataSource(), - getQuerySegmentSpec(), - isDescending(), - virtualColumns, - dimFilter, - granularity, - aggregatorSpecs, - postAggregatorSpecs, - getContext() - ); + return Druids.TimeseriesQueryBuilder.copy(this).filters(dimFilter).build(); } public TimeseriesQuery withPostAggregatorSpecs(final List postAggregatorSpecs) { - return new TimeseriesQuery( - getDataSource(), - getQuerySegmentSpec(), - isDescending(), - virtualColumns, - dimFilter, - granularity, - aggregatorSpecs, - postAggregatorSpecs, - getContext() - ); + return Druids.TimeseriesQueryBuilder.copy(this).postAggregators(postAggregatorSpecs).build(); } @Override diff --git a/processing/src/main/java/io/druid/query/topn/TopNQuery.java b/processing/src/main/java/io/druid/query/topn/TopNQuery.java index 29ca0cddcba..df7583ebd76 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQuery.java +++ 
b/processing/src/main/java/io/druid/query/topn/TopNQuery.java @@ -71,6 +71,7 @@ public class TopNQuery extends BaseQuery> ) { super(dataSource, querySegmentSpec, false, context); + this.virtualColumns = VirtualColumns.nullToEmpty(virtualColumns); this.dimensionSpec = dimensionSpec; this.topNMetricSpec = topNMetricSpec; @@ -169,139 +170,43 @@ public class TopNQuery extends BaseQuery> public TopNQuery withQuerySegmentSpec(QuerySegmentSpec querySegmentSpec) { - return new TopNQuery( - getDataSource(), - virtualColumns, - dimensionSpec, - topNMetricSpec, - threshold, - querySegmentSpec, - dimFilter, - granularity, - aggregatorSpecs, - postAggregatorSpecs, - getContext() - ); + return new TopNQueryBuilder(this).intervals(querySegmentSpec).build(); } public TopNQuery withDimensionSpec(DimensionSpec spec) { - return new TopNQuery( - getDataSource(), - virtualColumns, - spec, - topNMetricSpec, - threshold, - getQuerySegmentSpec(), - dimFilter, - granularity, - aggregatorSpecs, - postAggregatorSpecs, - getContext() - ); + return new TopNQueryBuilder(this).dimension(spec).build(); } public TopNQuery withAggregatorSpecs(List aggregatorSpecs) { - return new TopNQuery( - getDataSource(), - virtualColumns, - getDimensionSpec(), - topNMetricSpec, - threshold, - getQuerySegmentSpec(), - dimFilter, - granularity, - aggregatorSpecs, - postAggregatorSpecs, - getContext() - ); + return new TopNQueryBuilder(this).aggregators(aggregatorSpecs).build(); } public TopNQuery withPostAggregatorSpecs(List postAggregatorSpecs) { - return new TopNQuery( - getDataSource(), - virtualColumns, - getDimensionSpec(), - topNMetricSpec, - threshold, - getQuerySegmentSpec(), - dimFilter, - granularity, - aggregatorSpecs, - postAggregatorSpecs, - getContext() - ); + return new TopNQueryBuilder(this).postAggregators(postAggregatorSpecs).build(); } @Override public Query> withDataSource(DataSource dataSource) { - return new TopNQuery( - dataSource, - virtualColumns, - dimensionSpec, - topNMetricSpec, - threshold, - getQuerySegmentSpec(), - dimFilter, - granularity, - aggregatorSpecs, - postAggregatorSpecs, - getContext() - ); + return new TopNQueryBuilder(this).dataSource(dataSource).build(); } public TopNQuery withThreshold(int threshold) { - return new TopNQuery( - getDataSource(), - virtualColumns, - dimensionSpec, - topNMetricSpec, - threshold, - getQuerySegmentSpec(), - dimFilter, - granularity, - aggregatorSpecs, - postAggregatorSpecs, - getContext() - ); + return new TopNQueryBuilder(this).threshold(threshold).build(); } public TopNQuery withOverriddenContext(Map contextOverrides) { - return new TopNQuery( - getDataSource(), - virtualColumns, - dimensionSpec, - topNMetricSpec, - threshold, - getQuerySegmentSpec(), - dimFilter, - granularity, - aggregatorSpecs, - postAggregatorSpecs, - computeOverridenContext(contextOverrides) - ); + return new TopNQueryBuilder(this).context(computeOverriddenContext(getContext(), contextOverrides)).build(); } public TopNQuery withDimFilter(DimFilter dimFilter) { - return new TopNQuery( - getDataSource(), - virtualColumns, - getDimensionSpec(), - topNMetricSpec, - threshold, - getQuerySegmentSpec(), - dimFilter, - granularity, - aggregatorSpecs, - postAggregatorSpecs, - getContext() - ); + return new TopNQueryBuilder(this).filters(dimFilter).build(); } @Override diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryBuilder.java b/processing/src/main/java/io/druid/query/topn/TopNQueryBuilder.java index bdf8dc6153a..4a06a486f39 100644 --- 
a/processing/src/main/java/io/druid/query/topn/TopNQueryBuilder.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryBuilder.java @@ -23,6 +23,7 @@ import com.google.common.collect.Lists; import io.druid.java.util.common.granularity.Granularities; import io.druid.java.util.common.granularity.Granularity; import io.druid.query.DataSource; +import io.druid.query.QueryMetrics; import io.druid.query.TableDataSource; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.PostAggregator; @@ -75,6 +76,7 @@ public class TopNQueryBuilder private List aggregatorSpecs; private List postAggregatorSpecs; private Map context; + private QueryMetrics queryMetrics; public TopNQueryBuilder() { @@ -328,4 +330,10 @@ public class TopNQueryBuilder context = c; return this; } + + public TopNQueryBuilder queryMetrics(QueryMetrics m) + { + queryMetrics = m; + return this; + } } diff --git a/processing/src/test/java/io/druid/query/QueryContextsTest.java b/processing/src/test/java/io/druid/query/QueryContextsTest.java index 79a7ebf992d..2a6945b9d3c 100644 --- a/processing/src/test/java/io/druid/query/QueryContextsTest.java +++ b/processing/src/test/java/io/druid/query/QueryContextsTest.java @@ -78,7 +78,7 @@ public class QueryContextsTest getDataSource(), getQuerySegmentSpec(), isDescending(), - computeOverridenContext(contextOverride) + BaseQuery.computeOverriddenContext(getContext(), contextOverride) ); } } diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index 3faea8e2ecc..147c8e1bf64 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -2710,11 +2710,11 @@ public class GroupByQueryRunnerTest TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query, context), "no-limit"); TestHelper.assertExpectedObjects( - Iterables.limit(expectedResults, 5), mergeRunner.run(builder.limit(5).build(), context), "limited" + Iterables.limit(expectedResults, 5), mergeRunner.run(builder.setLimit(5).build(), context), "limited" ); // Now try it with an expression based aggregator. - builder.limit(Integer.MAX_VALUE) + builder.setLimit(Integer.MAX_VALUE) .setAggregatorSpecs( Arrays.asList( QueryRunnerTestHelper.rowsCount, @@ -2737,11 +2737,11 @@ public class GroupByQueryRunnerTest TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(builder.build(), context), "no-limit"); TestHelper.assertExpectedObjects( - Iterables.limit(expectedResults, 5), mergeRunner.run(builder.limit(5).build(), context), "limited" + Iterables.limit(expectedResults, 5), mergeRunner.run(builder.setLimit(5).build(), context), "limited" ); // Now try it with an expression virtual column. 
- builder.limit(Integer.MAX_VALUE) + builder.setLimit(Integer.MAX_VALUE) .setVirtualColumns( new ExpressionVirtualColumn("expr", "index / 2 + indexMin") ) @@ -2754,7 +2754,7 @@ public class GroupByQueryRunnerTest TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(builder.build(), context), "no-limit"); TestHelper.assertExpectedObjects( - Iterables.limit(expectedResults, 5), mergeRunner.run(builder.limit(5).build(), context), "limited" + Iterables.limit(expectedResults, 5), mergeRunner.run(builder.setLimit(5).build(), context), "limited" ); } @@ -2794,7 +2794,7 @@ public class GroupByQueryRunnerTest QueryRunner mergeRunner = factory.getToolchest().mergeResults(runner); TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query, context), "no-limit"); TestHelper.assertExpectedObjects( - Iterables.limit(expectedResults, 5), mergeRunner.run(builder.limit(5).build(), context), "limited" + Iterables.limit(expectedResults, 5), mergeRunner.run(builder.setLimit(5).build(), context), "limited" ); } @@ -2835,7 +2835,7 @@ public class GroupByQueryRunnerTest QueryRunner mergeRunner = factory.getToolchest().mergeResults(runner); TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query, context), "no-limit"); TestHelper.assertExpectedObjects( - Iterables.limit(expectedResults, 5), mergeRunner.run(builder.limit(5).build(), context), "limited" + Iterables.limit(expectedResults, 5), mergeRunner.run(builder.setLimit(5).build(), context), "limited" ); } @@ -2875,7 +2875,7 @@ public class GroupByQueryRunnerTest QueryRunner mergeRunner = factory.getToolchest().mergeResults(runner); TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query, context), "no-limit"); TestHelper.assertExpectedObjects( - Iterables.limit(expectedResults, 5), mergeRunner.run(builder.limit(5).build(), context), "limited" + Iterables.limit(expectedResults, 5), mergeRunner.run(builder.setLimit(5).build(), context), "limited" ); }
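
A note on the recurring builder change, since it appears in nearly every file above: the removed instance-level copy(SomeBuilder builder) methods read some fields from the argument and others from `this`, and several dropped fields outright (ScanQueryBuilder.copy kept only dataSource, intervals, and context, losing resultFormat, batchSize, limit, filters, and columns; the Select and SegmentMetadata copies were similarly lossy). The replacements are static copy(Query) factories that take the query itself and copy every field exactly once. A minimal sketch of the shape, with illustrative names rather than Druid's real classes:

import java.util.Objects;

// Illustrative classes sketching the static builder-copy pattern adopted
// in this patch; not Druid's actual API.
public final class SimpleQuery
{
  private final String dataSource;
  private final int limit;

  private SimpleQuery(String dataSource, int limit)
  {
    this.dataSource = Objects.requireNonNull(dataSource, "dataSource");
    this.limit = limit;
  }

  public String getDataSource()
  {
    return dataSource;
  }

  public int getLimit()
  {
    return limit;
  }

  // with*() methods delegate to the builder instead of hand-listing every
  // constructor argument, so adding a field later cannot silently drop
  // state the way the removed copy(Builder) overloads did.
  public SimpleQuery withLimit(int newLimit)
  {
    return Builder.copy(this).limit(newLimit).build();
  }

  public static final class Builder
  {
    private String dataSource;
    private int limit = Integer.MAX_VALUE;

    // Static, and reading from the query rather than from another builder:
    // every field is copied from the single source of truth.
    public static Builder copy(SimpleQuery query)
    {
      return new Builder()
          .dataSource(query.getDataSource())
          .limit(query.getLimit());
    }

    public Builder dataSource(String dataSource)
    {
      this.dataSource = dataSource;
      return this;
    }

    public Builder limit(int limit)
    {
      this.limit = limit;
      return this;
    }

    public SimpleQuery build()
    {
      return new SimpleQuery(dataSource, limit);
    }
  }
}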
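Relatedly, BaseQuery's context-merging helper (previously the misspelled, instance-level computeOverridenContext) becomes the static computeOverriddenContext(context, overrides), taking the base context as an explicit parameter; that is what lets the new copy factories, GroupByQuery.Builder.overrideContext(), and the test in QueryContextsTest merge contexts without a query instance. The merge semantics from the diff, restated as a self-contained sketch (class name illustrative, Guava assumed on the classpath as elsewhere in this codebase):

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.util.Map;

// Sketch of the now-static BaseQuery.computeOverriddenContext(): a fresh
// TreeMap seeded with the existing context, with overrides winning on
// key collisions.
public final class ContextMergeSketch
{
  public static Map<String, Object> computeOverriddenContext(
      final Map<String, Object> context,
      final Map<String, Object> overrides
  )
  {
    Map<String, Object> overridden = Maps.newTreeMap();
    if (context != null) {
      overridden.putAll(context);
    }
    overridden.putAll(overrides);
    return overridden;
  }

  public static void main(String[] args)
  {
    Map<String, Object> base = ImmutableMap.of("finalize", true, "queryId", "abc");
    // Prints {finalize=false, queryId=abc}: the override replaces the base
    // value for its key, and untouched keys are preserved.
    System.out.println(computeOverriddenContext(base, ImmutableMap.of("finalize", false)));
  }
}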
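Around limit specs there are two small moves: the null check previously inlined in GroupByQuery moves onto the interface as the static LimitSpec.nullToNoopLimitSpec(), and NoopLimitSpec becomes a true singleton, with a private constructor and a @JsonCreator static factory so that Jackson deserialization also yields the shared instance. The shape, reduced to illustrative types (the real NoopLimitSpec also implements build() and the cache-key methods):

import com.fasterxml.jackson.annotation.JsonCreator;
import javax.annotation.Nullable;

// Illustrative counterpart of the new static LimitSpec.nullToNoopLimitSpec():
// callers normalize a nullable spec once instead of re-checking for null.
interface LimitSpecSketch
{
  static LimitSpecSketch nullToNoop(@Nullable LimitSpecSketch spec)
  {
    return spec == null ? NoopLimitSpecSketch.instance() : spec;
  }
}

final class NoopLimitSpecSketch implements LimitSpecSketch
{
  private static final NoopLimitSpecSketch INSTANCE = new NoopLimitSpecSketch();

  private NoopLimitSpecSketch()
  {
  }

  // Jackson invokes this factory instead of a constructor, so every
  // deserialized noop spec maps onto the same shared object.
  @JsonCreator
  public static NoopLimitSpecSketch instance()
  {
    return INSTANCE;
  }
}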
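In GroupByQuery itself, the renamed postProcessingFn (the limit spec's function composed with the having filter) is now threaded through a private constructor so the with*() copies stop recomputing it, and GroupByQuery.Builder carries a cached copy that it nulls out whenever a setter touches one of the function's inputs (dimensions, aggregators, post-aggregators, having spec, limit). The invalidation idea, reduced to a toy with illustrative names:

import java.util.function.UnaryOperator;

// Toy sketch of the memoized-derived-field idea in GroupByQuery.Builder:
// the derived function is reused across copies until a setter changes one
// of its inputs, at which point the cache is dropped and build()
// recomputes it.
final class MemoizingBuilder
{
  private int limit = Integer.MAX_VALUE;
  private UnaryOperator<String> postProcessingFn; // cached derived value

  MemoizingBuilder setLimit(int limit)
  {
    this.limit = limit;
    this.postProcessingFn = null; // input changed: invalidate the cache
    return this;
  }

  UnaryOperator<String> build()
  {
    if (postProcessingFn == null) {
      final int boundedLimit = limit; // recompute only when needed
      postProcessingFn = s -> s.length() > boundedLimit ? s.substring(0, boundedLimit) : s;
    }
    return postProcessingFn;
  }
}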
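Finally, the bug named in the subject line: TimeBoundaryQuery stored its filter, but the Query-interface accessor getFilter() hard-coded null, and only the JSON-mapped getDimensionsFilter() returned the real value, so interface-level callers silently saw an unfiltered query. The fix above moves @JsonProperty("filter") onto getFilter(), deletes the duplicate accessor, and switches TimeBoundaryQueryRunnerFactory to the interface method. The buggy shape, as a toy reproduction with illustrative types:

// Toy reproduction of the accessor-duplication bug; not Druid's types.
interface QuerySketch
{
  String getFilter();
}

final class BuggyQuery implements QuerySketch
{
  private final String filter;

  BuggyQuery(String filter)
  {
    this.filter = filter;
  }

  @Override
  public String getFilter()
  {
    return null; // the bug: the stored filter is ignored
  }

  String getDimensionsFilter()
  {
    return filter; // the duplicate accessor callers had to know about
  }
}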