From 4c8495662f672f146bd1f7eb64cd3bd3096a7aed Mon Sep 17 00:00:00 2001 From: Yuval Oren Date: Wed, 15 Jan 2014 15:38:57 -0800 Subject: [PATCH] Subquery support for GroupBy queries --- .../main/java/io/druid/query/BaseQuery.java | 38 ++- .../main/java/io/druid/query/DataSource.java | 38 +++ .../src/main/java/io/druid/query/Druids.java | 38 ++- .../src/main/java/io/druid/query/Query.java | 3 +- .../java/io/druid/query/QueryDataSource.java | 79 +++++ .../io/druid/query/SubqueryQueryRunner.java | 50 +++ .../java/io/druid/query/TableDataSource.java | 73 +++++ .../aggregation/CountAggregatorFactory.java | 19 ++ .../DoubleSumAggregatorFactory.java | 22 ++ .../HistogramAggregatorFactory.java | 26 ++ .../JavaScriptAggregatorFactory.java | 31 ++ .../aggregation/LongSumAggregatorFactory.java | 22 ++ .../aggregation/MaxAggregatorFactory.java | 22 ++ .../aggregation/MinAggregatorFactory.java | 22 ++ .../ToLowerCaseAggregatorFactory.java | 20 ++ .../post/ArithmeticPostAggregator.java | 26 ++ .../post/ConstantPostAggregator.java | 29 ++ .../post/FieldAccessPostAggregator.java | 22 ++ .../post/JavaScriptPostAggregator.java | 26 ++ .../query/dimension/DefaultDimensionSpec.java | 22 ++ .../dimension/ExtractionDimensionSpec.java | 25 ++ .../io/druid/query/groupby/GroupByQuery.java | 81 ++++- .../groupby/GroupByQueryQueryToolChest.java | 80 +++-- .../groupby/orderby/DefaultLimitSpec.java | 22 ++ .../query/groupby/orderby/NoopLimitSpec.java | 11 + .../SegmentMetadataQueryQueryToolChest.java | 8 +- .../metadata/SegmentMetadataQuery.java | 35 ++- .../search/SearchQueryQueryToolChest.java | 11 +- .../search/FragmentSearchQuerySpec.java | 19 ++ .../InsensitiveContainsSearchQuerySpec.java | 19 ++ .../search/LexicographicSearchSortSpec.java | 5 + .../query/search/search/SearchQuery.java | 40 ++- .../spec/MultipleIntervalSegmentSpec.java | 19 ++ .../spec/MultipleSpecificSegmentSpec.java | 22 ++ .../druid/query/spec/SpecificSegmentSpec.java | 19 ++ .../query/timeboundary/TimeBoundaryQuery.java | 19 +- .../TimeBoundaryQueryQueryToolChest.java | 9 +- .../query/timeseries/TimeseriesQuery.java | 37 ++- .../timeseries/TimeseriesQueryEngine.java | 2 + .../TimeseriesQueryQueryToolChest.java | 13 +- .../TimeseriesQueryRunnerFactory.java | 8 +- .../query/topn/InvertedTopNMetricSpec.java | 19 ++ .../topn/LexicographicTopNMetricSpec.java | 19 ++ .../query/topn/NumericTopNMetricSpec.java | 19 ++ .../java/io/druid/query/topn/TopNQuery.java | 45 ++- .../io/druid/query/topn/TopNQueryBuilder.java | 16 +- .../query/topn/TopNQueryQueryToolChest.java | 2 +- .../java/io/druid/query/DataSourceTest.java | 88 ++++++ .../io/druid/query/QueryRunnerTestHelper.java | 4 + .../query/groupby/GroupByQueryRunnerTest.java | 287 ++++++++++++++---- .../druid/query/groupby/GroupByQueryTest.java | 68 +++++ .../GroupByTimeseriesQueryRunnerTest.java | 46 +-- .../druid/query/search/SearchQueryTest.java | 62 ++++ .../timeboundary/TimeBoundaryQueryTest.java | 51 ++++ .../query/timeseries/TimeseriesQueryTest.java | 62 ++++ .../druid/query/topn/TopNQueryRunnerTest.java | 126 +++----- .../io/druid/query/topn/TopNQueryTest.java | 75 +++++ .../io/druid/client/BrokerServerView.java | 19 +- .../druid/client/CachingClusteredClient.java | 1 + .../io/druid/client/TimelineServerView.java | 3 +- .../java/io/druid/server/QueryResource.java | 2 +- .../server/coordination/ServerManager.java | 59 ++-- 62 files changed, 1894 insertions(+), 311 deletions(-) create mode 100644 processing/src/main/java/io/druid/query/DataSource.java create mode 100644 processing/src/main/java/io/druid/query/QueryDataSource.java create mode 100644 processing/src/main/java/io/druid/query/SubqueryQueryRunner.java create mode 100644 processing/src/main/java/io/druid/query/TableDataSource.java create mode 100644 processing/src/test/java/io/druid/query/DataSourceTest.java create mode 100644 processing/src/test/java/io/druid/query/groupby/GroupByQueryTest.java create mode 100644 processing/src/test/java/io/druid/query/search/SearchQueryTest.java create mode 100644 processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryTest.java create mode 100644 processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryTest.java create mode 100644 processing/src/test/java/io/druid/query/topn/TopNQueryTest.java diff --git a/processing/src/main/java/io/druid/query/BaseQuery.java b/processing/src/main/java/io/druid/query/BaseQuery.java index b1e1a587123..ef837cc67b0 100644 --- a/processing/src/main/java/io/druid/query/BaseQuery.java +++ b/processing/src/main/java/io/druid/query/BaseQuery.java @@ -34,14 +34,14 @@ import java.util.Map; */ public abstract class BaseQuery implements Query { - private final String dataSource; + private final DataSource dataSource; private final Map context; private final QuerySegmentSpec querySegmentSpec; private volatile Duration duration; public BaseQuery( - String dataSource, + DataSource dataSource, QuerySegmentSpec querySegmentSpec, Map context ) @@ -49,14 +49,14 @@ public abstract class BaseQuery implements Query Preconditions.checkNotNull(dataSource, "dataSource can't be null"); Preconditions.checkNotNull(querySegmentSpec, "querySegmentSpec can't be null"); - this.dataSource = dataSource.toLowerCase(); + this.dataSource = dataSource; this.context = context; this.querySegmentSpec = querySegmentSpec; } @JsonProperty @Override - public String getDataSource() + public DataSource getDataSource() { return dataSource; } @@ -130,4 +130,34 @@ public abstract class BaseQuery implements Query return overridden; } + + /** + * Compare the BaseQuery fields with another BaseQuery. For use in subclasses implementing equals() + * @param that + * @return + */ + protected boolean partialEquals(BaseQuery that) + { + if (context != null ? !context.equals(that.context) : that.context != null) return false; + if (dataSource != null ? !dataSource.equals(that.dataSource) : that.dataSource != null) return false; + if (duration != null ? !duration.equals(that.duration) : that.duration != null) return false; + if (querySegmentSpec != null ? !querySegmentSpec.equals(that.querySegmentSpec) : that.querySegmentSpec != null) + return false; + + return true; + } + + /** + * Hash the fields within BaseQuery. For use in subclasses implementing hashCode() + * @return + */ + protected int partialHashCode() + { + int result = dataSource != null ? dataSource.hashCode() : 0; + result = 31 * result + (context != null ? context.hashCode() : 0); + result = 31 * result + (querySegmentSpec != null ? querySegmentSpec.hashCode() : 0); + result = 31 * result + (duration != null ? duration.hashCode() : 0); + return result; + } + } diff --git a/processing/src/main/java/io/druid/query/DataSource.java b/processing/src/main/java/io/druid/query/DataSource.java new file mode 100644 index 00000000000..a9a4ed94b17 --- /dev/null +++ b/processing/src/main/java/io/druid/query/DataSource.java @@ -0,0 +1,38 @@ +/* + * Druid - a distributed column store. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * + * This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project + * under the Druid Corporate Contributor License Agreement. + */ + +package io.druid.query; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; + +import java.io.Serializable; +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, + include = JsonTypeInfo.As.PROPERTY, + property = "type", + defaultImpl = TableDataSource.class) +@JsonSubTypes({ + @JsonSubTypes.Type(value = TableDataSource.class, name = "table"), + @JsonSubTypes.Type(value = QueryDataSource.class, name = "query") +}) +public interface DataSource extends Serializable +{ +} diff --git a/processing/src/main/java/io/druid/query/Druids.java b/processing/src/main/java/io/druid/query/Druids.java index 99d5acdc6f6..fd2abd13ab8 100644 --- a/processing/src/main/java/io/druid/query/Druids.java +++ b/processing/src/main/java/io/druid/query/Druids.java @@ -298,7 +298,7 @@ public class Druids */ public static class TimeseriesQueryBuilder { - private String dataSource; + private DataSource dataSource; private QuerySegmentSpec querySegmentSpec; private DimFilter dimFilter; private QueryGranularity granularity; @@ -308,7 +308,7 @@ public class Druids private TimeseriesQueryBuilder() { - dataSource = ""; + dataSource = null; querySegmentSpec = null; dimFilter = null; granularity = QueryGranularity.ALL; @@ -354,7 +354,7 @@ public class Druids .context(builder.context); } - public String getDataSource() + public DataSource getDataSource() { return dataSource; } @@ -390,6 +390,12 @@ public class Druids } public TimeseriesQueryBuilder dataSource(String ds) + { + dataSource = new TableDataSource(ds); + return this; + } + + public TimeseriesQueryBuilder dataSource(DataSource ds) { dataSource = ds; return this; @@ -492,7 +498,7 @@ public class Druids */ public static class SearchQueryBuilder { - private String dataSource; + private DataSource dataSource; private DimFilter dimFilter; private QueryGranularity granularity; private int limit; @@ -503,7 +509,7 @@ public class Druids public SearchQueryBuilder() { - dataSource = ""; + dataSource = null; dimFilter = null; granularity = QueryGranularity.ALL; limit = 0; @@ -531,7 +537,7 @@ public class Druids public SearchQueryBuilder copy(SearchQuery query) { return new SearchQueryBuilder() - .dataSource(query.getDataSource()) + .dataSource(((TableDataSource)query.getDataSource()).getName()) .intervals(query.getQuerySegmentSpec()) .filters(query.getDimensionsFilter()) .granularity(query.getGranularity()) @@ -555,6 +561,12 @@ public class Druids } public SearchQueryBuilder dataSource(String d) + { + dataSource = new TableDataSource(d); + return this; + } + + public SearchQueryBuilder dataSource(DataSource d) { dataSource = d; return this; @@ -676,13 +688,13 @@ public class Druids */ public static class TimeBoundaryQueryBuilder { - private String dataSource; + private DataSource dataSource; private QuerySegmentSpec querySegmentSpec; private Map context; public TimeBoundaryQueryBuilder() { - dataSource = ""; + dataSource = null; querySegmentSpec = null; context = null; } @@ -704,9 +716,15 @@ public class Druids .context(builder.context); } - public TimeBoundaryQueryBuilder dataSource(String d) + public TimeBoundaryQueryBuilder dataSource(String ds) { - dataSource = d; + dataSource = new TableDataSource(ds); + return this; + } + + public TimeBoundaryQueryBuilder dataSource(DataSource ds) + { + dataSource = ds; return this; } diff --git a/processing/src/main/java/io/druid/query/Query.java b/processing/src/main/java/io/druid/query/Query.java index f4ad958e75d..34b145a2cb1 100644 --- a/processing/src/main/java/io/druid/query/Query.java +++ b/processing/src/main/java/io/druid/query/Query.java @@ -19,6 +19,7 @@ package io.druid.query; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.metamx.common.guava.Sequence; @@ -53,7 +54,7 @@ public interface Query public static final String SEGMENT_METADATA = "segmentMetadata"; public static final String TOPN = "topN"; - public String getDataSource(); + public DataSource getDataSource(); public boolean hasFilters(); diff --git a/processing/src/main/java/io/druid/query/QueryDataSource.java b/processing/src/main/java/io/druid/query/QueryDataSource.java new file mode 100644 index 00000000000..b1721e54704 --- /dev/null +++ b/processing/src/main/java/io/druid/query/QueryDataSource.java @@ -0,0 +1,79 @@ +/* + * Druid - a distributed column store. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * + * This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project + * under the Druid Corporate Contributor License Agreement. + */ + +package io.druid.query; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import io.druid.query.groupby.GroupByQuery; +import io.druid.query.metadata.metadata.SegmentMetadataQuery; +import io.druid.query.search.search.SearchQuery; +import io.druid.query.timeboundary.TimeBoundaryQuery; +import io.druid.query.timeseries.TimeseriesQuery; +import io.druid.query.topn.TopNQuery; + +public class QueryDataSource implements DataSource +{ + @JsonProperty + private Query query; + + public QueryDataSource() + { + } + + public QueryDataSource(Query query) + { + this.query = query; + } + + public Query getQuery() + { + return query; + } + + public void setQuery(Query query) + { + this.query = query; + } + + public String toString() { return query.toString(); } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + QueryDataSource that = (QueryDataSource) o; + + if (!query.equals(that.query)) return false; + + return true; + } + + @Override + public int hashCode() + { + return query.hashCode(); + } +} diff --git a/processing/src/main/java/io/druid/query/SubqueryQueryRunner.java b/processing/src/main/java/io/druid/query/SubqueryQueryRunner.java new file mode 100644 index 00000000000..86ce37b5cc8 --- /dev/null +++ b/processing/src/main/java/io/druid/query/SubqueryQueryRunner.java @@ -0,0 +1,50 @@ +/* + * Druid - a distributed column store. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * + * This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project + * under the Druid Corporate Contributor License Agreement. + */ + +package io.druid.query; + +import com.metamx.common.guava.Sequence; +import org.joda.time.Period; + +/** + * If there's a subquery, run it instead of the outer query + */ +public class SubqueryQueryRunner implements QueryRunner +{ + private final QueryRunner baseRunner; + + public SubqueryQueryRunner(QueryRunner baseRunner) + { + this.baseRunner = baseRunner; + } + + @Override + public Sequence run(final Query query) + { + DataSource dataSource = query.getDataSource(); + if (dataSource instanceof QueryDataSource) { + return run((Query) ((QueryDataSource)dataSource).getQuery()); + } + else { + return baseRunner.run(query); + } + } +} diff --git a/processing/src/main/java/io/druid/query/TableDataSource.java b/processing/src/main/java/io/druid/query/TableDataSource.java new file mode 100644 index 00000000000..a6b091bdc89 --- /dev/null +++ b/processing/src/main/java/io/druid/query/TableDataSource.java @@ -0,0 +1,73 @@ +/* + * Druid - a distributed column store. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * + * This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project + * under the Druid Corporate Contributor License Agreement. + */ +package io.druid.query; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +public class TableDataSource implements DataSource +{ + @JsonProperty + private String name; + + @JsonCreator + public TableDataSource() + { + + } + + @JsonCreator + public TableDataSource(String name) + { + this.name = name==null? name : name.toLowerCase(); + } + + public String getName() + { + return name; + } + + public void setName(String name) + { + this.name = name; + } + + public String toString() { return name; } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TableDataSource that = (TableDataSource) o; + + if (!name.equals(that.name)) return false; + + return true; + } + + @Override + public int hashCode() + { + return name.hashCode(); + } +} diff --git a/processing/src/main/java/io/druid/query/aggregation/CountAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/CountAggregatorFactory.java index 1b02f923996..e47999e8719 100644 --- a/processing/src/main/java/io/druid/query/aggregation/CountAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/CountAggregatorFactory.java @@ -132,4 +132,23 @@ public class CountAggregatorFactory implements AggregatorFactory "name='" + name + '\'' + '}'; } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + CountAggregatorFactory that = (CountAggregatorFactory) o; + + if (name != null ? !name.equals(that.name) : that.name != null) return false; + + return true; + } + + @Override + public int hashCode() + { + return name != null ? name.hashCode() : 0; + } } diff --git a/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregatorFactory.java index f85d0c677f5..ebd4e185ea3 100644 --- a/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregatorFactory.java @@ -150,4 +150,26 @@ public class DoubleSumAggregatorFactory implements AggregatorFactory ", name='" + name + '\'' + '}'; } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + DoubleSumAggregatorFactory that = (DoubleSumAggregatorFactory) o; + + if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) return false; + if (name != null ? !name.equals(that.name) : that.name != null) return false; + + return true; + } + + @Override + public int hashCode() + { + int result = fieldName != null ? fieldName.hashCode() : 0; + result = 31 * result + (name != null ? name.hashCode() : 0); + return result; + } } diff --git a/processing/src/main/java/io/druid/query/aggregation/HistogramAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/HistogramAggregatorFactory.java index 2abd19d2330..060d40d2798 100644 --- a/processing/src/main/java/io/druid/query/aggregation/HistogramAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/HistogramAggregatorFactory.java @@ -179,4 +179,30 @@ public class HistogramAggregatorFactory implements AggregatorFactory ", breaks=" + Arrays.toString(breaks) + '}'; } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + HistogramAggregatorFactory that = (HistogramAggregatorFactory) o; + + if (!Arrays.equals(breaks, that.breaks)) return false; + if (breaksList != null ? !breaksList.equals(that.breaksList) : that.breaksList != null) return false; + if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) return false; + if (name != null ? !name.equals(that.name) : that.name != null) return false; + + return true; + } + + @Override + public int hashCode() + { + int result = name != null ? name.hashCode() : 0; + result = 31 * result + (fieldName != null ? fieldName.hashCode() : 0); + result = 31 * result + (breaksList != null ? breaksList.hashCode() : 0); + result = 31 * result + (breaks != null ? Arrays.hashCode(breaks) : 0); + return result; + } } diff --git a/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java index 927ab89676f..6de6be09ad8 100644 --- a/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java @@ -317,4 +317,35 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory } }; } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + JavaScriptAggregatorFactory that = (JavaScriptAggregatorFactory) o; + + if (compiledScript != null ? !compiledScript.equals(that.compiledScript) : that.compiledScript != null) + return false; + if (fieldNames != null ? !fieldNames.equals(that.fieldNames) : that.fieldNames != null) return false; + if (fnAggregate != null ? !fnAggregate.equals(that.fnAggregate) : that.fnAggregate != null) return false; + if (fnCombine != null ? !fnCombine.equals(that.fnCombine) : that.fnCombine != null) return false; + if (fnReset != null ? !fnReset.equals(that.fnReset) : that.fnReset != null) return false; + if (name != null ? !name.equals(that.name) : that.name != null) return false; + + return true; + } + + @Override + public int hashCode() + { + int result = name != null ? name.hashCode() : 0; + result = 31 * result + (fieldNames != null ? fieldNames.hashCode() : 0); + result = 31 * result + (fnAggregate != null ? fnAggregate.hashCode() : 0); + result = 31 * result + (fnReset != null ? fnReset.hashCode() : 0); + result = 31 * result + (fnCombine != null ? fnCombine.hashCode() : 0); + result = 31 * result + (compiledScript != null ? compiledScript.hashCode() : 0); + return result; + } } diff --git a/processing/src/main/java/io/druid/query/aggregation/LongSumAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/LongSumAggregatorFactory.java index f1372ff024c..50ef5130756 100644 --- a/processing/src/main/java/io/druid/query/aggregation/LongSumAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/LongSumAggregatorFactory.java @@ -150,4 +150,26 @@ public class LongSumAggregatorFactory implements AggregatorFactory ", name='" + name + '\'' + '}'; } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + LongSumAggregatorFactory that = (LongSumAggregatorFactory) o; + + if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) return false; + if (name != null ? !name.equals(that.name) : that.name != null) return false; + + return true; + } + + @Override + public int hashCode() + { + int result = fieldName != null ? fieldName.hashCode() : 0; + result = 31 * result + (name != null ? name.hashCode() : 0); + return result; + } } diff --git a/processing/src/main/java/io/druid/query/aggregation/MaxAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/MaxAggregatorFactory.java index d66d9c5b2d9..ee8217f820b 100644 --- a/processing/src/main/java/io/druid/query/aggregation/MaxAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/MaxAggregatorFactory.java @@ -150,4 +150,26 @@ public class MaxAggregatorFactory implements AggregatorFactory ", name='" + name + '\'' + '}'; } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + MaxAggregatorFactory that = (MaxAggregatorFactory) o; + + if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) return false; + if (name != null ? !name.equals(that.name) : that.name != null) return false; + + return true; + } + + @Override + public int hashCode() + { + int result = fieldName != null ? fieldName.hashCode() : 0; + result = 31 * result + (name != null ? name.hashCode() : 0); + return result; + } } diff --git a/processing/src/main/java/io/druid/query/aggregation/MinAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/MinAggregatorFactory.java index 0a168114358..9c3d560bacf 100644 --- a/processing/src/main/java/io/druid/query/aggregation/MinAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/MinAggregatorFactory.java @@ -150,4 +150,26 @@ public class MinAggregatorFactory implements AggregatorFactory ", name='" + name + '\'' + '}'; } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + MinAggregatorFactory that = (MinAggregatorFactory) o; + + if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) return false; + if (name != null ? !name.equals(that.name) : that.name != null) return false; + + return true; + } + + @Override + public int hashCode() + { + int result = fieldName != null ? fieldName.hashCode() : 0; + result = 31 * result + (name != null ? name.hashCode() : 0); + return result; + } } diff --git a/processing/src/main/java/io/druid/query/aggregation/ToLowerCaseAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/ToLowerCaseAggregatorFactory.java index 457d22f2cdf..6c559ba8ec6 100644 --- a/processing/src/main/java/io/druid/query/aggregation/ToLowerCaseAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/ToLowerCaseAggregatorFactory.java @@ -112,4 +112,24 @@ public class ToLowerCaseAggregatorFactory implements AggregatorFactory { return baseAggregatorFactory.getAggregatorStartValue(); } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ToLowerCaseAggregatorFactory that = (ToLowerCaseAggregatorFactory) o; + + if (baseAggregatorFactory != null ? !baseAggregatorFactory.equals(that.baseAggregatorFactory) : that.baseAggregatorFactory != null) + return false; + + return true; + } + + @Override + public int hashCode() + { + return baseAggregatorFactory != null ? baseAggregatorFactory.hashCode() : 0; + } } diff --git a/processing/src/main/java/io/druid/query/aggregation/post/ArithmeticPostAggregator.java b/processing/src/main/java/io/druid/query/aggregation/post/ArithmeticPostAggregator.java index cf6881d18ad..87138dadef8 100644 --- a/processing/src/main/java/io/druid/query/aggregation/post/ArithmeticPostAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/post/ArithmeticPostAggregator.java @@ -193,4 +193,30 @@ public class ArithmeticPostAggregator implements PostAggregator return lookupMap.keySet(); } } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ArithmeticPostAggregator that = (ArithmeticPostAggregator) o; + + if (fields != null ? !fields.equals(that.fields) : that.fields != null) return false; + if (fnName != null ? !fnName.equals(that.fnName) : that.fnName != null) return false; + if (name != null ? !name.equals(that.name) : that.name != null) return false; + if (op != that.op) return false; + + return true; + } + + @Override + public int hashCode() + { + int result = name != null ? name.hashCode() : 0; + result = 31 * result + (fnName != null ? fnName.hashCode() : 0); + result = 31 * result + (fields != null ? fields.hashCode() : 0); + result = 31 * result + (op != null ? op.hashCode() : 0); + return result; + } } diff --git a/processing/src/main/java/io/druid/query/aggregation/post/ConstantPostAggregator.java b/processing/src/main/java/io/druid/query/aggregation/post/ConstantPostAggregator.java index d5dce4d3140..7bf4ed23d4c 100644 --- a/processing/src/main/java/io/druid/query/aggregation/post/ConstantPostAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/post/ConstantPostAggregator.java @@ -91,4 +91,33 @@ public class ConstantPostAggregator implements PostAggregator ", constantValue=" + constantValue + '}'; } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ConstantPostAggregator that = (ConstantPostAggregator) o; + + if (constantValue != null && that.constantValue != null) { + if (constantValue.doubleValue() != that.constantValue.doubleValue()) + return false; + } + else if (constantValue != that.constantValue) { + return false; + } + + if (name != null ? !name.equals(that.name) : that.name != null) return false; + + return true; + } + + @Override + public int hashCode() + { + int result = name != null ? name.hashCode() : 0; + result = 31 * result + (constantValue != null ? constantValue.hashCode() : 0); + return result; + } } diff --git a/processing/src/main/java/io/druid/query/aggregation/post/FieldAccessPostAggregator.java b/processing/src/main/java/io/druid/query/aggregation/post/FieldAccessPostAggregator.java index 59962836d2e..6c2321b10fd 100644 --- a/processing/src/main/java/io/druid/query/aggregation/post/FieldAccessPostAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/post/FieldAccessPostAggregator.java @@ -84,4 +84,26 @@ public class FieldAccessPostAggregator implements PostAggregator ", fieldName='" + fieldName + '\'' + '}'; } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + FieldAccessPostAggregator that = (FieldAccessPostAggregator) o; + + if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) return false; + if (name != null ? !name.equals(that.name) : that.name != null) return false; + + return true; + } + + @Override + public int hashCode() + { + int result = name != null ? name.hashCode() : 0; + result = 31 * result + (fieldName != null ? fieldName.hashCode() : 0); + return result; + } } diff --git a/processing/src/main/java/io/druid/query/aggregation/post/JavaScriptPostAggregator.java b/processing/src/main/java/io/druid/query/aggregation/post/JavaScriptPostAggregator.java index fef4b26f0f2..666dbc64a82 100644 --- a/processing/src/main/java/io/druid/query/aggregation/post/JavaScriptPostAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/post/JavaScriptPostAggregator.java @@ -142,4 +142,30 @@ public class JavaScriptPostAggregator implements PostAggregator { return function; } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + JavaScriptPostAggregator that = (JavaScriptPostAggregator) o; + + if (fieldNames != null ? !fieldNames.equals(that.fieldNames) : that.fieldNames != null) return false; + if (fn != null ? !fn.equals(that.fn) : that.fn != null) return false; + if (function != null ? !function.equals(that.function) : that.function != null) return false; + if (name != null ? !name.equals(that.name) : that.name != null) return false; + + return true; + } + + @Override + public int hashCode() + { + int result = name != null ? name.hashCode() : 0; + result = 31 * result + (fieldNames != null ? fieldNames.hashCode() : 0); + result = 31 * result + (function != null ? function.hashCode() : 0); + result = 31 * result + (fn != null ? fn.hashCode() : 0); + return result; + } } diff --git a/processing/src/main/java/io/druid/query/dimension/DefaultDimensionSpec.java b/processing/src/main/java/io/druid/query/dimension/DefaultDimensionSpec.java index 3a9137b008e..8e18ce61228 100644 --- a/processing/src/main/java/io/druid/query/dimension/DefaultDimensionSpec.java +++ b/processing/src/main/java/io/druid/query/dimension/DefaultDimensionSpec.java @@ -84,4 +84,26 @@ public class DefaultDimensionSpec implements DimensionSpec ", outputName='" + outputName + '\'' + '}'; } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + DefaultDimensionSpec that = (DefaultDimensionSpec) o; + + if (dimension != null ? !dimension.equals(that.dimension) : that.dimension != null) return false; + if (outputName != null ? !outputName.equals(that.outputName) : that.outputName != null) return false; + + return true; + } + + @Override + public int hashCode() + { + int result = dimension != null ? dimension.hashCode() : 0; + result = 31 * result + (outputName != null ? outputName.hashCode() : 0); + return result; + } } diff --git a/processing/src/main/java/io/druid/query/dimension/ExtractionDimensionSpec.java b/processing/src/main/java/io/druid/query/dimension/ExtractionDimensionSpec.java index 82fba73d0a1..9fe480e396d 100644 --- a/processing/src/main/java/io/druid/query/dimension/ExtractionDimensionSpec.java +++ b/processing/src/main/java/io/druid/query/dimension/ExtractionDimensionSpec.java @@ -92,4 +92,29 @@ public class ExtractionDimensionSpec implements DimensionSpec ", outputName='" + outputName + '\'' + '}'; } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ExtractionDimensionSpec that = (ExtractionDimensionSpec) o; + + if (dimExtractionFn != null ? !dimExtractionFn.equals(that.dimExtractionFn) : that.dimExtractionFn != null) + return false; + if (dimension != null ? !dimension.equals(that.dimension) : that.dimension != null) return false; + if (outputName != null ? !outputName.equals(that.outputName) : that.outputName != null) return false; + + return true; + } + + @Override + public int hashCode() + { + int result = dimension != null ? dimension.hashCode() : 0; + result = 31 * result + (dimExtractionFn != null ? dimExtractionFn.hashCode() : 0); + result = 31 * result + (outputName != null ? outputName.hashCode() : 0); + return result; + } } diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java index 93db02101ba..ebdd7a06489 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java @@ -32,8 +32,7 @@ import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import io.druid.data.input.Row; import io.druid.granularity.QueryGranularity; -import io.druid.query.BaseQuery; -import io.druid.query.Queries; +import io.druid.query.*; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.PostAggregator; import io.druid.query.dimension.DefaultDimensionSpec; @@ -72,7 +71,7 @@ public class GroupByQuery extends BaseQuery @JsonCreator public GroupByQuery( - @JsonProperty("dataSource") String dataSource, + @JsonProperty("dataSource") DataSource dataSource, @JsonProperty("intervals") QuerySegmentSpec querySegmentSpec, @JsonProperty("filter") DimFilter dimFilter, @JsonProperty("granularity") QueryGranularity granularity, @@ -133,7 +132,7 @@ public class GroupByQuery extends BaseQuery * have already passed in order for the object to exist. */ private GroupByQuery( - String dataSource, + DataSource dataSource, QuerySegmentSpec querySegmentSpec, DimFilter dimFilter, QueryGranularity granularity, @@ -255,7 +254,7 @@ public class GroupByQuery extends BaseQuery public static class Builder { - private String dataSource; + private DataSource dataSource; private QuerySegmentSpec querySegmentSpec; private DimFilter dimFilter; private QueryGranularity granularity; @@ -270,7 +269,9 @@ public class GroupByQuery extends BaseQuery private List orderByColumnSpecs = Lists.newArrayList(); private int limit = Integer.MAX_VALUE; - private Builder() {} + private Builder() + { + } private Builder(Builder builder) { @@ -288,12 +289,24 @@ public class GroupByQuery extends BaseQuery context = builder.context; } - public Builder setDataSource(String dataSource) + public Builder setDataSource(DataSource dataSource) { this.dataSource = dataSource; return this; } + public Builder setDataSource(String dataSource) + { + this.dataSource = new TableDataSource(dataSource); + return this; + } + + public Builder setDataSource(Query query) + { + this.dataSource = new QueryDataSource(query); + return this; + } + public Builder setInterval(Object interval) { return setQuerySegmentSpec(new LegacySegmentSpec(interval)); @@ -479,13 +492,51 @@ public class GroupByQuery extends BaseQuery public String toString() { return "GroupByQuery{" + - "limitSpec=" + limitSpec + - ", dimFilter=" + dimFilter + - ", granularity=" + granularity + - ", dimensions=" + dimensions + - ", aggregatorSpecs=" + aggregatorSpecs + - ", postAggregatorSpecs=" + postAggregatorSpecs + - ", orderByLimitFn=" + orderByLimitFn + - '}'; + "limitSpec=" + limitSpec + + ", dimFilter=" + dimFilter + + ", granularity=" + granularity + + ", dimensions=" + dimensions + + ", aggregatorSpecs=" + aggregatorSpecs + + ", postAggregatorSpecs=" + postAggregatorSpecs + + ", orderByLimitFn=" + orderByLimitFn + + '}'; + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + GroupByQuery that = (GroupByQuery) o; + if (!partialEquals(that)) + return false; + + if (aggregatorSpecs != null ? !aggregatorSpecs.equals(that.aggregatorSpecs) : that.aggregatorSpecs != null) + return false; + if (dimFilter != null ? !dimFilter.equals(that.dimFilter) : that.dimFilter != null) return false; + if (dimensions != null ? !dimensions.equals(that.dimensions) : that.dimensions != null) return false; + if (granularity != null ? !granularity.equals(that.granularity) : that.granularity != null) return false; + if (havingSpec != null ? !havingSpec.equals(that.havingSpec) : that.havingSpec != null) return false; + if (limitSpec != null ? !limitSpec.equals(that.limitSpec) : that.limitSpec != null) return false; + if (postAggregatorSpecs != null ? !postAggregatorSpecs.equals(that.postAggregatorSpecs) : that.postAggregatorSpecs != null) + return false; + + return true; + } + + @Override + public int hashCode() + { + int result = limitSpec != null ? limitSpec.hashCode() : 0; + result = 31 * result + partialHashCode(); + result = 31 * result + (havingSpec != null ? havingSpec.hashCode() : 0); + result = 31 * result + (dimFilter != null ? dimFilter.hashCode() : 0); + result = 31 * result + (granularity != null ? granularity.hashCode() : 0); + result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0); + result = 31 * result + (aggregatorSpecs != null ? aggregatorSpecs.hashCode() : 0); + result = 31 * result + (postAggregatorSpecs != null ? postAggregatorSpecs.hashCode() : 0); + result = 31 * result + (orderByLimitFn != null ? orderByLimitFn.hashCode() : 0); + return result; } } diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index 9bae22699c6..f85cf4ceab1 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -37,14 +37,12 @@ import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; import io.druid.data.input.Rows; import io.druid.granularity.QueryGranularity; -import io.druid.query.IntervalChunkingQueryRunner; -import io.druid.query.Query; -import io.druid.query.QueryRunner; -import io.druid.query.QueryToolChest; +import io.druid.query.*; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.MetricManipulationFn; import io.druid.query.dimension.DimensionSpec; import io.druid.segment.incremental.IncrementalIndex; +import io.druid.segment.incremental.IncrementalIndexStorageAdapter; import org.joda.time.Interval; import org.joda.time.Minutes; @@ -63,13 +61,16 @@ public class GroupByQueryQueryToolChest extends QueryToolChest NO_MERGE_CONTEXT = ImmutableMap.of(GROUP_BY_MERGE_KEY, "false"); private final Supplier configSupplier; + private GroupByQueryEngine engine; // For running the outer query around a subquery @Inject public GroupByQueryQueryToolChest( - Supplier configSupplier + Supplier configSupplier, + GroupByQueryEngine engine ) { this.configSupplier = configSupplier; + this.engine = engine; } @Override @@ -90,6 +91,52 @@ public class GroupByQueryQueryToolChest extends QueryToolChest mergeGroupByResults(final GroupByQuery query, QueryRunner runner) + { + + Sequence result; + + // If there's a subquery, merge subquery results and then apply the aggregator + DataSource dataSource = query.getDataSource(); + if (dataSource instanceof QueryDataSource) { + GroupByQuery subquery; + try { + subquery = (GroupByQuery) ((QueryDataSource)dataSource).getQuery(); + } catch (ClassCastException e) { + throw new UnsupportedOperationException("Subqueries must be of type 'group by'"); + } + Sequence subqueryResult = mergeGroupByResults(subquery, runner); + IncrementalIndexStorageAdapter adapter + = new IncrementalIndexStorageAdapter(makeIncrementalIndex(subquery, subqueryResult)); + result = engine.process(query, adapter); + } + else { + result = runner.run(query); + } + + return postAggregate(query, result); + } + + private Sequence postAggregate(final GroupByQuery query, Sequence result) + { + IncrementalIndex index = makeIncrementalIndex(query, result); + // convert millis back to timestamp according to granularity to preserve time zone information + Sequence retVal = Sequences.map( + Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs())), + new Function() + { + @Override + public Row apply(Row input) + { + final MapBasedRow row = (MapBasedRow) input; + return new MapBasedRow(query.getGranularity().toDateTime(row.getTimestampFromEpoch()), row.getEvent()); + } + } + ); + + return query.applyLimit(retVal); + } + + private IncrementalIndex makeIncrementalIndex(GroupByQuery query, Sequence result) { final GroupByQueryConfig config = configSupplier.get(); final QueryGranularity gran = query.getGranularity(); @@ -122,7 +169,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest retVal = Sequences.map( - Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs())), - new Function() - { - @Override - public Row apply(Row input) - { - final MapBasedRow row = (MapBasedRow) input; - return new MapBasedRow(gran.toDateTime(row.getTimestampFromEpoch()), row.getEvent()); - } - } - ); - - return query.applyLimit(retVal); + return index; } @Override @@ -176,7 +209,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest preMergeQueryDecoration(QueryRunner runner) { - return new IntervalChunkingQueryRunner(runner, configSupplier.get().getChunkPeriod()); + return new SubqueryQueryRunner( + new IntervalChunkingQueryRunner(runner, configSupplier.get().getChunkPeriod())); } } diff --git a/processing/src/main/java/io/druid/query/groupby/orderby/DefaultLimitSpec.java b/processing/src/main/java/io/druid/query/groupby/orderby/DefaultLimitSpec.java index 4cd052058bf..376a982ebe3 100644 --- a/processing/src/main/java/io/druid/query/groupby/orderby/DefaultLimitSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/orderby/DefaultLimitSpec.java @@ -232,4 +232,26 @@ public class DefaultLimitSpec implements LimitSpec return Sequences.simple(sorter.toTopN(materializedList, limit)); } } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + DefaultLimitSpec that = (DefaultLimitSpec) o; + + if (limit != that.limit) return false; + if (columns != null ? !columns.equals(that.columns) : that.columns != null) return false; + + return true; + } + + @Override + public int hashCode() + { + int result = columns != null ? columns.hashCode() : 0; + result = 31 * result + limit; + return result; + } } diff --git a/processing/src/main/java/io/druid/query/groupby/orderby/NoopLimitSpec.java b/processing/src/main/java/io/druid/query/groupby/orderby/NoopLimitSpec.java index 6fbc063c72d..d975e24a65f 100644 --- a/processing/src/main/java/io/druid/query/groupby/orderby/NoopLimitSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/orderby/NoopLimitSpec.java @@ -46,4 +46,15 @@ public class NoopLimitSpec implements LimitSpec { return "NoopLimitSpec"; } + + @Override + public boolean equals(Object other) + { + return (other instanceof NoopLimitSpec); + } + + @Override + public int hashCode() { + return 0; + } } diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java index a5e1a1a8319..aff0d90d147 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java @@ -34,11 +34,7 @@ import com.metamx.common.guava.nary.BinaryFn; import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.collections.OrderedMergeSequence; import io.druid.common.utils.JodaUtils; -import io.druid.query.CacheStrategy; -import io.druid.query.Query; -import io.druid.query.QueryRunner; -import io.druid.query.QueryToolChest; -import io.druid.query.ResultMergeQueryRunner; +import io.druid.query.*; import io.druid.query.aggregation.MetricManipulationFn; import io.druid.query.metadata.metadata.ColumnAnalysis; import io.druid.query.metadata.metadata.SegmentAnalysis; @@ -147,7 +143,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest @JsonProperty("context") Map context ) { - super(dataSource, querySegmentSpec, context); + super(new TableDataSource(dataSource), querySegmentSpec, context); this.toInclude = toInclude == null ? new AllColumnIncluderator() : toInclude; this.merge = merge == null ? false : merge; @@ -76,13 +77,41 @@ public class SegmentMetadataQuery extends BaseQuery public Query withOverriddenContext(Map contextOverride) { return new SegmentMetadataQuery( - getDataSource(), getQuerySegmentSpec(), toInclude, merge, computeOverridenContext(contextOverride) + ((TableDataSource)getDataSource()).getName(), + getQuerySegmentSpec(), toInclude, merge, computeOverridenContext(contextOverride) ); } @Override public Query withQuerySegmentSpec(QuerySegmentSpec spec) { - return new SegmentMetadataQuery(getDataSource(), spec, toInclude, merge, getContext()); + return new SegmentMetadataQuery( + ((TableDataSource)getDataSource()).getName(), + spec, toInclude, merge, getContext()); + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + SegmentMetadataQuery that = (SegmentMetadataQuery) o; + if (!partialEquals(that)) + return false; + + if (merge != that.merge) return false; + if (toInclude != null ? !toInclude.equals(that.toInclude) : that.toInclude != null) return false; + + return true; + } + + @Override + public int hashCode() + { + int result = toInclude != null ? toInclude.hashCode() : 0; + result = 31 * result + partialHashCode(); + result = 31 * result + (merge ? 1 : 0); + return result; } } diff --git a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java index 07f51a0f743..99e592a19f3 100644 --- a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java @@ -37,14 +37,7 @@ import com.metamx.common.guava.Sequences; import com.metamx.common.guava.nary.BinaryFn; import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.collections.OrderedMergeSequence; -import io.druid.query.CacheStrategy; -import io.druid.query.IntervalChunkingQueryRunner; -import io.druid.query.Query; -import io.druid.query.QueryRunner; -import io.druid.query.QueryToolChest; -import io.druid.query.Result; -import io.druid.query.ResultGranularTimestampComparator; -import io.druid.query.ResultMergeQueryRunner; +import io.druid.query.*; import io.druid.query.aggregation.MetricManipulationFn; import io.druid.query.filter.DimFilter; import io.druid.query.search.search.SearchHit; @@ -119,7 +112,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest> @JsonCreator public SearchQuery( - @JsonProperty("dataSource") String dataSource, + @JsonProperty("dataSource") DataSource dataSource, @JsonProperty("filter") DimFilter dimFilter, @JsonProperty("granularity") QueryGranularity granularity, @JsonProperty("limit") int limit, @@ -190,4 +188,38 @@ public class SearchQuery extends BaseQuery> ", limit=" + limit + '}'; } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + SearchQuery that = (SearchQuery) o; + + if (!partialEquals(that)) + return false; + + if (limit != that.limit) return false; + if (dimFilter != null ? !dimFilter.equals(that.dimFilter) : that.dimFilter != null) return false; + if (dimensions != null ? !dimensions.equals(that.dimensions) : that.dimensions != null) return false; + if (granularity != null ? !granularity.equals(that.granularity) : that.granularity != null) return false; + if (querySpec != null ? !querySpec.equals(that.querySpec) : that.querySpec != null) return false; + if (sortSpec != null ? !sortSpec.equals(that.sortSpec) : that.sortSpec != null) return false; + + return true; + } + + @Override + public int hashCode() + { + int result = dimFilter != null ? dimFilter.hashCode() : 0; + result = 31 * result + partialHashCode(); + result = 31 * result + (sortSpec != null ? sortSpec.hashCode() : 0); + result = 31 * result + (granularity != null ? granularity.hashCode() : 0); + result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0); + result = 31 * result + (querySpec != null ? querySpec.hashCode() : 0); + result = 31 * result + limit; + return result; + } } diff --git a/processing/src/main/java/io/druid/query/spec/MultipleIntervalSegmentSpec.java b/processing/src/main/java/io/druid/query/spec/MultipleIntervalSegmentSpec.java index 07d4436416c..f50aef4eaea 100644 --- a/processing/src/main/java/io/druid/query/spec/MultipleIntervalSegmentSpec.java +++ b/processing/src/main/java/io/druid/query/spec/MultipleIntervalSegmentSpec.java @@ -64,4 +64,23 @@ public class MultipleIntervalSegmentSpec implements QuerySegmentSpec "intervals=" + intervals + '}'; } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + MultipleIntervalSegmentSpec that = (MultipleIntervalSegmentSpec) o; + + if (intervals != null ? !intervals.equals(that.intervals) : that.intervals != null) return false; + + return true; + } + + @Override + public int hashCode() + { + return intervals != null ? intervals.hashCode() : 0; + } } diff --git a/processing/src/main/java/io/druid/query/spec/MultipleSpecificSegmentSpec.java b/processing/src/main/java/io/druid/query/spec/MultipleSpecificSegmentSpec.java index 59dda1e0d67..125e959e8da 100644 --- a/processing/src/main/java/io/druid/query/spec/MultipleSpecificSegmentSpec.java +++ b/processing/src/main/java/io/druid/query/spec/MultipleSpecificSegmentSpec.java @@ -91,4 +91,26 @@ public class MultipleSpecificSegmentSpec implements QuerySegmentSpec "descriptors=" + descriptors + '}'; } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + MultipleSpecificSegmentSpec that = (MultipleSpecificSegmentSpec) o; + + if (descriptors != null ? !descriptors.equals(that.descriptors) : that.descriptors != null) return false; + if (intervals != null ? !intervals.equals(that.intervals) : that.intervals != null) return false; + + return true; + } + + @Override + public int hashCode() + { + int result = descriptors != null ? descriptors.hashCode() : 0; + result = 31 * result + (intervals != null ? intervals.hashCode() : 0); + return result; + } } diff --git a/processing/src/main/java/io/druid/query/spec/SpecificSegmentSpec.java b/processing/src/main/java/io/druid/query/spec/SpecificSegmentSpec.java index be48d55e651..d83ef60b894 100644 --- a/processing/src/main/java/io/druid/query/spec/SpecificSegmentSpec.java +++ b/processing/src/main/java/io/druid/query/spec/SpecificSegmentSpec.java @@ -51,4 +51,23 @@ public class SpecificSegmentSpec implements QuerySegmentSpec { return walker.getQueryRunnerForSegments(query, Arrays.asList(descriptor)); } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + SpecificSegmentSpec that = (SpecificSegmentSpec) o; + + if (descriptor != null ? !descriptor.equals(that.descriptor) : that.descriptor != null) return false; + + return true; + } + + @Override + public int hashCode() + { + return descriptor != null ? descriptor.hashCode() : 0; + } } diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java index 957390acf15..ba1e9e9a237 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java @@ -24,9 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import io.druid.query.BaseQuery; -import io.druid.query.Query; -import io.druid.query.Result; +import io.druid.query.*; import io.druid.query.spec.MultipleIntervalSegmentSpec; import io.druid.query.spec.QuerySegmentSpec; import org.joda.time.DateTime; @@ -51,7 +49,7 @@ public class TimeBoundaryQuery extends BaseQuery @JsonCreator public TimeBoundaryQuery( - @JsonProperty("dataSource") String dataSource, + @JsonProperty("dataSource") DataSource dataSource, @JsonProperty("intervals") QuerySegmentSpec querySegmentSpec, @JsonProperty("context") Map context ) @@ -164,4 +162,17 @@ public class TimeBoundaryQuery extends BaseQuery ) ); } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + return partialEquals((TimeBoundaryQuery) o); + } + + @Override + public int hashCode() { + return partialHashCode(); + } } diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java index ba2ab0feb9e..fd6c49c1aed 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java @@ -31,12 +31,7 @@ import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.collections.OrderedMergeSequence; -import io.druid.query.BySegmentSkippingQueryRunner; -import io.druid.query.CacheStrategy; -import io.druid.query.Query; -import io.druid.query.QueryRunner; -import io.druid.query.QueryToolChest; -import io.druid.query.Result; +import io.druid.query.*; import io.druid.query.aggregation.MetricManipulationFn; import io.druid.timeline.LogicalSegment; import org.joda.time.DateTime; @@ -117,7 +112,7 @@ public class TimeBoundaryQueryQueryToolChest public ServiceMetricEvent.Builder makeMetricBuilder(TimeBoundaryQuery query) { return new ServiceMetricEvent.Builder() - .setUser2(query.getDataSource()) + .setUser2(query.getDataSource().toString()) .setUser4(query.getType()) .setUser6("false"); } diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java index ab5b649a896..b86ec6f6469 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java @@ -24,10 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.collect.ImmutableList; import io.druid.granularity.QueryGranularity; -import io.druid.query.BaseQuery; -import io.druid.query.Queries; -import io.druid.query.Query; -import io.druid.query.Result; +import io.druid.query.*; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.PostAggregator; import io.druid.query.filter.DimFilter; @@ -48,7 +45,7 @@ public class TimeseriesQuery extends BaseQuery> @JsonCreator public TimeseriesQuery( - @JsonProperty("dataSource") String dataSource, + @JsonProperty("dataSource") DataSource dataSource, @JsonProperty("intervals") QuerySegmentSpec querySegmentSpec, @JsonProperty("filter") DimFilter dimFilter, @JsonProperty("granularity") QueryGranularity granularity, @@ -142,4 +139,34 @@ public class TimeseriesQuery extends BaseQuery> '}'; } + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TimeseriesQuery that = (TimeseriesQuery) o; + if (!partialEquals(that)) + return false; + + if (aggregatorSpecs != null ? !aggregatorSpecs.equals(that.aggregatorSpecs) : that.aggregatorSpecs != null) + return false; + if (dimFilter != null ? !dimFilter.equals(that.dimFilter) : that.dimFilter != null) return false; + if (granularity != null ? !granularity.equals(that.granularity) : that.granularity != null) return false; + if (postAggregatorSpecs != null ? !postAggregatorSpecs.equals(that.postAggregatorSpecs) : that.postAggregatorSpecs != null) + return false; + + return true; + } + + @Override + public int hashCode() + { + int result = dimFilter != null ? dimFilter.hashCode() : 0; + result = 31 * result + partialHashCode(); + result = 31 * result + (granularity != null ? granularity.hashCode() : 0); + result = 31 * result + (aggregatorSpecs != null ? aggregatorSpecs.hashCode() : 0); + result = 31 * result + (postAggregatorSpecs != null ? postAggregatorSpecs.hashCode() : 0); + return result; + } } diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryEngine.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryEngine.java index b57105aa654..c6d9d31d7e4 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryEngine.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryEngine.java @@ -22,6 +22,8 @@ package io.druid.query.timeseries; import com.google.common.base.Function; import com.metamx.common.guava.BaseSequence; import com.metamx.common.guava.Sequence; +import io.druid.query.DataSource; +import io.druid.query.Query; import io.druid.query.QueryRunnerHelper; import io.druid.query.Result; import io.druid.query.aggregation.Aggregator; diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index c328822e754..7a11d42a325 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java @@ -32,16 +32,7 @@ import com.metamx.common.guava.nary.BinaryFn; import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.collections.OrderedMergeSequence; import io.druid.granularity.QueryGranularity; -import io.druid.query.CacheStrategy; -import io.druid.query.IntervalChunkingQueryRunner; -import io.druid.query.Query; -import io.druid.query.QueryCacheHelper; -import io.druid.query.QueryConfig; -import io.druid.query.QueryRunner; -import io.druid.query.QueryToolChest; -import io.druid.query.Result; -import io.druid.query.ResultGranularTimestampComparator; -import io.druid.query.ResultMergeQueryRunner; +import io.druid.query.*; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.MetricManipulationFn; import io.druid.query.aggregation.PostAggregator; @@ -122,7 +113,7 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest> @JsonCreator public TopNQuery( - @JsonProperty("dataSource") String dataSource, + @JsonProperty("dataSource") DataSource dataSource, @JsonProperty("dimension") DimensionSpec dimensionSpec, @JsonProperty("metric") TopNMetricSpec topNMetricSpec, @JsonProperty("threshold") int threshold, @@ -208,4 +206,43 @@ public class TopNQuery extends BaseQuery> ", postAggregatorSpecs=" + postAggregatorSpecs + '}'; } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TopNQuery topNQuery = (TopNQuery) o; + if (!partialEquals(topNQuery)) + return false; + + if (threshold != topNQuery.threshold) return false; + if (aggregatorSpecs != null ? !aggregatorSpecs.equals(topNQuery.aggregatorSpecs) : topNQuery.aggregatorSpecs != null) + return false; + if (dimFilter != null ? !dimFilter.equals(topNQuery.dimFilter) : topNQuery.dimFilter != null) return false; + if (dimensionSpec != null ? !dimensionSpec.equals(topNQuery.dimensionSpec) : topNQuery.dimensionSpec != null) + return false; + if (granularity != null ? !granularity.equals(topNQuery.granularity) : topNQuery.granularity != null) return false; + if (postAggregatorSpecs != null ? !postAggregatorSpecs.equals(topNQuery.postAggregatorSpecs) : topNQuery.postAggregatorSpecs != null) + return false; + if (topNMetricSpec != null ? !topNMetricSpec.equals(topNQuery.topNMetricSpec) : topNQuery.topNMetricSpec != null) + return false; + + return true; + } + + @Override + public int hashCode() + { + int result = dimensionSpec != null ? dimensionSpec.hashCode() : 0; + result = 31 * result + partialHashCode(); + result = 31 * result + (topNMetricSpec != null ? topNMetricSpec.hashCode() : 0); + result = 31 * result + threshold; + result = 31 * result + (dimFilter != null ? dimFilter.hashCode() : 0); + result = 31 * result + (granularity != null ? granularity.hashCode() : 0); + result = 31 * result + (aggregatorSpecs != null ? aggregatorSpecs.hashCode() : 0); + result = 31 * result + (postAggregatorSpecs != null ? postAggregatorSpecs.hashCode() : 0); + return result; + } } diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryBuilder.java b/processing/src/main/java/io/druid/query/topn/TopNQueryBuilder.java index 1bfb690f490..21efd3b8351 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryBuilder.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryBuilder.java @@ -21,6 +21,8 @@ package io.druid.query.topn; import com.google.common.collect.Lists; import io.druid.granularity.QueryGranularity; +import io.druid.query.DataSource; +import io.druid.query.TableDataSource; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.PostAggregator; import io.druid.query.dimension.DefaultDimensionSpec; @@ -58,7 +60,7 @@ import java.util.Map; */ public class TopNQueryBuilder { - private String dataSource; + private DataSource dataSource; private DimensionSpec dimensionSpec; private TopNMetricSpec topNMetricSpec; private int threshold; @@ -71,7 +73,7 @@ public class TopNQueryBuilder public TopNQueryBuilder() { - dataSource = ""; + dataSource = null; dimensionSpec = null; topNMetricSpec = null; threshold = 0; @@ -83,7 +85,7 @@ public class TopNQueryBuilder context = null; } - public String getDataSource() + public DataSource getDataSource() { return dataSource; } @@ -152,7 +154,7 @@ public class TopNQueryBuilder public TopNQueryBuilder copy(TopNQuery query) { return new TopNQueryBuilder() - .dataSource(query.getDataSource()) + .dataSource(query.getDataSource().toString()) .dimension(query.getDimensionSpec()) .metric(query.getTopNMetricSpec()) .threshold(query.getThreshold()) @@ -180,6 +182,12 @@ public class TopNQueryBuilder } public TopNQueryBuilder dataSource(String d) + { + dataSource = new TableDataSource(d); + return this; + } + + public TopNQueryBuilder dataSource(DataSource d) { dataSource = d; return this; diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java index 6e1d816cbd9..6f0400a1f1b 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java @@ -128,7 +128,7 @@ public class TopNQueryQueryToolChest extends QueryToolChestnewArrayList(new DefaultDimensionSpec("quality", "alias"))) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory("idx", "index") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .build(); + + String dataSourceJSON = "{\"type\":\"query\", \"query\":" + jsonMapper.writeValueAsString(query) + "}"; + + DataSource dataSource = jsonMapper.readValue(dataSourceJSON, DataSource.class); + Assert.assertEquals(new QueryDataSource(query), dataSource); + } + +} diff --git a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java index 58c1e97034c..91243d22679 100644 --- a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java +++ b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java @@ -55,6 +55,7 @@ public class QueryRunnerTestHelper public static final String qualityDimension = "quality"; public static final String placementishDimension = "placementish"; public static final String indexMetric = "index"; + public static final String addRowsIndexConstantMetric = "addRowsIndexConstant"; public static final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows"); public static final LongSumAggregatorFactory indexLongSum = new LongSumAggregatorFactory("index", "index"); public static final DoubleSumAggregatorFactory indexDoubleSum = new DoubleSumAggregatorFactory("index", "index"); @@ -94,6 +95,9 @@ public class QueryRunnerTestHelper public static final QuerySegmentSpec firstToThird = new MultipleIntervalSegmentSpec( Arrays.asList(new Interval("2011-04-01T00:00:00.000Z/2011-04-03T00:00:00.000Z")) ); + public static final QuerySegmentSpec secondOnly = new MultipleIntervalSegmentSpec( + Arrays.asList(new Interval("2011-04-02T00:00:00.000Z/P1D")) + ); public static final QuerySegmentSpec fullOnInterval = new MultipleIntervalSegmentSpec( Arrays.asList(new Interval("1970-01-01T00:00:00.000Z/2020-01-01T00:00:00.000Z")) ); diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index e1556e94da7..102bf24f3d3 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -38,9 +38,11 @@ import io.druid.granularity.QueryGranularity; import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.QueryToolChest; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.DoubleSumAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.query.aggregation.MaxAggregatorFactory; import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.dimension.DimensionSpec; import io.druid.query.filter.RegexDimFilter; @@ -90,22 +92,24 @@ public class GroupByQueryRunnerTest config.setMaxIntermediateRows(10000); final Supplier configSupplier = Suppliers.ofInstance(config); - final GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory( - new GroupByQueryEngine( - configSupplier, - new StupidPool( - new Supplier() - { - @Override - public ByteBuffer get() - { - return ByteBuffer.allocate(1024 * 1024); - } - } - ) - ), + final GroupByQueryEngine engine = new GroupByQueryEngine( configSupplier, - new GroupByQueryQueryToolChest(configSupplier) + new StupidPool( + new Supplier() + { + @Override + public ByteBuffer get() + { + return ByteBuffer.allocate(1024 * 1024); + } + } + ) + ); + + final GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory( + engine, + configSupplier, + new GroupByQueryQueryToolChest(configSupplier, engine) ); return Lists.newArrayList( @@ -167,8 +171,7 @@ public class GroupByQueryRunnerTest createExpectedRow("2011-04-02", "alias", "travel", "rows", 1L, "idx", 126L) ); - Iterable results = Sequences.toList(runner.run(query), Lists.newArrayList()); - + Iterable results = runQuery(query); TestHelper.assertExpectedObjects(expectedResults, results, ""); } @@ -178,33 +181,33 @@ public class GroupByQueryRunnerTest DateTimeZone tz = DateTimeZone.forID("America/Los_Angeles"); GroupByQuery query = GroupByQuery.builder() - .setDataSource(QueryRunnerTestHelper.dataSource) - .setInterval("2011-03-31T00:00:00-07:00/2011-04-02T00:00:00-07:00") - .setDimensions( - Lists.newArrayList( - (DimensionSpec) new DefaultDimensionSpec( - "quality", - "alias" - ) - ) - ) - .setAggregatorSpecs( - Arrays.asList( - QueryRunnerTestHelper.rowsCount, - new LongSumAggregatorFactory( - "idx", - "index" - ) - ) - ) - .setGranularity( - new PeriodGranularity( - new Period("P1D"), - null, - tz - ) - ) - .build(); + .setDataSource(QueryRunnerTestHelper.dataSource) + .setInterval("2011-03-31T00:00:00-07:00/2011-04-02T00:00:00-07:00") + .setDimensions( + Lists.newArrayList( + (DimensionSpec) new DefaultDimensionSpec( + "quality", + "alias" + ) + ) + ) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory( + "idx", + "index" + ) + ) + ) + .setGranularity( + new PeriodGranularity( + new Period("P1D"), + null, + tz + ) + ) + .build(); List expectedResults = Arrays.asList( createExpectedRow(new DateTime("2011-03-31", tz), "alias", "automotive", "rows", 1L, "idx", 135L), @@ -228,11 +231,7 @@ public class GroupByQueryRunnerTest createExpectedRow(new DateTime("2011-04-01", tz), "alias", "travel", "rows", 1L, "idx", 126L) ); - Iterable results = Sequences.toList( - runner.run(query), - Lists.newArrayList() - ); - + Iterable results = runQuery(query); TestHelper.assertExpectedObjects(expectedResults, results, ""); } @@ -661,7 +660,21 @@ public class GroupByQueryRunnerTest createExpectedRow("2011-04-01", "quality", "automotive", "rows", 2L) ); - QueryRunner mergeRunner = new GroupByQueryQueryToolChest(configSupplier).mergeResults(runner); + final GroupByQueryEngine engine = new GroupByQueryEngine( + configSupplier, + new StupidPool( + new Supplier() + { + @Override + public ByteBuffer get() + { + return ByteBuffer.allocate(1024 * 1024); + } + } + ) + ); + + QueryRunner mergeRunner = new GroupByQueryQueryToolChest(configSupplier, engine).mergeResults(runner); TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query), "no-limit"); } @@ -696,7 +709,21 @@ public class GroupByQueryRunnerTest ); TestHelper.assertExpectedObjects(expectedResults, runner.run(query), "normal"); - QueryRunner mergeRunner = new GroupByQueryQueryToolChest(configSupplier).mergeResults(runner); + final GroupByQueryEngine engine = new GroupByQueryEngine( + configSupplier, + new StupidPool( + new Supplier() + { + @Override + public ByteBuffer get() + { + return ByteBuffer.allocate(1024 * 1024); + } + } + ) + ); + + QueryRunner mergeRunner = new GroupByQueryQueryToolChest(configSupplier, engine).mergeResults(runner); TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query), "no-limit"); } @@ -731,10 +758,166 @@ public class GroupByQueryRunnerTest ); TestHelper.assertExpectedObjects(expectedResults, runner.run(query), "normal"); - QueryRunner mergeRunner = new GroupByQueryQueryToolChest(configSupplier).mergeResults(runner); + final GroupByQueryEngine engine = new GroupByQueryEngine( + configSupplier, + new StupidPool( + new Supplier() + { + @Override + public ByteBuffer get() + { + return ByteBuffer.allocate(1024 * 1024); + } + } + ) + ); + + QueryRunner mergeRunner = new GroupByQueryQueryToolChest(configSupplier, engine).mergeResults(runner); TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query), "no-limit"); } + // A subquery identical to the query should yield identical results + @Test + public void testIdenticalSubquery() + { + GroupByQuery subquery = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("quality", "alias"))) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory("idx", "index") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .build(); + + GroupByQuery query = GroupByQuery + .builder() + .setDataSource(subquery) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("alias", "alias"))) + .setAggregatorSpecs( + Arrays.asList( + new LongSumAggregatorFactory("rows", "rows"), + new LongSumAggregatorFactory("idx", "idx") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .build(); + + List expectedResults = Arrays.asList( + createExpectedRow("2011-04-01", "alias", "automotive", "rows", 1L, "idx", 135L), + createExpectedRow("2011-04-01", "alias", "business", "rows", 1L, "idx", 118L), + createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 1L, "idx", 158L), + createExpectedRow("2011-04-01", "alias", "health", "rows", 1L, "idx", 120L), + createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 3L, "idx", 2870L), + createExpectedRow("2011-04-01", "alias", "news", "rows", 1L, "idx", 121L), + createExpectedRow("2011-04-01", "alias", "premium", "rows", 3L, "idx", 2900L), + createExpectedRow("2011-04-01", "alias", "technology", "rows", 1L, "idx", 78L), + createExpectedRow("2011-04-01", "alias", "travel", "rows", 1L, "idx", 119L), + + createExpectedRow("2011-04-02", "alias", "automotive", "rows", 1L, "idx", 147L), + createExpectedRow("2011-04-02", "alias", "business", "rows", 1L, "idx", 112L), + createExpectedRow("2011-04-02", "alias", "entertainment", "rows", 1L, "idx", 166L), + createExpectedRow("2011-04-02", "alias", "health", "rows", 1L, "idx", 113L), + createExpectedRow("2011-04-02", "alias", "mezzanine", "rows", 3L, "idx", 2447L), + createExpectedRow("2011-04-02", "alias", "news", "rows", 1L, "idx", 114L), + createExpectedRow("2011-04-02", "alias", "premium", "rows", 3L, "idx", 2505L), + createExpectedRow("2011-04-02", "alias", "technology", "rows", 1L, "idx", 97L), + createExpectedRow("2011-04-02", "alias", "travel", "rows", 1L, "idx", 126L) + ); + + // Subqueries are handled by the ToolChest + Iterable results = runQuery(query); + TestHelper.assertExpectedObjects(expectedResults, results, ""); + } + + @Test + public void testDifferentGroupingSubquery() + { + GroupByQuery subquery = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("quality", "alias"))) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory("idx", "index") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .build(); + + GroupByQuery query = GroupByQuery + .builder() + .setDataSource(subquery) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setAggregatorSpecs( + Arrays.asList( + new MaxAggregatorFactory("idx", "idx") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .build(); + + List expectedResults = Arrays.asList( + createExpectedRow("2011-04-01", "idx", 2900.0), + createExpectedRow("2011-04-02", "idx", 2505.0) + ); + + Iterable results = runQuery(query); + TestHelper.assertExpectedObjects(expectedResults, results, ""); + } + + @Test + public void testDifferentIntervalSubquery() + { + GroupByQuery subquery = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("quality", "alias"))) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory("idx", "index") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .build(); + + GroupByQuery query = GroupByQuery + .builder() + .setDataSource(subquery) + .setQuerySegmentSpec(QueryRunnerTestHelper.secondOnly) + .setAggregatorSpecs( + Arrays.asList( + new MaxAggregatorFactory("idx", "idx") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .build(); + + List expectedResults = Arrays.asList( + createExpectedRow("2011-04-02", "idx", 2505.0) + ); + + Iterable results = runQuery(query); + TestHelper.assertExpectedObjects(expectedResults, results, ""); + } + + private Iterable runQuery(GroupByQuery query) + { + QueryToolChest toolChest = factory.getToolchest(); + Sequence queryResult = toolChest.mergeResults(toolChest.preMergeQueryDecoration(runner)).run(query); + return Sequences.toList(queryResult, Lists.newArrayList()); + } + + private Row createExpectedRow(final String timestamp, Object... vals) { return createExpectedRow(new DateTime(timestamp), vals); diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryTest.java new file mode 100644 index 00000000000..8557ba04c24 --- /dev/null +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryTest.java @@ -0,0 +1,68 @@ +/* + * Druid - a distributed column store. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * + * This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project + * under the Druid Corporate Contributor License Agreement. + */ + +package io.druid.query.groupby; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.Druids; +import io.druid.query.Query; +import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.query.aggregation.PostAggregator; +import io.druid.query.dimension.DefaultDimensionSpec; +import io.druid.query.dimension.DimensionSpec; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; + +public class GroupByQueryTest +{ + private static final ObjectMapper jsonMapper = new DefaultObjectMapper(); + + @Test + public void testQuerySerialization() throws IOException + { + Query query = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("quality", "alias"))) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory("idx", "index") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .build(); + + String json = jsonMapper.writeValueAsString(query); + Query serdeQuery = jsonMapper.readValue(json, Query.class); + + Assert.assertEquals(query, serdeQuery); + } + +} diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java index 562527556a9..2538e91bc76 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java @@ -56,22 +56,24 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest config.setMaxIntermediateRows(10000); final Supplier configSupplier = Suppliers.ofInstance(config); - final GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory( - new GroupByQueryEngine( - configSupplier, - new StupidPool( - new Supplier() - { - @Override - public ByteBuffer get() - { - return ByteBuffer.allocate(1024 * 1024); - } - } - ) - ), + final GroupByQueryEngine engine = new GroupByQueryEngine( configSupplier, - new GroupByQueryQueryToolChest(configSupplier) + new StupidPool( + new Supplier() + { + @Override + public ByteBuffer get() + { + return ByteBuffer.allocate(1024 * 1024); + } + } + ) + ); + + final GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory( + engine, + configSupplier, + new GroupByQueryQueryToolChest(configSupplier, engine) ); final Collection objects = QueryRunnerTestHelper.makeQueryRunners(factory); @@ -95,13 +97,13 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest return Sequences.map( groupByRunner.run( GroupByQuery.builder() - .setDataSource(tsQuery.getDataSource()) - .setQuerySegmentSpec(tsQuery.getQuerySegmentSpec()) - .setGranularity(tsQuery.getGranularity()) - .setDimFilter(tsQuery.getDimensionsFilter()) - .setAggregatorSpecs(tsQuery.getAggregatorSpecs()) - .setPostAggregatorSpecs(tsQuery.getPostAggregatorSpecs()) - .build() + .setDataSource(tsQuery.getDataSource()) + .setQuerySegmentSpec(tsQuery.getQuerySegmentSpec()) + .setGranularity(tsQuery.getGranularity()) + .setDimFilter(tsQuery.getDimensionsFilter()) + .setAggregatorSpecs(tsQuery.getAggregatorSpecs()) + .setPostAggregatorSpecs(tsQuery.getPostAggregatorSpecs()) + .build() ), new Function>() { diff --git a/processing/src/test/java/io/druid/query/search/SearchQueryTest.java b/processing/src/test/java/io/druid/query/search/SearchQueryTest.java new file mode 100644 index 00000000000..7a5695a6ab6 --- /dev/null +++ b/processing/src/test/java/io/druid/query/search/SearchQueryTest.java @@ -0,0 +1,62 @@ +/* + * Druid - a distributed column store. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * + * This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project + * under the Druid Corporate Contributor License Agreement. + */ + +package io.druid.query.search; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.Druids; +import io.druid.query.Query; +import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.query.dimension.DefaultDimensionSpec; +import io.druid.query.dimension.DimensionSpec; +import io.druid.query.groupby.GroupByQuery; +import io.druid.query.search.search.SearchQuery; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; + +public class SearchQueryTest +{ + private static final ObjectMapper jsonMapper = new DefaultObjectMapper(); + + @Test + public void testQuerySerialization() throws IOException + { + Query query = Druids.newSearchQueryBuilder() + .dataSource(QueryRunnerTestHelper.dataSource) + .granularity(QueryRunnerTestHelper.allGran) + .intervals(QueryRunnerTestHelper.fullOnInterval) + .query("a") + .build(); + + String json = jsonMapper.writeValueAsString(query); + Query serdeQuery = jsonMapper.readValue(json, Query.class); + + Assert.assertEquals(query, serdeQuery); + } + +} diff --git a/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryTest.java b/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryTest.java new file mode 100644 index 00000000000..1f7fb96a2b8 --- /dev/null +++ b/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryTest.java @@ -0,0 +1,51 @@ +/* + * Druid - a distributed column store. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * + * This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project + * under the Druid Corporate Contributor License Agreement. + */ + +package io.druid.query.timeboundary; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.Druids; +import io.druid.query.Query; +import io.druid.query.QueryRunnerTestHelper; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +public class TimeBoundaryQueryTest +{ + private static final ObjectMapper jsonMapper = new DefaultObjectMapper(); + + @Test + public void testQuerySerialization() throws IOException + { + Query query = Druids.newTimeBoundaryQueryBuilder() + .dataSource("testing") + .build(); + + String json = jsonMapper.writeValueAsString(query); + Query serdeQuery = jsonMapper.readValue(json, Query.class); + + Assert.assertEquals(query, serdeQuery); + } + +} diff --git a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryTest.java b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryTest.java new file mode 100644 index 00000000000..43e22ddfb94 --- /dev/null +++ b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryTest.java @@ -0,0 +1,62 @@ +/* + * Druid - a distributed column store. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * + * This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project + * under the Druid Corporate Contributor License Agreement. + */ + +package io.druid.query.timeseries; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.Druids; +import io.druid.query.Query; +import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.aggregation.PostAggregator; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; + +public class TimeseriesQueryTest +{ + private static final ObjectMapper jsonMapper = new DefaultObjectMapper(); + + @Test + public void testQuerySerialization() throws IOException + { + Query query = Druids.newTimeseriesQueryBuilder() + .dataSource(QueryRunnerTestHelper.dataSource) + .granularity(QueryRunnerTestHelper.dayGran) + .intervals(QueryRunnerTestHelper.fullOnInterval) + .aggregators( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + QueryRunnerTestHelper.indexDoubleSum + ) + ) + .postAggregators(Arrays.asList(QueryRunnerTestHelper.addRowsIndexConstant)) + .build(); + + String json = jsonMapper.writeValueAsString(query); + Query serdeQuery = jsonMapper.readValue(json, Query.class); + + Assert.assertEquals(query, serdeQuery); + } + +} diff --git a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java index 839f82c3cbc..af0988fc100 100644 --- a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java @@ -61,6 +61,8 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import static io.druid.query.QueryRunnerTestHelper.*; + /** */ @RunWith(Parameterized.class) @@ -108,58 +110,6 @@ public class TopNQueryRunnerTest this.runner = runner; } - final String dataSource = "testing"; - final QueryGranularity gran = QueryGranularity.DAY; - final QueryGranularity allGran = QueryGranularity.ALL; - final String providerDimension = "provider"; - final String qualityDimension = "quality"; - final String placementishDimension = "placementish"; - final String indexMetric = "index"; - final String addRowsIndexConstantMetric = "addRowsIndexConstant"; - final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows"); - final LongSumAggregatorFactory indexLongSum = new LongSumAggregatorFactory("index", "index"); - final DoubleSumAggregatorFactory indexDoubleSum = new DoubleSumAggregatorFactory("index", "index"); - final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L); - final FieldAccessPostAggregator rowsPostAgg = new FieldAccessPostAggregator("rows", "rows"); - final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index"); - final ArithmeticPostAggregator addRowsIndexConstant = - new ArithmeticPostAggregator( - "addRowsIndexConstant", "+", Lists.newArrayList(constant, rowsPostAgg, indexPostAgg) - ); - final List commonAggregators = Arrays.asList(rowsCount, indexDoubleSum); - - - final String[] expectedFullOnIndexValues = new String[]{ - "4500.0", "6077.949111938477", "4922.488838195801", "5726.140853881836", "4698.468170166016", - "4651.030891418457", "4398.145851135254", "4596.068244934082", "4434.630561828613", "0.0", - "6162.801361083984", "5590.292701721191", "4994.298484802246", "5179.679672241211", "6288.556800842285", - "6025.663551330566", "5772.855537414551", "5346.517524719238", "5497.331253051758", "5909.684387207031", - "5862.711364746094", "5958.373008728027", "5224.882194519043", "5456.789611816406", "5456.095397949219", - "4642.481948852539", "5023.572692871094", "5155.821723937988", "5350.3723220825195", "5236.997489929199", - "4910.097717285156", "4507.608840942383", "4659.80500793457", "5354.878845214844", "4945.796455383301", - "6459.080368041992", "4390.493583679199", "6545.758262634277", "6922.801231384277", "6023.452911376953", - "6812.107475280762", "6368.713348388672", "6381.748748779297", "5631.245086669922", "4976.192253112793", - "6541.463027954102", "5983.8513107299805", "5967.189498901367", "5567.139289855957", "4863.5944747924805", - "4681.164360046387", "6122.321441650391", "5410.308860778809", "4846.676376342773", "5333.872688293457", - "5013.053741455078", "4836.85563659668", "5264.486434936523", "4581.821243286133", "4680.233596801758", - "4771.363662719727", "5038.354717254639", "4816.808464050293", "4684.095504760742", "5023.663467407227", - "5889.72257232666", "4984.973915100098", "5664.220512390137", "5572.653915405273", "5537.123138427734", - "5980.422874450684", "6243.834693908691", "5372.147285461426", "5690.728981018066", "5827.796455383301", - "6141.0769119262695", "6082.3237228393555", "5678.771339416504", "6814.467971801758", "6626.151596069336", - "5833.2095947265625", "4679.222328186035", "5367.9403076171875", "5410.445640563965", "5689.197135925293", - "5240.5018310546875", "4790.912239074707", "4992.670921325684", "4796.888023376465", "5479.439590454102", - "5506.567192077637", "4743.144546508789", "4913.282669067383", "4723.869743347168" - }; - - final DateTime skippedDay = new DateTime("2011-01-21T00:00:00.000Z"); - - final QuerySegmentSpec firstToThird = new MultipleIntervalSegmentSpec( - Arrays.asList(new Interval("2011-04-01T00:00:00.000Z/2011-04-03T00:00:00.000Z")) - ); - final QuerySegmentSpec fullOnInterval = new MultipleIntervalSegmentSpec( - Arrays.asList(new Interval("1970-01-01T00:00:00.000Z/2020-01-01T00:00:00.000Z")) - ); - @Test public void testFullOnTopN() @@ -191,7 +141,7 @@ public class TopNQueryRunnerTest new TopNResultValue( Arrays.>asList( ImmutableMap.builder() - .put("provider", "total_market") + .put(providerDimension, "total_market") .put("rows", 186L) .put("index", 215679.82879638672D) .put("addRowsIndexConstant", 215866.82879638672D) @@ -199,7 +149,7 @@ public class TopNQueryRunnerTest .put("minIndex", 792.3260498046875D) .build(), ImmutableMap.builder() - .put("provider", "upfront") + .put(providerDimension, "upfront") .put("rows", 186L) .put("index", 192046.1060180664D) .put("addRowsIndexConstant", 192233.1060180664D) @@ -207,7 +157,7 @@ public class TopNQueryRunnerTest .put("minIndex", 545.9906005859375D) .build(), ImmutableMap.builder() - .put("provider", "spot") + .put(providerDimension, "spot") .put("rows", 837L) .put("index", 95606.57232284546D) .put("addRowsIndexConstant", 96444.57232284546D) @@ -252,7 +202,7 @@ public class TopNQueryRunnerTest new TopNResultValue( Arrays.>asList( ImmutableMap.builder() - .put("provider", "total_market") + .put(providerDimension, "total_market") .put("rows", 186L) .put("index", 215679.82879638672D) .put("addRowsIndexConstant", 215866.82879638672D) @@ -260,7 +210,7 @@ public class TopNQueryRunnerTest .put("minIndex", 792.3260498046875D) .build(), ImmutableMap.builder() - .put("provider", "upfront") + .put(providerDimension, "upfront") .put("rows", 186L) .put("index", 192046.1060180664D) .put("addRowsIndexConstant", 192233.1060180664D) @@ -268,7 +218,7 @@ public class TopNQueryRunnerTest .put("minIndex", 545.9906005859375D) .build(), ImmutableMap.builder() - .put("provider", "spot") + .put(providerDimension, "spot") .put("rows", 837L) .put("index", 95606.57232284546D) .put("addRowsIndexConstant", 96444.57232284546D) @@ -304,19 +254,19 @@ public class TopNQueryRunnerTest new TopNResultValue( Arrays.>asList( ImmutableMap.of( - "provider", "total_market", + providerDimension, "total_market", "rows", 4L, "index", 5351.814697265625D, "addRowsIndexConstant", 5356.814697265625D ), ImmutableMap.of( - "provider", "upfront", + providerDimension, "upfront", "rows", 4L, "index", 4875.669677734375D, "addRowsIndexConstant", 4880.669677734375D ), ImmutableMap.of( - "provider", "spot", + providerDimension, "spot", "rows", 18L, "index", 2231.8768157958984D, "addRowsIndexConstant", 2250.8768157958984D @@ -350,19 +300,19 @@ public class TopNQueryRunnerTest new TopNResultValue( Arrays.>asList( ImmutableMap.of( - "provider", "total_market", + providerDimension, "total_market", "rows", 4L, "index", 5351.814697265625D, "addRowsIndexConstant", 5356.814697265625D ), ImmutableMap.of( - "provider", "upfront", + providerDimension, "upfront", "rows", 4L, "index", 4875.669677734375D, "addRowsIndexConstant", 4880.669677734375D ), ImmutableMap.of( - "provider", "spot", + providerDimension, "spot", "rows", 18L, "index", 2231.8768157958984D, "addRowsIndexConstant", 2250.8768157958984D @@ -396,13 +346,13 @@ public class TopNQueryRunnerTest new TopNResultValue( Arrays.>asList( ImmutableMap.of( - "provider", "total_market", + providerDimension, "total_market", "rows", 4L, "index", 5351.814697265625D, "addRowsIndexConstant", 5356.814697265625D ), ImmutableMap.of( - "provider", "upfront", + providerDimension, "upfront", "rows", 4L, "index", 4875.669677734375D, "addRowsIndexConstant", 4880.669677734375D @@ -436,7 +386,7 @@ public class TopNQueryRunnerTest new TopNResultValue( Arrays.>asList( ImmutableMap.of( - "provider", "upfront", + providerDimension, "upfront", "rows", 4L, "index", 4875.669677734375D, "addRowsIndexConstant", 4880.669677734375D @@ -470,19 +420,19 @@ public class TopNQueryRunnerTest new TopNResultValue( Arrays.>asList( ImmutableMap.of( - "provider", "upfront", + providerDimension, "upfront", "rows", 2L, "index", 2591.68359375D, "addRowsIndexConstant", 2594.68359375D ), ImmutableMap.of( - "provider", "total_market", + providerDimension, "total_market", "rows", 2L, "index", 2508.39599609375D, "addRowsIndexConstant", 2511.39599609375D ), ImmutableMap.of( - "provider", "spot", + providerDimension, "spot", "rows", 2L, "index", 220.63774871826172D, "addRowsIndexConstant", 223.63774871826172D @@ -520,19 +470,19 @@ public class TopNQueryRunnerTest new TopNResultValue( Arrays.>asList( ImmutableMap.of( - "provider", "upfront", + providerDimension, "upfront", "rows", 1L, "index", new Float(1447.341160).doubleValue(), "addRowsIndexConstant", new Float(1449.341160).doubleValue() ), ImmutableMap.of( - "provider", "total_market", + providerDimension, "total_market", "rows", 1L, "index", new Float(1314.839715).doubleValue(), "addRowsIndexConstant", new Float(1316.839715).doubleValue() ), ImmutableMap.of( - "provider", "spot", + providerDimension, "spot", "rows", 1L, "index", new Float(109.705815).doubleValue(), "addRowsIndexConstant", new Float(111.705815).doubleValue() @@ -566,13 +516,13 @@ public class TopNQueryRunnerTest new TopNResultValue( Arrays.>asList( ImmutableMap.of( - "provider", "total_market", + providerDimension, "total_market", "rows", 4L, "index", 5351.814697265625D, "addRowsIndexConstant", 5356.814697265625D ), ImmutableMap.of( - "provider", "upfront", + providerDimension, "upfront", "rows", 4L, "index", 4875.669677734375D, "addRowsIndexConstant", 4880.669677734375D @@ -877,19 +827,19 @@ public class TopNQueryRunnerTest new TopNResultValue( Arrays.>asList( ImmutableMap.of( - "provider", "spot", + providerDimension, "spot", "rows", 18L, "index", 2231.8768157958984D, "addRowsIndexConstant", 2250.8768157958984D ), ImmutableMap.of( - "provider", "total_market", + providerDimension, "total_market", "rows", 4L, "index", 5351.814697265625D, "addRowsIndexConstant", 5356.814697265625D ), ImmutableMap.of( - "provider", "upfront", + providerDimension, "upfront", "rows", 4L, "index", 4875.669677734375D, "addRowsIndexConstant", 4880.669677734375D @@ -922,13 +872,13 @@ public class TopNQueryRunnerTest new TopNResultValue( Arrays.>asList( ImmutableMap.of( - "provider", "total_market", + providerDimension, "total_market", "rows", 4L, "index", 5351.814697265625D, "addRowsIndexConstant", 5356.814697265625D ), ImmutableMap.of( - "provider", "upfront", + providerDimension, "upfront", "rows", 4L, "index", 4875.669677734375D, "addRowsIndexConstant", 4880.669677734375D @@ -961,13 +911,13 @@ public class TopNQueryRunnerTest new TopNResultValue( Arrays.>asList( ImmutableMap.of( - "provider", "total_market", + providerDimension, "total_market", "rows", 4L, "index", 5351.814697265625D, "addRowsIndexConstant", 5356.814697265625D ), ImmutableMap.of( - "provider", "upfront", + providerDimension, "upfront", "rows", 4L, "index", 4875.669677734375D, "addRowsIndexConstant", 4880.669677734375D @@ -1004,19 +954,19 @@ public class TopNQueryRunnerTest new TopNResultValue( Arrays.>asList( ImmutableMap.of( - "provider", "s", + providerDimension, "s", "rows", 18L, "index", 2231.8768157958984D, "addRowsIndexConstant", 2250.8768157958984D ), ImmutableMap.of( - "provider", "t", + providerDimension, "t", "rows", 4L, "index", 5351.814697265625D, "addRowsIndexConstant", 5356.814697265625D ), ImmutableMap.of( - "provider", "u", + providerDimension, "u", "rows", 4L, "index", 4875.669677734375D, "addRowsIndexConstant", 4880.669677734375D @@ -1050,19 +1000,19 @@ public class TopNQueryRunnerTest new TopNResultValue( Arrays.>asList( ImmutableMap.of( - "provider", "spot", + providerDimension, "spot", "rows", 18L, "index", 2231.8768157958984D, "addRowsIndexConstant", 2250.8768157958984D ), ImmutableMap.of( - "provider", "upfront", + providerDimension, "upfront", "rows", 4L, "index", 4875.669677734375D, "addRowsIndexConstant", 4880.669677734375D ), ImmutableMap.of( - "provider", "total_market", + providerDimension, "total_market", "rows", 4L, "index", 5351.814697265625D, "addRowsIndexConstant", 5356.814697265625D diff --git a/processing/src/test/java/io/druid/query/topn/TopNQueryTest.java b/processing/src/test/java/io/druid/query/topn/TopNQueryTest.java new file mode 100644 index 00000000000..61c082039f4 --- /dev/null +++ b/processing/src/test/java/io/druid/query/topn/TopNQueryTest.java @@ -0,0 +1,75 @@ +/* + * Druid - a distributed column store. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * + * This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project + * under the Druid Corporate Contributor License Agreement. + */ + +package io.druid.query.topn; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.Query; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.MaxAggregatorFactory; +import io.druid.query.aggregation.MinAggregatorFactory; +import io.druid.query.aggregation.PostAggregator; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; + +import static io.druid.query.QueryRunnerTestHelper.*; + +public class TopNQueryTest +{ + private static final ObjectMapper jsonMapper = new DefaultObjectMapper(); + + @Test + public void testQuerySerialization() throws IOException + { + Query query = new TopNQueryBuilder() + .dataSource(dataSource) + .granularity(allGran) + .dimension(providerDimension) + .metric(indexMetric) + .threshold(4) + .intervals(fullOnInterval) + .aggregators( + Lists.newArrayList( + Iterables.concat( + commonAggregators, + Lists.newArrayList( + new MaxAggregatorFactory("maxIndex", "index"), + new MinAggregatorFactory("minIndex", "index") + ) + ) + ) + ) + .postAggregators(Arrays.asList(addRowsIndexConstant)) + .build(); + + String json = jsonMapper.writeValueAsString(query); + Query serdeQuery = jsonMapper.readValue(json, Query.class); + + Assert.assertEquals(query, serdeQuery); + } + +} diff --git a/server/src/main/java/io/druid/client/BrokerServerView.java b/server/src/main/java/io/druid/client/BrokerServerView.java index 3e860a4f23c..4ef7a9f76fc 100644 --- a/server/src/main/java/io/druid/client/BrokerServerView.java +++ b/server/src/main/java/io/druid/client/BrokerServerView.java @@ -30,8 +30,7 @@ import io.druid.client.selector.QueryableDruidServer; import io.druid.client.selector.ServerSelectorStrategy; import io.druid.concurrent.Execs; import io.druid.guice.annotations.Client; -import io.druid.query.QueryRunner; -import io.druid.query.QueryToolChestWarehouse; +import io.druid.query.*; import io.druid.timeline.DataSegment; import io.druid.timeline.VersionedIntervalTimeline; import io.druid.timeline.partition.PartitionChunk; @@ -232,10 +231,22 @@ public class BrokerServerView implements TimelineServerView @Override - public VersionedIntervalTimeline getTimeline(String dataSource) + public VersionedIntervalTimeline getTimeline(DataSource dataSource) { + String table; + while (dataSource instanceof QueryDataSource) { + dataSource = ((QueryDataSource)dataSource).getQuery().getDataSource(); + } + + if (dataSource instanceof TableDataSource) { + table = ((TableDataSource)dataSource).getName(); + } + else { + throw new UnsupportedOperationException("Unsupported data source type: " + dataSource.getClass().getSimpleName()); + } + synchronized (lock) { - return timelines.get(dataSource); + return timelines.get(table); } } diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index 31a644cf887..ed30af51a69 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -140,6 +140,7 @@ public class CachingClusteredClient implements QueryRunner final Query rewrittenQuery = query.withOverriddenContext(contextBuilder.build()); + VersionedIntervalTimeline timeline = serverView.getTimeline(query.getDataSource()); if (timeline == null) { return Sequences.empty(); diff --git a/server/src/main/java/io/druid/client/TimelineServerView.java b/server/src/main/java/io/druid/client/TimelineServerView.java index 7082c599c75..0a6a43c8fdb 100644 --- a/server/src/main/java/io/druid/client/TimelineServerView.java +++ b/server/src/main/java/io/druid/client/TimelineServerView.java @@ -20,6 +20,7 @@ package io.druid.client; import io.druid.client.selector.ServerSelector; +import io.druid.query.DataSource; import io.druid.query.QueryRunner; import io.druid.timeline.VersionedIntervalTimeline; @@ -27,6 +28,6 @@ import io.druid.timeline.VersionedIntervalTimeline; */ public interface TimelineServerView extends ServerView { - VersionedIntervalTimeline getTimeline(String dataSource); + VersionedIntervalTimeline getTimeline(DataSource dataSource); QueryRunner getQueryRunner(DruidServer server); } diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index 4c490271e95..0fb92b7d0ce 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -124,7 +124,7 @@ public class QueryResource emitter.emit( new ServiceMetricEvent.Builder() - .setUser2(query.getDataSource()) + .setUser2(query.getDataSource().toString()) .setUser4(query.getType()) .setUser5(query.getIntervals().get(0).toString()) .setUser6(String.valueOf(query.hasFilters())) diff --git a/server/src/main/java/io/druid/server/coordination/ServerManager.java b/server/src/main/java/io/druid/server/coordination/ServerManager.java index 950be651e86..f9633e30e0b 100644 --- a/server/src/main/java/io/druid/server/coordination/ServerManager.java +++ b/server/src/main/java/io/druid/server/coordination/ServerManager.java @@ -30,18 +30,7 @@ import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.collections.CountingMap; import io.druid.guice.annotations.Processing; -import io.druid.query.BySegmentQueryRunner; -import io.druid.query.FinalizeResultsQueryRunner; -import io.druid.query.MetricsEmittingQueryRunner; -import io.druid.query.NoopQueryRunner; -import io.druid.query.Query; -import io.druid.query.QueryRunner; -import io.druid.query.QueryRunnerFactory; -import io.druid.query.QueryRunnerFactoryConglomerate; -import io.druid.query.QuerySegmentWalker; -import io.druid.query.QueryToolChest; -import io.druid.query.ReferenceCountingSegmentQueryRunner; -import io.druid.query.SegmentDescriptor; +import io.druid.query.*; import io.druid.query.spec.QuerySegmentSpec; import io.druid.query.spec.SpecificSegmentQueryRunner; import io.druid.query.spec.SpecificSegmentSpec; @@ -118,6 +107,7 @@ public class ServerManager implements QuerySegmentWalker /** * Load a single segment. + * * @param segment segment to load * @return true if the segment was newly loaded, false if it was already loaded * @throws SegmentLoadingException if the segment cannot be loaded @@ -127,12 +117,10 @@ public class ServerManager implements QuerySegmentWalker final Segment adapter; try { adapter = segmentLoader.getSegment(segment); - } - catch (SegmentLoadingException e) { + } catch (SegmentLoadingException e) { try { segmentLoader.cleanup(segment); - } - catch (SegmentLoadingException e1) { + } catch (SegmentLoadingException e1) { // ignore } throw e; @@ -204,12 +192,11 @@ public class ServerManager implements QuerySegmentWalker try { log.info("Attempting to close segment %s", segment.getIdentifier()); oldQueryable.close(); - } - catch (IOException e) { + } catch (IOException e) { log.makeAlert(e, "Exception closing segment") - .addData("dataSource", dataSource) - .addData("segmentId", segment.getIdentifier()) - .emit(); + .addData("dataSource", dataSource) + .addData("segmentId", segment.getIdentifier()) + .emit(); } } else { log.info( @@ -233,7 +220,20 @@ public class ServerManager implements QuerySegmentWalker final QueryToolChest> toolChest = factory.getToolchest(); - final VersionedIntervalTimeline timeline = dataSources.get(query.getDataSource()); + DataSource dataSource = query.getDataSource(); + if (!(dataSource instanceof TableDataSource)) { + throw new UnsupportedOperationException("data source type '" + dataSource.getClass().getName() + "' unsupported"); + } + + String dataSourceName; + try { + dataSourceName = ((TableDataSource)query.getDataSource()).getName(); + } + catch (ClassCastException e) { + throw new UnsupportedOperationException("Subqueries are only supported in the broker"); + } + + final VersionedIntervalTimeline timeline = dataSources.get(dataSourceName); if (timeline == null) { return new NoopQueryRunner(); @@ -294,6 +294,7 @@ public class ServerManager implements QuerySegmentWalker Predicates.>notNull() ); + return new FinalizeResultsQueryRunner(toolChest.mergeResults(factory.mergeRunners(exec, adapters)), toolChest); } @@ -303,14 +304,22 @@ public class ServerManager implements QuerySegmentWalker final QueryRunnerFactory> factory = conglomerate.findFactory(query); if (factory == null) { log.makeAlert("Unknown query type, [%s]", query.getClass()) - .addData("dataSource", query.getDataSource()) - .emit(); + .addData("dataSource", query.getDataSource()) + .emit(); return new NoopQueryRunner(); } final QueryToolChest> toolChest = factory.getToolchest(); - final VersionedIntervalTimeline timeline = dataSources.get(query.getDataSource()); + String dataSourceName; + try { + dataSourceName = ((TableDataSource)query.getDataSource()).getName(); + } + catch (ClassCastException e) { + throw new UnsupportedOperationException("Subqueries are only supported in the broker"); + } + + final VersionedIntervalTimeline timeline = dataSources.get(dataSourceName); if (timeline == null) { return new NoopQueryRunner();