diff --git a/docs/content/DataSource.md b/docs/content/DataSource.md
new file mode 100644
index 00000000000..49c583561ba
--- /dev/null
+++ b/docs/content/DataSource.md
@@ -0,0 +1,52 @@
+---
+layout: doc_page
+---
+A data source is the Druid equivalent of a database table. However, a query can also masquerade as a data source, providing subquery-like functionality. Query data sources are currently only supported by [GroupBy](GroupByQuery.html) queries.
+
+### Table Data Source
+The table data source is the most common type. It's represented by a string, or by the full structure:
+
+```json
+{
+  "type": "table",
+  "name": <string_value>
+}
+```
+
+### Query Data Source
+```json
+{
+  "type": "query",
+  "query": {
+    "queryType": "groupBy",
+    ...
+  }
+}
+```
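+
+For example, a groupBy query can consume the rows produced by another groupBy query. The following sketch assumes a hypothetical datasource with a `country` dimension and a `count` metric; the outer query sums the inner query's daily `total` column into a single `weekly_total` per country:
+
+```json
+{
+  "queryType": "groupBy",
+  "dataSource": {
+    "type": "query",
+    "query": {
+      "queryType": "groupBy",
+      "dataSource": "sample_datasource",
+      "granularity": "day",
+      "dimensions": ["country"],
+      "aggregations": [
+        { "type": "longSum", "name": "total", "fieldName": "count" }
+      ],
+      "intervals": ["2012-01-01T00:00:00.000/2012-01-08T00:00:00.000"]
+    }
+  },
+  "granularity": "all",
+  "dimensions": ["country"],
+  "aggregations": [
+    { "type": "longSum", "name": "weekly_total", "fieldName": "total" }
+  ],
+  "intervals": ["2012-01-01T00:00:00.000/2012-01-08T00:00:00.000"]
+}
+```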
- "SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and \"end\" <= :end and used = 0": - "SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and end <= :end and used = 0", + "SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and \"end\" <= :end and used = false": + "SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and end <= :end and used = false", dbTables.getSegmentsTable() ) ) diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java index 2cc94ac7400..0d69ed5036c 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java @@ -38,6 +38,7 @@ import io.druid.indexing.common.TaskToolboxFactory; import io.druid.indexing.common.task.Task; import io.druid.query.NoopQueryRunner; import io.druid.query.Query; +import io.druid.query.TableDataSource; import io.druid.query.QueryRunner; import io.druid.query.QuerySegmentWalker; import io.druid.query.SegmentDescriptor; @@ -152,10 +153,17 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker private QueryRunner getQueryRunnerImpl(Query query) { QueryRunner queryRunner = null; + String queryDataSource; + try { + queryDataSource = ((TableDataSource)query.getDataSource()).getName(); + } + catch (ClassCastException e) { + throw new IllegalArgumentException("Subqueries are not welcome here"); + } for (final ThreadPoolTaskRunnerWorkItem taskRunnerWorkItem : ImmutableList.copyOf(runningItems)) { final Task task = taskRunnerWorkItem.getTask(); - if (task.getDataSource().equals(query.getDataSource())) { + if (task.getDataSource().equals(queryDataSource)) { final QueryRunner taskQueryRunner = task.getQueryRunner(query); if (taskQueryRunner != null) { @@ -163,7 +171,7 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker queryRunner = taskQueryRunner; } else { log.makeAlert("Found too many query runners for datasource") - .addData("dataSource", query.getDataSource()) + .addData("dataSource", queryDataSource) .emit(); } } diff --git a/processing/src/main/java/io/druid/query/BaseQuery.java b/processing/src/main/java/io/druid/query/BaseQuery.java index 7195e1fcd8e..ed13f9ddf39 100644 --- a/processing/src/main/java/io/druid/query/BaseQuery.java +++ b/processing/src/main/java/io/druid/query/BaseQuery.java @@ -36,13 +36,13 @@ import java.util.Map; public abstract class BaseQuery implements Query { public static String QUERYID = "queryId"; - private final String dataSource; + private final DataSource dataSource; private final Map context; private final QuerySegmentSpec querySegmentSpec; private volatile Duration duration; public BaseQuery( - String dataSource, + DataSource dataSource, QuerySegmentSpec querySegmentSpec, Map context ) @@ -50,14 +50,14 @@ public abstract class BaseQuery implements Query Preconditions.checkNotNull(dataSource, "dataSource can't be null"); Preconditions.checkNotNull(querySegmentSpec, "querySegmentSpec can't be null"); - this.dataSource = dataSource.toLowerCase(); + this.dataSource = dataSource; this.context = context; this.querySegmentSpec = querySegmentSpec; } @JsonProperty @Override - public String getDataSource() + public DataSource getDataSource() { return dataSource; } @@ -143,4 +143,31 @@ public abstract class BaseQuery implements Query { 
+                           "SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and \"end\" <= :end and used = false":
+                           "SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and end <= :end and used = false",
                        dbTables.getSegmentsTable()
                    )
                )
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java
index 2cc94ac7400..0d69ed5036c 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java
@@ -38,6 +38,7 @@ import io.druid.indexing.common.TaskToolboxFactory;
 import io.druid.indexing.common.task.Task;
 import io.druid.query.NoopQueryRunner;
 import io.druid.query.Query;
+import io.druid.query.TableDataSource;
 import io.druid.query.QueryRunner;
 import io.druid.query.QuerySegmentWalker;
 import io.druid.query.SegmentDescriptor;
@@ -152,10 +153,18 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
   private <T> QueryRunner<T> getQueryRunnerImpl(Query<T> query)
   {
     QueryRunner<T> queryRunner = null;
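+    // Tasks are matched by table name below, so only plain table datasources can be served here.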
+    String queryDataSource;
+    try {
+      queryDataSource = ((TableDataSource) query.getDataSource()).getName();
+    }
+    catch (ClassCastException e) {
+      throw new IllegalArgumentException("Subqueries are not supported in this context");
+    }
 
     for (final ThreadPoolTaskRunnerWorkItem taskRunnerWorkItem : ImmutableList.copyOf(runningItems)) {
       final Task task = taskRunnerWorkItem.getTask();
-      if (task.getDataSource().equals(query.getDataSource())) {
+      if (task.getDataSource().equals(queryDataSource)) {
         final QueryRunner<T> taskQueryRunner = task.getQueryRunner(query);
 
         if (taskQueryRunner != null) {
@@ -163,7 +172,7 @@
           queryRunner = taskQueryRunner;
         } else {
           log.makeAlert("Found too many query runners for datasource")
-             .addData("dataSource", query.getDataSource())
+             .addData("dataSource", queryDataSource)
              .emit();
         }
       }
diff --git a/processing/src/main/java/io/druid/query/BaseQuery.java b/processing/src/main/java/io/druid/query/BaseQuery.java
index 7195e1fcd8e..ed13f9ddf39 100644
--- a/processing/src/main/java/io/druid/query/BaseQuery.java
+++ b/processing/src/main/java/io/druid/query/BaseQuery.java
@@ -36,13 +36,13 @@ import java.util.Map;
 public abstract class BaseQuery<T> implements Query<T>
 {
   public static String QUERYID = "queryId";
-  private final String dataSource;
+  private final DataSource dataSource;
   private final Map<String, Object> context;
   private final QuerySegmentSpec querySegmentSpec;
   private volatile Duration duration;
 
   public BaseQuery(
-      String dataSource,
+      DataSource dataSource,
       QuerySegmentSpec querySegmentSpec,
       Map<String, Object> context
   )
@@ -50,14 +50,14 @@ public abstract class BaseQuery<T> implements Query<T>
     Preconditions.checkNotNull(dataSource, "dataSource can't be null");
     Preconditions.checkNotNull(querySegmentSpec, "querySegmentSpec can't be null");
 
-    this.dataSource = dataSource.toLowerCase();
+    this.dataSource = dataSource;
     this.context = context;
     this.querySegmentSpec = querySegmentSpec;
   }
 
   @JsonProperty
   @Override
-  public String getDataSource()
+  public DataSource getDataSource()
   {
     return dataSource;
   }
@@ -143,4 +143,31 @@ public abstract class BaseQuery<T> implements Query<T>
     return withOverriddenContext(ImmutableMap.of(QUERYID, id));
   }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    BaseQuery baseQuery = (BaseQuery) o;
+
+    if (context != null ? !context.equals(baseQuery.context) : baseQuery.context != null) return false;
+    if (dataSource != null ? !dataSource.equals(baseQuery.dataSource) : baseQuery.dataSource != null) return false;
+    if (duration != null ? !duration.equals(baseQuery.duration) : baseQuery.duration != null) return false;
+    if (querySegmentSpec != null ? !querySegmentSpec.equals(baseQuery.querySegmentSpec) : baseQuery.querySegmentSpec != null)
+      return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    int result = dataSource != null ? dataSource.hashCode() : 0;
+    result = 31 * result + (context != null ? context.hashCode() : 0);
+    result = 31 * result + (querySegmentSpec != null ? querySegmentSpec.hashCode() : 0);
+    result = 31 * result + (duration != null ? duration.hashCode() : 0);
+    return result;
+  }
 }
diff --git a/processing/src/main/java/io/druid/query/DataSource.java b/processing/src/main/java/io/druid/query/DataSource.java
new file mode 100644
index 00000000000..2560f2b418c
--- /dev/null
+++ b/processing/src/main/java/io/druid/query/DataSource.java
@@ -0,0 +1,37 @@
+/*
+ * Druid - a distributed column store.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ *
+ * This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project
+ * under the Druid Corporate Contributor License Agreement.
+ */
+
+package io.druid.query;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME,
+    include = JsonTypeInfo.As.PROPERTY,
+    property = "type",
+    defaultImpl = LegacyDataSource.class)
+@JsonSubTypes({
+    @JsonSubTypes.Type(value = TableDataSource.class, name = "table"),
+    @JsonSubTypes.Type(value = QueryDataSource.class, name = "query")
+})
+public interface DataSource
+{
+}
diff --git a/processing/src/main/java/io/druid/query/Druids.java b/processing/src/main/java/io/druid/query/Druids.java
index 99d5acdc6f6..fd2abd13ab8 100644
--- a/processing/src/main/java/io/druid/query/Druids.java
+++ b/processing/src/main/java/io/druid/query/Druids.java
@@ -298,7 +298,7 @@ public class Druids
    */
   public static class TimeseriesQueryBuilder
   {
-    private String dataSource;
+    private DataSource dataSource;
     private QuerySegmentSpec querySegmentSpec;
     private DimFilter dimFilter;
     private QueryGranularity granularity;
@@ -308,7 +308,7 @@ public class Druids
 
     private TimeseriesQueryBuilder()
     {
-      dataSource = "";
+      dataSource = null;
       querySegmentSpec = null;
       dimFilter = null;
       granularity = QueryGranularity.ALL;
@@ -354,7 +354,7 @@ public class Druids
           .context(builder.context);
     }
 
-    public String getDataSource()
+    public DataSource getDataSource()
     {
       return dataSource;
     }
@@ -390,6 +390,12 @@ public class Druids
     }
 
     public TimeseriesQueryBuilder dataSource(String ds)
+    {
+      dataSource = new TableDataSource(ds);
+      return this;
+    }
+
+    public TimeseriesQueryBuilder dataSource(DataSource ds)
     {
       dataSource = ds;
       return this;
     }
@@ -492,7 +498,7 @@ public class Druids
    */
   public static class SearchQueryBuilder
   {
-    private String dataSource;
+    private DataSource dataSource;
     private DimFilter dimFilter;
     private QueryGranularity granularity;
     private int limit;
@@ -503,7 +509,7 @@ public class Druids
 
     public SearchQueryBuilder()
     {
-      dataSource = "";
+      dataSource = null;
       dimFilter = null;
       granularity = QueryGranularity.ALL;
       limit = 0;
@@ -531,7 +537,7 @@ public class Druids
     public SearchQueryBuilder copy(SearchQuery query)
     {
       return new SearchQueryBuilder()
-          .dataSource(query.getDataSource())
+          .dataSource(((TableDataSource) query.getDataSource()).getName())
           .intervals(query.getQuerySegmentSpec())
           .filters(query.getDimensionsFilter())
           .granularity(query.getGranularity())
@@ -555,6 +561,12 @@ public class Druids
     }
 
     public SearchQueryBuilder dataSource(String d)
+    {
+      dataSource = new TableDataSource(d);
+      return this;
+    }
+
+    public SearchQueryBuilder dataSource(DataSource d)
     {
       dataSource = d;
       return this;
     }
@@ -676,13 +688,13 @@ public class Druids
    */
   public static class TimeBoundaryQueryBuilder
   {
-    private String dataSource;
+    private DataSource dataSource;
     private QuerySegmentSpec querySegmentSpec;
     private Map<String, Object> context;
 
     public TimeBoundaryQueryBuilder()
     {
-      dataSource = "";
+      dataSource = null;
       querySegmentSpec = null;
       context = null;
     }
@@ -704,9 +716,15 @@ public class Druids
           .context(builder.context);
     }
 
-    public TimeBoundaryQueryBuilder dataSource(String d)
+    public TimeBoundaryQueryBuilder dataSource(String ds)
     {
-      dataSource = d;
+      dataSource = new TableDataSource(ds);
+      return this;
+    }
+
+    public TimeBoundaryQueryBuilder dataSource(DataSource ds)
+    {
+      dataSource = ds;
       return this;
     }
diff --git a/processing/src/main/java/io/druid/query/LegacyDataSource.java b/processing/src/main/java/io/druid/query/LegacyDataSource.java
new file mode 100644
index 00000000000..07a8c647297
--- /dev/null
+++ b/processing/src/main/java/io/druid/query/LegacyDataSource.java
@@ -0,0 +1,37 @@
+/*
+ * Druid - a distributed column store.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ *
+ * This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project
+ * under the Druid Corporate Contributor License Agreement.
+ */
+package io.druid.query;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
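+// Handles the legacy "dataSource": "tableName" syntax: DataSource's defaultImpl points here,
+// and the single-String @JsonCreator lets Jackson build one straight from a JSON string.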
+@JsonTypeName("table")
+public class LegacyDataSource extends TableDataSource
+{
+  @JsonCreator
+  public LegacyDataSource(String name)
+  {
+    super(name);
+  }
+}
diff --git a/processing/src/main/java/io/druid/query/Query.java b/processing/src/main/java/io/druid/query/Query.java
index 6823e1220ae..d58798539ba 100644
--- a/processing/src/main/java/io/druid/query/Query.java
+++ b/processing/src/main/java/io/druid/query/Query.java
@@ -56,7 +56,7 @@ public interface Query
   public static final String SELECT = "select";
   public static final String TOPN = "topN";
 
-  public String getDataSource();
+  public DataSource getDataSource();
 
   public boolean hasFilters();
diff --git a/processing/src/main/java/io/druid/query/QueryDataSource.java b/processing/src/main/java/io/druid/query/QueryDataSource.java
new file mode 100644
index 00000000000..e15ebf31338
--- /dev/null
+++ b/processing/src/main/java/io/druid/query/QueryDataSource.java
@@ -0,0 +1,65 @@
+/*
+ * Druid - a distributed column store.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ *
+ * This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project
+ * under the Druid Corporate Contributor License Agreement.
+ */
+
+package io.druid.query;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("query")
+public class QueryDataSource implements DataSource
+{
+  @JsonProperty
+  private final Query query;
+
+  @JsonCreator
+  public QueryDataSource(@JsonProperty("query") Query query)
+  {
+    this.query = query;
+  }
+
+  public Query getQuery()
+  {
+    return query;
+  }
+
+  public String toString() { return query.toString(); }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    QueryDataSource that = (QueryDataSource) o;
+
+    if (!query.equals(that.query)) return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return query.hashCode();
+  }
+}
diff --git a/processing/src/main/java/io/druid/query/SubqueryQueryRunner.java b/processing/src/main/java/io/druid/query/SubqueryQueryRunner.java
new file mode 100644
index 00000000000..8e13d9219e9
--- /dev/null
+++ b/processing/src/main/java/io/druid/query/SubqueryQueryRunner.java
@@ -0,0 +1,49 @@
+/*
+ * Druid - a distributed column store.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ *
+ * This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project
+ * under the Druid Corporate Contributor License Agreement.
+ */
+
+package io.druid.query;
+
+import com.metamx.common.guava.Sequence;
+
+/**
+ * If there's a subquery, run it instead of the outer query
+ */
+public class SubqueryQueryRunner<T> implements QueryRunner<T>
+{
+  private final QueryRunner<T> baseRunner;
+
+  public SubqueryQueryRunner(QueryRunner<T> baseRunner)
+  {
+    this.baseRunner = baseRunner;
+  }
+
+  @Override
+  public Sequence<T> run(final Query<T> query)
+  {
+    DataSource dataSource = query.getDataSource();
+    if (dataSource instanceof QueryDataSource) {
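+      // Recurse: the inner query may itself read from another query datasource.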
+      return run((Query<T>) ((QueryDataSource) dataSource).getQuery());
+    } else {
+      return baseRunner.run(query);
+    }
+  }
+}
diff --git a/processing/src/main/java/io/druid/query/TableDataSource.java b/processing/src/main/java/io/druid/query/TableDataSource.java
new file mode 100644
index 00000000000..f246106aae1
--- /dev/null
+++ b/processing/src/main/java/io/druid/query/TableDataSource.java
@@ -0,0 +1,65 @@
+/*
+ * Druid - a distributed column store.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ *
+ * This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project
+ * under the Druid Corporate Contributor License Agreement.
+ */
+package io.druid.query;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("table")
+public class TableDataSource implements DataSource
+{
+  @JsonProperty
+  private final String name;
+
+  @JsonCreator
+  public TableDataSource(@JsonProperty("name") String name)
+  {
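+    // Lower-case the name to preserve the old behavior of BaseQuery, which down-cased its String dataSource.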
+    this.name = (name == null ? null : name.toLowerCase());
+  }
+
+  public String getName()
+  {
+    return name;
+  }
+
+  public String toString() { return name; }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) return true;
+    if (!(o instanceof TableDataSource)) return false;
+
+    TableDataSource that = (TableDataSource) o;
+
+    if (!name.equals(that.name)) return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return name.hashCode();
+  }
+}
diff --git a/processing/src/main/java/io/druid/query/aggregation/CountAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/CountAggregatorFactory.java
index 1b02f923996..e47999e8719 100644
--- a/processing/src/main/java/io/druid/query/aggregation/CountAggregatorFactory.java
+++ b/processing/src/main/java/io/druid/query/aggregation/CountAggregatorFactory.java
@@ -132,4 +132,23 @@ public class CountAggregatorFactory implements AggregatorFactory
            "name='" + name + '\'' +
            '}';
   }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    CountAggregatorFactory that = (CountAggregatorFactory) o;
+
+    if (name != null ? !name.equals(that.name) : that.name != null) return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return name != null ? name.hashCode() : 0;
+  }
 }
diff --git a/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregatorFactory.java
index f85d0c677f5..ebd4e185ea3 100644
--- a/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregatorFactory.java
+++ b/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregatorFactory.java
@@ -150,4 +150,26 @@ public class DoubleSumAggregatorFactory implements AggregatorFactory
            ", name='" + name + '\'' +
            '}';
   }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    DoubleSumAggregatorFactory that = (DoubleSumAggregatorFactory) o;
+
+    if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) return false;
+    if (name != null ? !name.equals(that.name) : that.name != null) return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    int result = fieldName != null ? fieldName.hashCode() : 0;
+    result = 31 * result + (name != null ? name.hashCode() : 0);
+    return result;
+  }
 }
diff --git a/processing/src/main/java/io/druid/query/aggregation/HistogramAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/HistogramAggregatorFactory.java
index 2abd19d2330..060d40d2798 100644
--- a/processing/src/main/java/io/druid/query/aggregation/HistogramAggregatorFactory.java
+++ b/processing/src/main/java/io/druid/query/aggregation/HistogramAggregatorFactory.java
@@ -179,4 +179,30 @@ public class HistogramAggregatorFactory implements AggregatorFactory
            ", breaks=" + Arrays.toString(breaks) +
            '}';
   }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    HistogramAggregatorFactory that = (HistogramAggregatorFactory) o;
+
+    if (!Arrays.equals(breaks, that.breaks)) return false;
+    if (breaksList != null ? !breaksList.equals(that.breaksList) : that.breaksList != null) return false;
+    if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) return false;
+    if (name != null ? !name.equals(that.name) : that.name != null) return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    int result = name != null ? name.hashCode() : 0;
+    result = 31 * result + (fieldName != null ? fieldName.hashCode() : 0);
+    result = 31 * result + (breaksList != null ? breaksList.hashCode() : 0);
+    result = 31 * result + (breaks != null ? Arrays.hashCode(breaks) : 0);
+    return result;
+  }
 }
diff --git a/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java
index 927ab89676f..6de6be09ad8 100644
--- a/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java
+++ b/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java
@@ -317,4 +317,35 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
       }
     };
   }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    JavaScriptAggregatorFactory that = (JavaScriptAggregatorFactory) o;
+
+    if (compiledScript != null ? !compiledScript.equals(that.compiledScript) : that.compiledScript != null)
+      return false;
+    if (fieldNames != null ? !fieldNames.equals(that.fieldNames) : that.fieldNames != null) return false;
+    if (fnAggregate != null ? !fnAggregate.equals(that.fnAggregate) : that.fnAggregate != null) return false;
+    if (fnCombine != null ? !fnCombine.equals(that.fnCombine) : that.fnCombine != null) return false;
+    if (fnReset != null ? !fnReset.equals(that.fnReset) : that.fnReset != null) return false;
+    if (name != null ? !name.equals(that.name) : that.name != null) return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    int result = name != null ? name.hashCode() : 0;
+    result = 31 * result + (fieldNames != null ? fieldNames.hashCode() : 0);
+    result = 31 * result + (fnAggregate != null ? fnAggregate.hashCode() : 0);
+    result = 31 * result + (fnReset != null ? fnReset.hashCode() : 0);
+    result = 31 * result + (fnCombine != null ? fnCombine.hashCode() : 0);
+    result = 31 * result + (compiledScript != null ? compiledScript.hashCode() : 0);
+    return result;
+  }
 }
diff --git a/processing/src/main/java/io/druid/query/aggregation/LongSumAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/LongSumAggregatorFactory.java
index f1372ff024c..50ef5130756 100644
--- a/processing/src/main/java/io/druid/query/aggregation/LongSumAggregatorFactory.java
+++ b/processing/src/main/java/io/druid/query/aggregation/LongSumAggregatorFactory.java
@@ -150,4 +150,26 @@ public class LongSumAggregatorFactory implements AggregatorFactory
            ", name='" + name + '\'' +
            '}';
   }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    LongSumAggregatorFactory that = (LongSumAggregatorFactory) o;
+
+    if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) return false;
+    if (name != null ? !name.equals(that.name) : that.name != null) return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    int result = fieldName != null ? fieldName.hashCode() : 0;
+    result = 31 * result + (name != null ? name.hashCode() : 0);
+    return result;
+  }
 }
diff --git a/processing/src/main/java/io/druid/query/aggregation/MaxAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/MaxAggregatorFactory.java
index d66d9c5b2d9..ee8217f820b 100644
--- a/processing/src/main/java/io/druid/query/aggregation/MaxAggregatorFactory.java
+++ b/processing/src/main/java/io/druid/query/aggregation/MaxAggregatorFactory.java
@@ -150,4 +150,26 @@ public class MaxAggregatorFactory implements AggregatorFactory
            ", name='" + name + '\'' +
            '}';
   }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    MaxAggregatorFactory that = (MaxAggregatorFactory) o;
+
+    if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) return false;
+    if (name != null ? !name.equals(that.name) : that.name != null) return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    int result = fieldName != null ? fieldName.hashCode() : 0;
+    result = 31 * result + (name != null ? name.hashCode() : 0);
+    return result;
+  }
 }
diff --git a/processing/src/main/java/io/druid/query/aggregation/MinAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/MinAggregatorFactory.java
index 0a168114358..9c3d560bacf 100644
--- a/processing/src/main/java/io/druid/query/aggregation/MinAggregatorFactory.java
+++ b/processing/src/main/java/io/druid/query/aggregation/MinAggregatorFactory.java
@@ -150,4 +150,26 @@ public class MinAggregatorFactory implements AggregatorFactory
            ", name='" + name + '\'' +
            '}';
   }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    MinAggregatorFactory that = (MinAggregatorFactory) o;
+
+    if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) return false;
+    if (name != null ? !name.equals(that.name) : that.name != null) return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    int result = fieldName != null ? fieldName.hashCode() : 0;
+    result = 31 * result + (name != null ? name.hashCode() : 0);
+    return result;
+  }
 }
diff --git a/processing/src/main/java/io/druid/query/aggregation/ToLowerCaseAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/ToLowerCaseAggregatorFactory.java
index 457d22f2cdf..6c559ba8ec6 100644
--- a/processing/src/main/java/io/druid/query/aggregation/ToLowerCaseAggregatorFactory.java
+++ b/processing/src/main/java/io/druid/query/aggregation/ToLowerCaseAggregatorFactory.java
@@ -112,4 +112,24 @@ public class ToLowerCaseAggregatorFactory implements AggregatorFactory
   {
     return baseAggregatorFactory.getAggregatorStartValue();
   }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    ToLowerCaseAggregatorFactory that = (ToLowerCaseAggregatorFactory) o;
+
+    if (baseAggregatorFactory != null ? !baseAggregatorFactory.equals(that.baseAggregatorFactory) : that.baseAggregatorFactory != null)
+      return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return baseAggregatorFactory != null ? baseAggregatorFactory.hashCode() : 0;
+  }
 }
diff --git a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregatorFactory.java
index 99ec241699a..5eb45524773 100644
--- a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregatorFactory.java
+++ b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregatorFactory.java
@@ -207,4 +207,26 @@ public class HyperUniquesAggregatorFactory implements AggregatorFactory
            ", fieldName='" + fieldName + '\'' +
            '}';
   }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    HyperUniquesAggregatorFactory that = (HyperUniquesAggregatorFactory) o;
+
+    if (!fieldName.equals(that.fieldName)) return false;
+    if (!name.equals(that.name)) return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    int result = name.hashCode();
+    result = 31 * result + fieldName.hashCode();
+    return result;
+  }
 }
diff --git a/processing/src/main/java/io/druid/query/aggregation/post/ArithmeticPostAggregator.java b/processing/src/main/java/io/druid/query/aggregation/post/ArithmeticPostAggregator.java
index cf6881d18ad..87138dadef8 100644
--- a/processing/src/main/java/io/druid/query/aggregation/post/ArithmeticPostAggregator.java
+++ b/processing/src/main/java/io/druid/query/aggregation/post/ArithmeticPostAggregator.java
@@ -193,4 +193,30 @@ public class ArithmeticPostAggregator implements PostAggregator
       return lookupMap.keySet();
     }
   }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    ArithmeticPostAggregator that = (ArithmeticPostAggregator) o;
+
+    if (fields != null ? !fields.equals(that.fields) : that.fields != null) return false;
+    if (fnName != null ? !fnName.equals(that.fnName) : that.fnName != null) return false;
+    if (name != null ? !name.equals(that.name) : that.name != null) return false;
+    if (op != that.op) return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    int result = name != null ? name.hashCode() : 0;
+    result = 31 * result + (fnName != null ? fnName.hashCode() : 0);
+    result = 31 * result + (fields != null ? fields.hashCode() : 0);
+    result = 31 * result + (op != null ? op.hashCode() : 0);
+    return result;
+  }
 }
diff --git a/processing/src/main/java/io/druid/query/aggregation/post/ConstantPostAggregator.java b/processing/src/main/java/io/druid/query/aggregation/post/ConstantPostAggregator.java
index d5dce4d3140..7bf4ed23d4c 100644
--- a/processing/src/main/java/io/druid/query/aggregation/post/ConstantPostAggregator.java
+++ b/processing/src/main/java/io/druid/query/aggregation/post/ConstantPostAggregator.java
@@ -91,4 +91,33 @@ public class ConstantPostAggregator implements PostAggregator
            ", constantValue=" + constantValue +
            '}';
   }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    ConstantPostAggregator that = (ConstantPostAggregator) o;
+
+    if (constantValue != null && that.constantValue != null) {
+      if (constantValue.doubleValue() != that.constantValue.doubleValue())
+        return false;
+    }
+    else if (constantValue != that.constantValue) {
+      return false;
+    }
+
+    if (name != null ? !name.equals(that.name) : that.name != null) return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    int result = name != null ? name.hashCode() : 0;
+    // Hash the double value so that numerically equal constants (which equals() treats as equal) hash alike.
+    result = 31 * result + (constantValue != null ? Double.valueOf(constantValue.doubleValue()).hashCode() : 0);
+    return result;
+  }
 }
diff --git a/processing/src/main/java/io/druid/query/aggregation/post/FieldAccessPostAggregator.java b/processing/src/main/java/io/druid/query/aggregation/post/FieldAccessPostAggregator.java
index 59962836d2e..6c2321b10fd 100644
--- a/processing/src/main/java/io/druid/query/aggregation/post/FieldAccessPostAggregator.java
+++ b/processing/src/main/java/io/druid/query/aggregation/post/FieldAccessPostAggregator.java
@@ -84,4 +84,26 @@ public class FieldAccessPostAggregator implements PostAggregator
            ", fieldName='" + fieldName + '\'' +
            '}';
   }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    FieldAccessPostAggregator that = (FieldAccessPostAggregator) o;
+
+    if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) return false;
+    if (name != null ? !name.equals(that.name) : that.name != null) return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    int result = name != null ? name.hashCode() : 0;
+    result = 31 * result + (fieldName != null ? fieldName.hashCode() : 0);
+    return result;
+  }
 }
diff --git a/processing/src/main/java/io/druid/query/aggregation/post/JavaScriptPostAggregator.java b/processing/src/main/java/io/druid/query/aggregation/post/JavaScriptPostAggregator.java
index fef4b26f0f2..666dbc64a82 100644
--- a/processing/src/main/java/io/druid/query/aggregation/post/JavaScriptPostAggregator.java
+++ b/processing/src/main/java/io/druid/query/aggregation/post/JavaScriptPostAggregator.java
@@ -142,4 +142,30 @@ public class JavaScriptPostAggregator implements PostAggregator
   {
     return function;
   }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    JavaScriptPostAggregator that = (JavaScriptPostAggregator) o;
+
+    if (fieldNames != null ? !fieldNames.equals(that.fieldNames) : that.fieldNames != null) return false;
+    if (fn != null ? !fn.equals(that.fn) : that.fn != null) return false;
+    if (function != null ? !function.equals(that.function) : that.function != null) return false;
+    if (name != null ? !name.equals(that.name) : that.name != null) return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    int result = name != null ? name.hashCode() : 0;
+    result = 31 * result + (fieldNames != null ? fieldNames.hashCode() : 0);
+    result = 31 * result + (function != null ? function.hashCode() : 0);
+    result = 31 * result + (fn != null ? fn.hashCode() : 0);
+    return result;
+  }
 }
diff --git a/processing/src/main/java/io/druid/query/dimension/DefaultDimensionSpec.java b/processing/src/main/java/io/druid/query/dimension/DefaultDimensionSpec.java
index 3a9137b008e..8e18ce61228 100644
--- a/processing/src/main/java/io/druid/query/dimension/DefaultDimensionSpec.java
+++ b/processing/src/main/java/io/druid/query/dimension/DefaultDimensionSpec.java
@@ -84,4 +84,26 @@ public class DefaultDimensionSpec implements DimensionSpec
            ", outputName='" + outputName + '\'' +
            '}';
   }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    DefaultDimensionSpec that = (DefaultDimensionSpec) o;
+
+    if (dimension != null ? !dimension.equals(that.dimension) : that.dimension != null) return false;
+    if (outputName != null ? !outputName.equals(that.outputName) : that.outputName != null) return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    int result = dimension != null ? dimension.hashCode() : 0;
+    result = 31 * result + (outputName != null ? outputName.hashCode() : 0);
+    return result;
+  }
 }
diff --git a/processing/src/main/java/io/druid/query/dimension/ExtractionDimensionSpec.java b/processing/src/main/java/io/druid/query/dimension/ExtractionDimensionSpec.java
index 82fba73d0a1..9fe480e396d 100644
--- a/processing/src/main/java/io/druid/query/dimension/ExtractionDimensionSpec.java
+++ b/processing/src/main/java/io/druid/query/dimension/ExtractionDimensionSpec.java
@@ -92,4 +92,29 @@ public class ExtractionDimensionSpec implements DimensionSpec
            ", outputName='" + outputName + '\'' +
            '}';
   }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    ExtractionDimensionSpec that = (ExtractionDimensionSpec) o;
+
+    if (dimExtractionFn != null ? !dimExtractionFn.equals(that.dimExtractionFn) : that.dimExtractionFn != null)
+      return false;
+    if (dimension != null ? !dimension.equals(that.dimension) : that.dimension != null) return false;
+    if (outputName != null ? !outputName.equals(that.outputName) : that.outputName != null) return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    int result = dimension != null ? dimension.hashCode() : 0;
+    result = 31 * result + (dimExtractionFn != null ? dimExtractionFn.hashCode() : 0);
+    result = 31 * result + (outputName != null ? outputName.hashCode() : 0);
+    return result;
+  }
 }
diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java
index 93db02101ba..80322a29531 100644
--- a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java
+++ b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java
@@ -33,7 +33,11 @@ import com.metamx.common.guava.Sequences;
 import io.druid.data.input.Row;
 import io.druid.granularity.QueryGranularity;
 import io.druid.query.BaseQuery;
+import io.druid.query.DataSource;
 import io.druid.query.Queries;
+import io.druid.query.Query;
+import io.druid.query.QueryDataSource;
+import io.druid.query.TableDataSource;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.PostAggregator;
 import io.druid.query.dimension.DefaultDimensionSpec;
@@ -72,7 +76,7 @@ public class GroupByQuery extends BaseQuery
   @JsonCreator
   public GroupByQuery(
-      @JsonProperty("dataSource") String dataSource,
+      @JsonProperty("dataSource") DataSource dataSource,
       @JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
       @JsonProperty("filter") DimFilter dimFilter,
       @JsonProperty("granularity") QueryGranularity granularity,
@@ -133,7 +137,7 @@ public class GroupByQuery extends BaseQuery
    * have already passed in order for the object to exist.
    */
   private GroupByQuery(
-      String dataSource,
+      DataSource dataSource,
       QuerySegmentSpec querySegmentSpec,
       DimFilter dimFilter,
       QueryGranularity granularity,
@@ -255,7 +259,7 @@ public class GroupByQuery extends BaseQuery
   public static class Builder
   {
-    private String dataSource;
+    private DataSource dataSource;
     private QuerySegmentSpec querySegmentSpec;
     private DimFilter dimFilter;
     private QueryGranularity granularity;
@@ -270,7 +274,9 @@ public class GroupByQuery extends BaseQuery
     private List<OrderByColumnSpec> orderByColumnSpecs = Lists.newArrayList();
     private int limit = Integer.MAX_VALUE;
 
-    private Builder() {}
+    private Builder()
+    {
+    }
 
     private Builder(Builder builder)
     {
@@ -288,12 +294,24 @@ public class GroupByQuery extends BaseQuery
       context = builder.context;
     }
 
-    public Builder setDataSource(String dataSource)
+    public Builder setDataSource(DataSource dataSource)
     {
       this.dataSource = dataSource;
       return this;
     }
 
+    public Builder setDataSource(String dataSource)
+    {
+      this.dataSource = new TableDataSource(dataSource);
+      return this;
+    }
+
+    public Builder setDataSource(Query query)
+    {
+      this.dataSource = new QueryDataSource(query);
+      return this;
+    }
+
     public Builder setInterval(Object interval)
     {
       return setQuerySegmentSpec(new LegacySegmentSpec(interval));
@@ -479,13 +497,52 @@
   public String toString()
   {
     return "GroupByQuery{" +
-        "limitSpec=" + limitSpec +
-        ", dimFilter=" + dimFilter +
-        ", granularity=" + granularity +
-        ", dimensions=" + dimensions +
-        ", aggregatorSpecs=" + aggregatorSpecs +
-        ", postAggregatorSpecs=" + postAggregatorSpecs +
-        ", orderByLimitFn=" + orderByLimitFn +
-        '}';
+           "limitSpec=" + limitSpec +
+           ", dimFilter=" + dimFilter +
+           ", granularity=" + granularity +
+           ", dimensions=" + dimensions +
+           ", aggregatorSpecs=" + aggregatorSpecs +
+           ", postAggregatorSpecs=" + postAggregatorSpecs +
+           ", orderByLimitFn=" + orderByLimitFn +
+           '}';
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    if (!super.equals(o)) return false;
+
+    GroupByQuery that = (GroupByQuery) o;
+
+    if (aggregatorSpecs != null ? !aggregatorSpecs.equals(that.aggregatorSpecs) : that.aggregatorSpecs != null)
+      return false;
+    if (dimFilter != null ? !dimFilter.equals(that.dimFilter) : that.dimFilter != null) return false;
+    if (dimensions != null ? !dimensions.equals(that.dimensions) : that.dimensions != null) return false;
+    if (granularity != null ? !granularity.equals(that.granularity) : that.granularity != null) return false;
+    if (havingSpec != null ? !havingSpec.equals(that.havingSpec) : that.havingSpec != null) return false;
+    if (limitSpec != null ? !limitSpec.equals(that.limitSpec) : that.limitSpec != null) return false;
+    if (orderByLimitFn != null ? !orderByLimitFn.equals(that.orderByLimitFn) : that.orderByLimitFn != null)
+      return false;
+    if (postAggregatorSpecs != null ? !postAggregatorSpecs.equals(that.postAggregatorSpecs) : that.postAggregatorSpecs != null)
+      return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    int result = super.hashCode();
+    result = 31 * result + (limitSpec != null ? limitSpec.hashCode() : 0);
+    result = 31 * result + (havingSpec != null ? havingSpec.hashCode() : 0);
+    result = 31 * result + (dimFilter != null ? dimFilter.hashCode() : 0);
+    result = 31 * result + (granularity != null ? granularity.hashCode() : 0);
+    result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0);
+    result = 31 * result + (aggregatorSpecs != null ? aggregatorSpecs.hashCode() : 0);
+    result = 31 * result + (postAggregatorSpecs != null ? postAggregatorSpecs.hashCode() : 0);
+    result = 31 * result + (orderByLimitFn != null ? orderByLimitFn.hashCode() : 0);
+    return result;
   }
 }
diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java
index cb1f0279d13..00298f18ba0 100644
--- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java
+++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java
@@ -24,9 +24,6 @@ import com.google.common.collect.Lists;
 import com.metamx.common.ISE;
 import com.metamx.common.Pair;
 import com.metamx.common.guava.Accumulator;
-import com.metamx.common.guava.Sequence;
-import com.metamx.common.guava.Sequences;
-import io.druid.data.input.MapBasedRow;
 import io.druid.data.input.Row;
 import io.druid.data.input.Rows;
 import io.druid.granularity.QueryGranularity;
diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java
index d2a338460e2..543ec0ba130 100644
--- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java
+++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java
@@ -34,13 +34,17 @@ import com.metamx.common.guava.Sequences;
 import com.metamx.emitter.service.ServiceMetricEvent;
 import io.druid.data.input.MapBasedRow;
 import io.druid.data.input.Row;
+import io.druid.query.DataSource;
 import io.druid.query.IntervalChunkingQueryRunner;
 import io.druid.query.Query;
+import io.druid.query.QueryDataSource;
 import io.druid.query.QueryRunner;
 import io.druid.query.QueryToolChest;
+import io.druid.query.SubqueryQueryRunner;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.MetricManipulationFn;
 import io.druid.segment.incremental.IncrementalIndex;
+import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
 import org.joda.time.Interval;
 import org.joda.time.Minutes;
 
@@ -56,13 +60,16 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery>
   private static final Map<String, String> NO_MERGE_CONTEXT = ImmutableMap.of(GROUP_BY_MERGE_KEY, "false");
 
   private final Supplier<GroupByQueryConfig> configSupplier;
+  private final GroupByQueryEngine engine; // For running the outer query around a subquery
 
   @Inject
   public GroupByQueryQueryToolChest(
-      Supplier<GroupByQueryConfig> configSupplier
+      Supplier<GroupByQueryConfig> configSupplier,
+      GroupByQueryEngine engine
   )
   {
     this.configSupplier = configSupplier;
+    this.engine = engine;
   }
 
   @Override
@@ -84,13 +91,34 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery>
   private Sequence<Row> mergeGroupByResults(final GroupByQuery query, QueryRunner<Row> runner)
   {
-    final GroupByQueryConfig config = configSupplier.get();
-    Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair(
-        query,
-        config
-    );
-    IncrementalIndex index = runner.run(query).accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
+    Sequence<Row> result;
+
+    // If there's a subquery, merge subquery results and then apply the aggregator
+    DataSource dataSource = query.getDataSource();
+    if (dataSource instanceof QueryDataSource) {
+      GroupByQuery subquery;
+      try {
+        subquery = (GroupByQuery) ((QueryDataSource) dataSource).getQuery();
+      } catch (ClassCastException e) {
+        throw new UnsupportedOperationException("Subqueries must be of type 'group by'");
+      }
+      Sequence<Row> subqueryResult = mergeGroupByResults(subquery, runner);
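+      // Materialize the subquery's rows into an in-memory incremental index, then run the
+      // outer query over that index as if it were an ordinary segment.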
+      IncrementalIndexStorageAdapter adapter
+          = new IncrementalIndexStorageAdapter(makeIncrementalIndex(subquery, subqueryResult));
+      result = engine.process(query, adapter);
+    } else {
+      result = runner.run(query);
+    }
+
+    return postAggregate(query, makeIncrementalIndex(query, result));
+  }
+
+
+  private Sequence<Row> postAggregate(final GroupByQuery query, IncrementalIndex index)
+  {
     Sequence<Row> sequence = Sequences.map(
         Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs())),
         new Function<Row, Row>()
         {
@@ -101,7 +129,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest
+  private IncrementalIndex makeIncrementalIndex(GroupByQuery query, Sequence<Row> rows)
+  {
+    final GroupByQueryConfig config = configSupplier.get();
+    Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair(
+        query,
+        config
+    );
+
+    return rows.accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
+  }
+
+
   @Override
   public Sequence<Row> mergeSequences(Sequence<Sequence<Row>> seqOfSequences)
   {
@@ -125,7 +165,8 @@ public class GroupByQueryQueryToolChest extends QueryToolChest
   public QueryRunner<Row> preMergeQueryDecoration(QueryRunner<Row> runner)
   {
-    return new IntervalChunkingQueryRunner(runner, configSupplier.get().getChunkPeriod());
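+    // If the datasource is itself a query, run that inner query instead of the outer one.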
+    return new SubqueryQueryRunner(
+        new IntervalChunkingQueryRunner(runner, configSupplier.get().getChunkPeriod()));
   }
 }
diff --git a/processing/src/main/java/io/druid/query/groupby/orderby/DefaultLimitSpec.java b/processing/src/main/java/io/druid/query/groupby/orderby/DefaultLimitSpec.java
index 3bb79ed9617..eda54ea0dc3 100644
--- a/processing/src/main/java/io/druid/query/groupby/orderby/DefaultLimitSpec.java
+++ b/processing/src/main/java/io/druid/query/groupby/orderby/DefaultLimitSpec.java
@@ -196,6 +196,25 @@ public class DefaultLimitSpec implements LimitSpec
     {
       return Sequences.limit(input, limit);
     }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      LimitingFn that = (LimitingFn) o;
+
+      if (limit != that.limit) return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return limit;
+    }
   }
 
   private static class SortingFn implements Function<Sequence<Row>, Sequence<Row>>
@@ -209,6 +228,25 @@ public class DefaultLimitSpec implements LimitSpec
     {
       return Sequences.sort(input, ordering);
     }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      SortingFn sortingFn = (SortingFn) o;
+
+      if (ordering != null ? !ordering.equals(sortingFn.ordering) : sortingFn.ordering != null) return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return ordering != null ? ordering.hashCode() : 0;
+    }
   }
 
   private static class TopNFunction implements Function<Sequence<Row>, Sequence<Row>>
@@ -231,5 +269,49 @@ public class DefaultLimitSpec implements LimitSpec
       final ArrayList<Row> materializedList = Sequences.toList(input, Lists.<Row>newArrayList());
       return Sequences.simple(sorter.toTopN(materializedList, limit));
     }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      TopNFunction that = (TopNFunction) o;
+
+      if (limit != that.limit) return false;
+      if (sorter != null ? !sorter.equals(that.sorter) : that.sorter != null) return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode()
+    {
+      int result = sorter != null ? sorter.hashCode() : 0;
+      result = 31 * result + limit;
+      return result;
+    }
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    DefaultLimitSpec that = (DefaultLimitSpec) o;
+
+    if (limit != that.limit) return false;
+    if (columns != null ? !columns.equals(that.columns) : that.columns != null) return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    int result = columns != null ? columns.hashCode() : 0;
+    result = 31 * result + limit;
+    return result;
   }
 }
diff --git a/processing/src/main/java/io/druid/query/groupby/orderby/NoopLimitSpec.java b/processing/src/main/java/io/druid/query/groupby/orderby/NoopLimitSpec.java
index 6fbc063c72d..d975e24a65f 100644
--- a/processing/src/main/java/io/druid/query/groupby/orderby/NoopLimitSpec.java
+++ b/processing/src/main/java/io/druid/query/groupby/orderby/NoopLimitSpec.java
@@ -46,4 +46,15 @@ public class NoopLimitSpec implements LimitSpec
   {
     return "NoopLimitSpec";
   }
+
+  @Override
+  public boolean equals(Object other)
+  {
+    return (other instanceof NoopLimitSpec);
+  }
+
+  @Override
+  public int hashCode() {
+    return 0;
+  }
 }
diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java
index 00642baee3c..85fa61d26e9 100644
--- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java
+++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java
@@ -147,7 +147,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest
       @JsonProperty("context") Map<String, Object> context
   )
   {
-    super(dataSource, querySegmentSpec, context);
+    super(new TableDataSource(dataSource), querySegmentSpec, context);
     this.toInclude = toInclude == null ? new AllColumnIncluderator() : toInclude;
     this.merge = merge == null ? false : merge;
@@ -76,13 +77,40 @@ public class SegmentMetadataQuery extends BaseQuery
   public Query withOverriddenContext(Map<String, Object> contextOverride)
   {
     return new SegmentMetadataQuery(
-        getDataSource(), getQuerySegmentSpec(), toInclude, merge, computeOverridenContext(contextOverride)
+        ((TableDataSource) getDataSource()).getName(),
+        getQuerySegmentSpec(), toInclude, merge, computeOverridenContext(contextOverride)
     );
   }
 
   @Override
   public Query withQuerySegmentSpec(QuerySegmentSpec spec)
   {
-    return new SegmentMetadataQuery(getDataSource(), spec, toInclude, merge, getContext());
+    return new SegmentMetadataQuery(
+        ((TableDataSource) getDataSource()).getName(),
+        spec, toInclude, merge, getContext());
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    if (!super.equals(o)) return false;
+
+    SegmentMetadataQuery that = (SegmentMetadataQuery) o;
+
+    if (merge != that.merge) return false;
+    if (toInclude != null ? !toInclude.equals(that.toInclude) : that.toInclude != null) return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    int result = super.hashCode();
+    result = 31 * result + (toInclude != null ? toInclude.hashCode() : 0);
+    result = 31 * result + (merge ? 1 : 0);
+    return result;
   }
 }
diff --git a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java
index 980e6dc2e5c..a11a4c2ac01 100644
--- a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java
+++ b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java
@@ -121,7 +121,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest
   @JsonCreator
   public SearchQuery(
-      @JsonProperty("dataSource") String dataSource,
+      @JsonProperty("dataSource") DataSource dataSource,
       @JsonProperty("filter") DimFilter dimFilter,
       @JsonProperty("granularity") QueryGranularity granularity,
      @JsonProperty("limit") int limit,
@@ -181,13 +182,45 @@ public class SearchQuery extends BaseQuery<Result<SearchResultValue>>
   public String toString()
   {
     return "SearchQuery{" +
-        "dataSource='" + getDataSource() + '\'' +
-        ", dimFilter=" + dimFilter +
-        ", granularity='" + granularity + '\'' +
-        ", dimensions=" + dimensions +
-        ", querySpec=" + querySpec +
-        ", querySegmentSpec=" + getQuerySegmentSpec() +
-        ", limit=" + limit +
-        '}';
+           "dataSource='" + getDataSource() + '\'' +
+           ", dimFilter=" + dimFilter +
+           ", granularity='" + granularity + '\'' +
+           ", dimensions=" + dimensions +
+           ", querySpec=" + querySpec +
+           ", querySegmentSpec=" + getQuerySegmentSpec() +
+           ", limit=" + limit +
+           '}';
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    if (!super.equals(o)) return false;
+
+    SearchQuery that = (SearchQuery) o;
+
+    if (limit != that.limit) return false;
+    if (dimFilter != null ? !dimFilter.equals(that.dimFilter) : that.dimFilter != null) return false;
+    if (dimensions != null ? !dimensions.equals(that.dimensions) : that.dimensions != null) return false;
+    if (granularity != null ? !granularity.equals(that.granularity) : that.granularity != null) return false;
+    if (querySpec != null ? !querySpec.equals(that.querySpec) : that.querySpec != null) return false;
+    if (sortSpec != null ? !sortSpec.equals(that.sortSpec) : that.sortSpec != null) return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    int result = super.hashCode();
+    result = 31 * result + (dimFilter != null ? dimFilter.hashCode() : 0);
+    result = 31 * result + (sortSpec != null ? sortSpec.hashCode() : 0);
+    result = 31 * result + (granularity != null ? granularity.hashCode() : 0);
+    result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0);
+    result = 31 * result + (querySpec != null ? querySpec.hashCode() : 0);
+    result = 31 * result + limit;
+    return result;
   }
 }
diff --git a/processing/src/main/java/io/druid/query/select/SelectQuery.java b/processing/src/main/java/io/druid/query/select/SelectQuery.java
index 8c5eb2ba59f..bcd29cb7f96 100644
--- a/processing/src/main/java/io/druid/query/select/SelectQuery.java
+++ b/processing/src/main/java/io/druid/query/select/SelectQuery.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import io.druid.granularity.QueryGranularity;
 import io.druid.query.BaseQuery;
+import io.druid.query.DataSource;
 import io.druid.query.Query;
 import io.druid.query.Result;
 import io.druid.query.filter.DimFilter;
@@ -45,7 +46,7 @@ public class SelectQuery extends BaseQuery<Result<SelectResultValue>>
   @JsonCreator
   public SelectQuery(
-      @JsonProperty("dataSource") String dataSource,
+      @JsonProperty("dataSource") DataSource dataSource,
       @JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
       @JsonProperty("filter") DimFilter dimFilter,
       @JsonProperty("granularity") QueryGranularity granularity,
@@ -146,4 +147,34 @@ public class SelectQuery extends BaseQuery<Result<SelectResultValue>>
            ", pagingSpec=" + pagingSpec +
            '}';
   }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    if (!super.equals(o)) return false;
+
+    SelectQuery that = (SelectQuery) o;
+
+    if (dimFilter != null ? !dimFilter.equals(that.dimFilter) : that.dimFilter != null) return false;
+    if (dimensions != null ? !dimensions.equals(that.dimensions) : that.dimensions != null) return false;
+    if (granularity != null ? !granularity.equals(that.granularity) : that.granularity != null) return false;
+    if (metrics != null ? !metrics.equals(that.metrics) : that.metrics != null) return false;
+    if (pagingSpec != null ? !pagingSpec.equals(that.pagingSpec) : that.pagingSpec != null) return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    int result = super.hashCode();
+    result = 31 * result + (dimFilter != null ? dimFilter.hashCode() : 0);
+    result = 31 * result + (granularity != null ? granularity.hashCode() : 0);
+    result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0);
+    result = 31 * result + (metrics != null ? metrics.hashCode() : 0);
+    result = 31 * result + (pagingSpec != null ? pagingSpec.hashCode() : 0);
+    return result;
+  }
 }
diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java
index 8a728749b22..4e45ffac9e5 100644
--- a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java
+++ b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java
@@ -123,7 +123,7 @@ public class SelectQueryQueryToolChest extends QueryToolChest
       context
   )
diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java
index 7ccba774799..352cd38b023 100644
--- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java
+++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java
@@ -78,7 +78,7 @@ public class TimeBoundaryQueryQueryToolChest
               public boolean apply(T input)
               {
                 return input.getInterval().overlaps(first.getInterval()) || input.getInterval()
-                    .overlaps(second.getInterval());
+                                                                                 .overlaps(second.getInterval());
               }
             }
         )
@@ -117,7 +117,7 @@ public class TimeBoundaryQueryQueryToolChest
   public ServiceMetricEvent.Builder makeMetricBuilder(TimeBoundaryQuery query)
   {
     return new ServiceMetricEvent.Builder()
-        .setUser2(query.getDataSource())
+        .setUser2(query.getDataSource().toString())
         .setUser4(query.getType())
         .setUser6("false")
         .setUser10(query.getId());
@@ -146,9 +146,9 @@ public class TimeBoundaryQueryQueryToolChest
   public byte[] computeCacheKey(TimeBoundaryQuery query)
   {
     return ByteBuffer.allocate(2)
-        .put(TIMEBOUNDARY_QUERY)
-        .put(query.getCacheKey())
-        .array();
+                     .put(TIMEBOUNDARY_QUERY)
+                     .put(query.getCacheKey())
+                     .array();
   }
 
   @Override
diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java
index ab5b649a896..a1de320f5ec 100644
--- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java
+++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.collect.ImmutableList;
 import io.druid.granularity.QueryGranularity;
 import io.druid.query.BaseQuery;
+import io.druid.query.DataSource;
 import io.druid.query.Queries;
 import io.druid.query.Query;
 import io.druid.query.Result;
@@ -48,7 +49,7 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
   @JsonCreator
   public TimeseriesQuery(
-      @JsonProperty("dataSource") String dataSource,
+      @JsonProperty("dataSource") DataSource dataSource,
       @JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
       @JsonProperty("filter") DimFilter dimFilter,
       @JsonProperty("granularity") QueryGranularity granularity,
@@ -132,14 +133,43 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
   public String toString()
   {
     return "TimeseriesQuery{" +
-        "dataSource='" + getDataSource() + '\'' +
-        ", querySegmentSpec=" + getQuerySegmentSpec() +
-        ", dimFilter=" + dimFilter +
-        ", granularity='" + granularity + '\'' +
-        ", aggregatorSpecs=" + aggregatorSpecs +
-        ", postAggregatorSpecs=" + postAggregatorSpecs +
-        ", context=" + getContext() +
-        '}';
+           "dataSource='" + getDataSource() + '\'' +
+           ", querySegmentSpec=" + getQuerySegmentSpec() +
+           ", dimFilter=" + dimFilter +
+           ", granularity='" + granularity + '\'' +
+           ", aggregatorSpecs=" + aggregatorSpecs +
+           ", postAggregatorSpecs=" + postAggregatorSpecs +
+           ", context=" + getContext() +
+           '}';
   }
 
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    if (!super.equals(o)) return false;
+
+    TimeseriesQuery that = (TimeseriesQuery) o;
+
+    if (aggregatorSpecs != null ? !aggregatorSpecs.equals(that.aggregatorSpecs) : that.aggregatorSpecs != null)
+      return false;
+    if (dimFilter != null ? !dimFilter.equals(that.dimFilter) : that.dimFilter != null) return false;
+    if (granularity != null ? !granularity.equals(that.granularity) : that.granularity != null) return false;
+    if (postAggregatorSpecs != null ? !postAggregatorSpecs.equals(that.postAggregatorSpecs) : that.postAggregatorSpecs != null)
+      return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    int result = super.hashCode();
+    result = 31 * result + (dimFilter != null ? dimFilter.hashCode() : 0);
+    result = 31 * result + (granularity != null ? granularity.hashCode() : 0);
+    result = 31 * result + (aggregatorSpecs != null ? aggregatorSpecs.hashCode() : 0);
+    result = 31 * result + (postAggregatorSpecs != null ? postAggregatorSpecs.hashCode() : 0);
+    return result;
+  }
 }
diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
index d5de1bda272..cefe50c5384 100644
--- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
+++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
@@ -123,7 +123,7 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest
 public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
   @JsonCreator
   public TopNQuery(
-      @JsonProperty("dataSource") String dataSource,
+      @JsonProperty("dataSource") DataSource dataSource,
       @JsonProperty("dimension") DimensionSpec dimensionSpec,
       @JsonProperty("metric") TopNMetricSpec topNMetricSpec,
       @JsonProperty("threshold") int threshold,
@@ -208,4 +209,42 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
            ", postAggregatorSpecs=" + postAggregatorSpecs +
            '}';
   }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    if (!super.equals(o)) return false;
+
+    TopNQuery topNQuery = (TopNQuery) o;
+
+    if (threshold != topNQuery.threshold) return false;
+    if (aggregatorSpecs != null ? !aggregatorSpecs.equals(topNQuery.aggregatorSpecs) : topNQuery.aggregatorSpecs != null)
+      return false;
+    if (dimFilter != null ? !dimFilter.equals(topNQuery.dimFilter) : topNQuery.dimFilter != null) return false;
+    if (dimensionSpec != null ? !dimensionSpec.equals(topNQuery.dimensionSpec) : topNQuery.dimensionSpec != null)
+      return false;
+    if (granularity != null ? !granularity.equals(topNQuery.granularity) : topNQuery.granularity != null) return false;
+    if (postAggregatorSpecs != null ? !postAggregatorSpecs.equals(topNQuery.postAggregatorSpecs) : topNQuery.postAggregatorSpecs != null)
+      return false;
+    if (topNMetricSpec != null ? !topNMetricSpec.equals(topNQuery.topNMetricSpec) : topNQuery.topNMetricSpec != null)
+      return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    int result = super.hashCode();
+    result = 31 * result + (dimensionSpec != null ? dimensionSpec.hashCode() : 0);
+    result = 31 * result + (topNMetricSpec != null ? topNMetricSpec.hashCode() : 0);
+    result = 31 * result + threshold;
+    result = 31 * result + (dimFilter != null ?
dimFilter.hashCode() : 0); + result = 31 * result + (granularity != null ? granularity.hashCode() : 0); + result = 31 * result + (aggregatorSpecs != null ? aggregatorSpecs.hashCode() : 0); + result = 31 * result + (postAggregatorSpecs != null ? postAggregatorSpecs.hashCode() : 0); + return result; + } } diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryBuilder.java b/processing/src/main/java/io/druid/query/topn/TopNQueryBuilder.java index 1bfb690f490..21efd3b8351 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryBuilder.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryBuilder.java @@ -21,6 +21,8 @@ package io.druid.query.topn; import com.google.common.collect.Lists; import io.druid.granularity.QueryGranularity; +import io.druid.query.DataSource; +import io.druid.query.TableDataSource; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.PostAggregator; import io.druid.query.dimension.DefaultDimensionSpec; @@ -58,7 +60,7 @@ import java.util.Map; */ public class TopNQueryBuilder { - private String dataSource; + private DataSource dataSource; private DimensionSpec dimensionSpec; private TopNMetricSpec topNMetricSpec; private int threshold; @@ -71,7 +73,7 @@ public class TopNQueryBuilder public TopNQueryBuilder() { - dataSource = ""; + dataSource = null; dimensionSpec = null; topNMetricSpec = null; threshold = 0; @@ -83,7 +85,7 @@ public class TopNQueryBuilder context = null; } - public String getDataSource() + public DataSource getDataSource() { return dataSource; } @@ -152,7 +154,7 @@ public class TopNQueryBuilder public TopNQueryBuilder copy(TopNQuery query) { return new TopNQueryBuilder() - .dataSource(query.getDataSource()) + .dataSource(query.getDataSource().toString()) .dimension(query.getDimensionSpec()) .metric(query.getTopNMetricSpec()) .threshold(query.getThreshold()) @@ -180,6 +182,12 @@ public class TopNQueryBuilder } public TopNQueryBuilder dataSource(String d) + { + dataSource = new TableDataSource(d); + return this; + } + + public TopNQueryBuilder dataSource(DataSource d) { dataSource = d; return this; diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java index dc4dd1b4855..de0f28dc204 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java @@ -128,7 +128,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest() - { - @Override - public boolean apply(@Nullable String input) - { - return predicate.applyInContext(cx, input); - } - }) - .transform( - new com.google.common.base.Function() - { - @Override - public ImmutableConciseSet apply(@Nullable String input) - { - return selector.getConciseInvertedIndex(dimension, input); - } - } - ) - ); + final Indexed dimValues = selector.getDimensionValues(dimension); + ImmutableConciseSet conciseSet; + if (dimValues == null) { + conciseSet = new ImmutableConciseSet(); + } else { + conciseSet = ImmutableConciseSet.union( + FunctionalIterable.create(dimValues) + .filter(new Predicate() + { + @Override + public boolean apply(@Nullable String input) + { + return predicate.applyInContext(cx, input); + } + }) + .transform( + new com.google.common.base.Function() + { + @Override + public ImmutableConciseSet apply(@Nullable String input) + { + return selector.getConciseInvertedIndex(dimension, input); + } + } + ) + ); + } return conciseSet; 
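// Presumably getDimensionValues() returns null when the dimension does not
// exist in this index; the new guard above substitutes an empty
// ImmutableConciseSet so the JavaScript filter simply matches no rows
// instead of throwing an NPE while unioning the inverted indexes.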
} finally { Context.exit(); @@ -83,12 +90,14 @@ public class JavaScriptFilter implements Filter return factory.makeValueMatcher(dimension, predicate); } - static class JavaScriptPredicate implements Predicate { + static class JavaScriptPredicate implements Predicate + { final ScriptableObject scope; final Function fnApply; final String script; - public JavaScriptPredicate(final String script) { + public JavaScriptPredicate(final String script) + { Preconditions.checkNotNull(script, "script must not be null"); this.script = script; diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java index d0243f39123..8843879f91b 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java @@ -126,8 +126,13 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter @Override public Iterable makeCursors(final Filter filter, final Interval interval, final QueryGranularity gran) { + if (index.isEmpty()) { + return ImmutableList.of(); + } + Interval actualIntervalTmp = interval; + final Interval dataInterval = new Interval(getMinTime().getMillis(), gran.next(getMaxTime().getMillis())); if (!actualIntervalTmp.overlaps(dataInterval)) { return ImmutableList.of(); diff --git a/processing/src/test/java/io/druid/query/DataSourceTest.java b/processing/src/test/java/io/druid/query/DataSourceTest.java new file mode 100644 index 00000000000..5819fc49701 --- /dev/null +++ b/processing/src/test/java/io/druid/query/DataSourceTest.java @@ -0,0 +1,88 @@ +/* + * Druid - a distributed column store. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * + * This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project + * under the Druid Corporate Contributor License Agreement. 
+ */ + +package io.druid.query; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.query.dimension.DefaultDimensionSpec; +import io.druid.query.dimension.DimensionSpec; +import io.druid.query.groupby.GroupByQuery; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; + +public class DataSourceTest +{ + private static final ObjectMapper jsonMapper = new DefaultObjectMapper(); + + @Test + public void testSerialization() throws IOException + { + DataSource dataSource = new TableDataSource("somedatasource"); + String json = jsonMapper.writeValueAsString(dataSource); + DataSource serdeDataSource = jsonMapper.readValue(json, DataSource.class); + Assert.assertEquals(dataSource, serdeDataSource); + } + + @Test + public void testLegacyDataSource() throws IOException + { + DataSource dataSource = jsonMapper.readValue("\"somedatasource\"", DataSource.class); + Assert.assertEquals(new TableDataSource("somedatasource"), dataSource); + } + + @Test + public void testTableDataSource() throws IOException + { + DataSource dataSource = jsonMapper.readValue("{\"type\":\"table\", \"name\":\"somedatasource\"}", DataSource.class); + Assert.assertEquals(new TableDataSource("somedatasource"), dataSource); + } + + @Test + public void testQueryDataSource() throws IOException + { + GroupByQuery query = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("quality", "alias"))) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory("idx", "index") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .build(); + + String dataSourceJSON = "{\"type\":\"query\", \"query\":" + jsonMapper.writeValueAsString(query) + "}"; + + DataSource dataSource = jsonMapper.readValue(dataSourceJSON, DataSource.class); + Assert.assertEquals(new QueryDataSource(query), dataSource); + } + +} diff --git a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java index d4058b86345..519c7375b10 100644 --- a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java +++ b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java @@ -111,9 +111,15 @@ public class QueryRunnerTestHelper public static final QuerySegmentSpec firstToThird = new MultipleIntervalSegmentSpec( Arrays.asList(new Interval("2011-04-01T00:00:00.000Z/2011-04-03T00:00:00.000Z")) ); + public static final QuerySegmentSpec secondOnly = new MultipleIntervalSegmentSpec( + Arrays.asList(new Interval("2011-04-02T00:00:00.000Z/P1D")) + ); public static final QuerySegmentSpec fullOnInterval = new MultipleIntervalSegmentSpec( Arrays.asList(new Interval("1970-01-01T00:00:00.000Z/2020-01-01T00:00:00.000Z")) ); + public static final QuerySegmentSpec emptyInterval = new MultipleIntervalSegmentSpec( + Arrays.asList(new Interval("2020-04-02T00:00:00.000Z/P1D")) + ); @SuppressWarnings("unchecked") public static Collection makeQueryRunners( diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index e1556e94da7..f336a26884e 
100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -38,11 +38,14 @@ import io.druid.granularity.QueryGranularity; import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.QueryToolChest; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.DoubleSumAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.query.aggregation.MaxAggregatorFactory; import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.dimension.DimensionSpec; +import io.druid.query.filter.JavaScriptDimFilter; import io.druid.query.filter.RegexDimFilter; import io.druid.query.groupby.having.EqualToHavingSpec; import io.druid.query.groupby.having.GreaterThanHavingSpec; @@ -56,7 +59,9 @@ import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.joda.time.Interval; import org.joda.time.Period; +import org.junit.Assert; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -90,22 +95,24 @@ public class GroupByQueryRunnerTest config.setMaxIntermediateRows(10000); final Supplier configSupplier = Suppliers.ofInstance(config); - final GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory( - new GroupByQueryEngine( - configSupplier, - new StupidPool( - new Supplier() - { - @Override - public ByteBuffer get() - { - return ByteBuffer.allocate(1024 * 1024); - } - } - ) - ), + final GroupByQueryEngine engine = new GroupByQueryEngine( configSupplier, - new GroupByQueryQueryToolChest(configSupplier) + new StupidPool( + new Supplier() + { + @Override + public ByteBuffer get() + { + return ByteBuffer.allocate(1024 * 1024); + } + } + ) + ); + + final GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory( + engine, + configSupplier, + new GroupByQueryQueryToolChest(configSupplier, engine) ); return Lists.newArrayList( @@ -167,8 +174,7 @@ public class GroupByQueryRunnerTest createExpectedRow("2011-04-02", "alias", "travel", "rows", 1L, "idx", 126L) ); - Iterable results = Sequences.toList(runner.run(query), Lists.newArrayList()); - + Iterable results = runQuery(query); TestHelper.assertExpectedObjects(expectedResults, results, ""); } @@ -178,33 +184,33 @@ public class GroupByQueryRunnerTest DateTimeZone tz = DateTimeZone.forID("America/Los_Angeles"); GroupByQuery query = GroupByQuery.builder() - .setDataSource(QueryRunnerTestHelper.dataSource) - .setInterval("2011-03-31T00:00:00-07:00/2011-04-02T00:00:00-07:00") - .setDimensions( - Lists.newArrayList( - (DimensionSpec) new DefaultDimensionSpec( - "quality", - "alias" - ) - ) - ) - .setAggregatorSpecs( - Arrays.asList( - QueryRunnerTestHelper.rowsCount, - new LongSumAggregatorFactory( - "idx", - "index" - ) - ) - ) - .setGranularity( - new PeriodGranularity( - new Period("P1D"), - null, - tz - ) - ) - .build(); + .setDataSource(QueryRunnerTestHelper.dataSource) + .setInterval("2011-03-31T00:00:00-07:00/2011-04-02T00:00:00-07:00") + .setDimensions( + Lists.newArrayList( + (DimensionSpec) new DefaultDimensionSpec( + "quality", + "alias" + ) + ) + ) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory( + "idx", + "index" + ) + ) + ) + .setGranularity( + new PeriodGranularity( + new Period("P1D"), + null, + tz + ) + ) + 
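// Note: with this PeriodGranularity the P1D buckets are computed in
// America/Los_Angeles, so (in April 2011, PDT) each "day" starts at
// 07:00 UTC, matching the -07:00 offsets in the interval above.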
.build(); List expectedResults = Arrays.asList( createExpectedRow(new DateTime("2011-03-31", tz), "alias", "automotive", "rows", 1L, "idx", 135L), @@ -228,11 +234,7 @@ public class GroupByQueryRunnerTest createExpectedRow(new DateTime("2011-04-01", tz), "alias", "travel", "rows", 1L, "idx", 126L) ); - Iterable results = Sequences.toList( - runner.run(query), - Lists.newArrayList() - ); - + Iterable results = runQuery(query); TestHelper.assertExpectedObjects(expectedResults, results, ""); } @@ -661,7 +663,21 @@ public class GroupByQueryRunnerTest createExpectedRow("2011-04-01", "quality", "automotive", "rows", 2L) ); - QueryRunner mergeRunner = new GroupByQueryQueryToolChest(configSupplier).mergeResults(runner); + final GroupByQueryEngine engine = new GroupByQueryEngine( + configSupplier, + new StupidPool( + new Supplier() + { + @Override + public ByteBuffer get() + { + return ByteBuffer.allocate(1024 * 1024); + } + } + ) + ); + + QueryRunner mergeRunner = new GroupByQueryQueryToolChest(configSupplier, engine).mergeResults(runner); TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query), "no-limit"); } @@ -696,7 +712,21 @@ public class GroupByQueryRunnerTest ); TestHelper.assertExpectedObjects(expectedResults, runner.run(query), "normal"); - QueryRunner mergeRunner = new GroupByQueryQueryToolChest(configSupplier).mergeResults(runner); + final GroupByQueryEngine engine = new GroupByQueryEngine( + configSupplier, + new StupidPool( + new Supplier() + { + @Override + public ByteBuffer get() + { + return ByteBuffer.allocate(1024 * 1024); + } + } + ) + ); + + QueryRunner mergeRunner = new GroupByQueryQueryToolChest(configSupplier, engine).mergeResults(runner); TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query), "no-limit"); } @@ -731,10 +761,200 @@ public class GroupByQueryRunnerTest ); TestHelper.assertExpectedObjects(expectedResults, runner.run(query), "normal"); - QueryRunner mergeRunner = new GroupByQueryQueryToolChest(configSupplier).mergeResults(runner); + final GroupByQueryEngine engine = new GroupByQueryEngine( + configSupplier, + new StupidPool( + new Supplier() + { + @Override + public ByteBuffer get() + { + return ByteBuffer.allocate(1024 * 1024); + } + } + ) + ); + + QueryRunner mergeRunner = new GroupByQueryQueryToolChest(configSupplier, engine).mergeResults(runner); TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query), "no-limit"); } + // A subquery identical to the query should yield identical results + @Test + public void testIdenticalSubquery() + { + GroupByQuery subquery = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("quality", "alias"))) + .setDimFilter(new JavaScriptDimFilter("quality", "function(dim){ return true; }")) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory("idx", "index") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .build(); + + GroupByQuery query = GroupByQuery + .builder() + .setDataSource(subquery) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("alias", "alias"))) + .setAggregatorSpecs( + Arrays.asList( + new LongSumAggregatorFactory("rows", "rows"), + new LongSumAggregatorFactory("idx", "idx") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .build(); + + List expectedResults = Arrays.asList( + 
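// The outer query longSums the subquery's "rows" and "idx" columns over the
// same grouping, interval, and granularity, so the expected output below is
// identical to running the inner query on its own.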
createExpectedRow("2011-04-01", "alias", "automotive", "rows", 1L, "idx", 135L), + createExpectedRow("2011-04-01", "alias", "business", "rows", 1L, "idx", 118L), + createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 1L, "idx", 158L), + createExpectedRow("2011-04-01", "alias", "health", "rows", 1L, "idx", 120L), + createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 3L, "idx", 2870L), + createExpectedRow("2011-04-01", "alias", "news", "rows", 1L, "idx", 121L), + createExpectedRow("2011-04-01", "alias", "premium", "rows", 3L, "idx", 2900L), + createExpectedRow("2011-04-01", "alias", "technology", "rows", 1L, "idx", 78L), + createExpectedRow("2011-04-01", "alias", "travel", "rows", 1L, "idx", 119L), + + createExpectedRow("2011-04-02", "alias", "automotive", "rows", 1L, "idx", 147L), + createExpectedRow("2011-04-02", "alias", "business", "rows", 1L, "idx", 112L), + createExpectedRow("2011-04-02", "alias", "entertainment", "rows", 1L, "idx", 166L), + createExpectedRow("2011-04-02", "alias", "health", "rows", 1L, "idx", 113L), + createExpectedRow("2011-04-02", "alias", "mezzanine", "rows", 3L, "idx", 2447L), + createExpectedRow("2011-04-02", "alias", "news", "rows", 1L, "idx", 114L), + createExpectedRow("2011-04-02", "alias", "premium", "rows", 3L, "idx", 2505L), + createExpectedRow("2011-04-02", "alias", "technology", "rows", 1L, "idx", 97L), + createExpectedRow("2011-04-02", "alias", "travel", "rows", 1L, "idx", 126L) + ); + + // Subqueries are handled by the ToolChest + Iterable results = runQuery(query); + TestHelper.assertExpectedObjects(expectedResults, results, ""); + } + + @Test + public void testDifferentGroupingSubquery() + { + GroupByQuery subquery = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("quality", "alias"))) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory("idx", "index") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .build(); + + GroupByQuery query = GroupByQuery + .builder() + .setDataSource(subquery) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setAggregatorSpecs( + Arrays.asList( + new MaxAggregatorFactory("idx", "idx") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .build(); + + List expectedResults = Arrays.asList( + createExpectedRow("2011-04-01", "idx", 2900.0), + createExpectedRow("2011-04-02", "idx", 2505.0) + ); + + Iterable results = runQuery(query); + TestHelper.assertExpectedObjects(expectedResults, results, ""); + } + + @Test + public void testDifferentIntervalSubquery() + { + GroupByQuery subquery = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("quality", "alias"))) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory("idx", "index") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .build(); + + GroupByQuery query = GroupByQuery + .builder() + .setDataSource(subquery) + .setQuerySegmentSpec(QueryRunnerTestHelper.secondOnly) + .setAggregatorSpecs( + Arrays.asList( + new MaxAggregatorFactory("idx", "idx") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .build(); + + List expectedResults = Arrays.asList( + createExpectedRow("2011-04-02", "idx", 2505.0) + ); + + Iterable 
results = runQuery(query); + TestHelper.assertExpectedObjects(expectedResults, results, ""); + } + + @Test + public void testEmptySubquery() + { + GroupByQuery subquery = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.emptyInterval) + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("quality", "alias"))) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory("idx", "index") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .build(); + + GroupByQuery query = GroupByQuery + .builder() + .setDataSource(subquery) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setAggregatorSpecs( + Arrays.asList( + new MaxAggregatorFactory("idx", "idx") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .build(); + + Iterable results = runQuery(query); + Assert.assertFalse(results.iterator().hasNext()); + } + + private Iterable runQuery(GroupByQuery query) + { + QueryToolChest toolChest = factory.getToolchest(); + Sequence queryResult = toolChest.mergeResults(toolChest.preMergeQueryDecoration(runner)).run(query); + return Sequences.toList(queryResult, Lists.newArrayList()); + } + + private Row createExpectedRow(final String timestamp, Object... vals) { return createExpectedRow(new DateTime(timestamp), vals); diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryTest.java new file mode 100644 index 00000000000..8557ba04c24 --- /dev/null +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryTest.java @@ -0,0 +1,68 @@ +/* + * Druid - a distributed column store. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * + * This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project + * under the Druid Corporate Contributor License Agreement. 
+ */ + +package io.druid.query.groupby; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.Druids; +import io.druid.query.Query; +import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.query.aggregation.PostAggregator; +import io.druid.query.dimension.DefaultDimensionSpec; +import io.druid.query.dimension.DimensionSpec; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; + +public class GroupByQueryTest +{ + private static final ObjectMapper jsonMapper = new DefaultObjectMapper(); + + @Test + public void testQuerySerialization() throws IOException + { + Query query = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("quality", "alias"))) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory("idx", "index") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .build(); + + String json = jsonMapper.writeValueAsString(query); + Query serdeQuery = jsonMapper.readValue(json, Query.class); + + Assert.assertEquals(query, serdeQuery); + } + +} diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java index 562527556a9..2538e91bc76 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java @@ -56,22 +56,24 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest config.setMaxIntermediateRows(10000); final Supplier configSupplier = Suppliers.ofInstance(config); - final GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory( - new GroupByQueryEngine( - configSupplier, - new StupidPool( - new Supplier() - { - @Override - public ByteBuffer get() - { - return ByteBuffer.allocate(1024 * 1024); - } - } - ) - ), + final GroupByQueryEngine engine = new GroupByQueryEngine( configSupplier, - new GroupByQueryQueryToolChest(configSupplier) + new StupidPool( + new Supplier() + { + @Override + public ByteBuffer get() + { + return ByteBuffer.allocate(1024 * 1024); + } + } + ) + ); + + final GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory( + engine, + configSupplier, + new GroupByQueryQueryToolChest(configSupplier, engine) ); final Collection objects = QueryRunnerTestHelper.makeQueryRunners(factory); @@ -95,13 +97,13 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest return Sequences.map( groupByRunner.run( GroupByQuery.builder() - .setDataSource(tsQuery.getDataSource()) - .setQuerySegmentSpec(tsQuery.getQuerySegmentSpec()) - .setGranularity(tsQuery.getGranularity()) - .setDimFilter(tsQuery.getDimensionsFilter()) - .setAggregatorSpecs(tsQuery.getAggregatorSpecs()) - .setPostAggregatorSpecs(tsQuery.getPostAggregatorSpecs()) - .build() + .setDataSource(tsQuery.getDataSource()) + .setQuerySegmentSpec(tsQuery.getQuerySegmentSpec()) + .setGranularity(tsQuery.getGranularity()) + .setDimFilter(tsQuery.getDimensionsFilter()) + .setAggregatorSpecs(tsQuery.getAggregatorSpecs()) + 
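// No dimensions are set here: a groupBy over the empty dimension list
// collapses to one row per granularity bucket, which is effectively a
// timeseries result.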
.setPostAggregatorSpecs(tsQuery.getPostAggregatorSpecs()) + .build() ), new Function>() { diff --git a/processing/src/test/java/io/druid/query/search/SearchQueryTest.java b/processing/src/test/java/io/druid/query/search/SearchQueryTest.java new file mode 100644 index 00000000000..87bae26f0fd --- /dev/null +++ b/processing/src/test/java/io/druid/query/search/SearchQueryTest.java @@ -0,0 +1,54 @@ +/* + * Druid - a distributed column store. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * + * This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project + * under the Druid Corporate Contributor License Agreement. + */ + +package io.druid.query.search; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.Druids; +import io.druid.query.Query; +import io.druid.query.QueryRunnerTestHelper; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +public class SearchQueryTest +{ + private static final ObjectMapper jsonMapper = new DefaultObjectMapper(); + + @Test + public void testQuerySerialization() throws IOException + { + Query query = Druids.newSearchQueryBuilder() + .dataSource(QueryRunnerTestHelper.dataSource) + .granularity(QueryRunnerTestHelper.allGran) + .intervals(QueryRunnerTestHelper.fullOnInterval) + .query("a") + .build(); + + String json = jsonMapper.writeValueAsString(query); + Query serdeQuery = jsonMapper.readValue(json, Query.class); + + Assert.assertEquals(query, serdeQuery); + } + +} diff --git a/processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java b/processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java index 6c7b26d6059..5015239870e 100644 --- a/processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java @@ -28,6 +28,7 @@ import io.druid.jackson.DefaultObjectMapper; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; import io.druid.query.Result; +import io.druid.query.TableDataSource; import io.druid.query.filter.SelectorDimFilter; import io.druid.query.spec.LegacySegmentSpec; import org.joda.time.DateTime; @@ -72,7 +73,7 @@ public class SelectQueryRunnerTest public void testFullOnSelect() { SelectQuery query = new SelectQuery( - QueryRunnerTestHelper.dataSource, + new TableDataSource(QueryRunnerTestHelper.dataSource), QueryRunnerTestHelper.fullOnInterval, null, QueryRunnerTestHelper.allGran, @@ -141,7 +142,7 @@ public class SelectQueryRunnerTest public void testSelectWithDimsAndMets() { SelectQuery query = new SelectQuery( - QueryRunnerTestHelper.dataSource, + new TableDataSource(QueryRunnerTestHelper.dataSource), QueryRunnerTestHelper.fullOnInterval, null, QueryRunnerTestHelper.allGran, @@ -201,7 +202,7 @@ public class 
SelectQueryRunnerTest public void testSelectPagination() { SelectQuery query = new SelectQuery( - QueryRunnerTestHelper.dataSource, + new TableDataSource(QueryRunnerTestHelper.dataSource), QueryRunnerTestHelper.fullOnInterval, null, QueryRunnerTestHelper.allGran, @@ -261,7 +262,7 @@ public class SelectQueryRunnerTest public void testFullOnSelectWithFilter() { SelectQuery query = new SelectQuery( - QueryRunnerTestHelper.dataSource, + new TableDataSource(QueryRunnerTestHelper.dataSource), new LegacySegmentSpec(new Interval("2011-01-12/2011-01-14")), new SelectorDimFilter(QueryRunnerTestHelper.providerDimension, "spot"), QueryRunnerTestHelper.dayGran, diff --git a/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryTest.java b/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryTest.java new file mode 100644 index 00000000000..1dd50e9493d --- /dev/null +++ b/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryTest.java @@ -0,0 +1,50 @@ +/* + * Druid - a distributed column store. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * + * This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project + * under the Druid Corporate Contributor License Agreement. + */ + +package io.druid.query.timeboundary; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.Druids; +import io.druid.query.Query; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +public class TimeBoundaryQueryTest +{ + private static final ObjectMapper jsonMapper = new DefaultObjectMapper(); + + @Test + public void testQuerySerialization() throws IOException + { + Query query = Druids.newTimeBoundaryQueryBuilder() + .dataSource("testing") + .build(); + + String json = jsonMapper.writeValueAsString(query); + Query serdeQuery = jsonMapper.readValue(json, Query.class); + + Assert.assertEquals(query, serdeQuery); + } + +} diff --git a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryTest.java b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryTest.java new file mode 100644 index 00000000000..43e22ddfb94 --- /dev/null +++ b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryTest.java @@ -0,0 +1,62 @@ +/* + * Druid - a distributed column store. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. 
+ * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * + * This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project + * under the Druid Corporate Contributor License Agreement. + */ + +package io.druid.query.timeseries; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.Druids; +import io.druid.query.Query; +import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.aggregation.PostAggregator; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; + +public class TimeseriesQueryTest +{ + private static final ObjectMapper jsonMapper = new DefaultObjectMapper(); + + @Test + public void testQuerySerialization() throws IOException + { + Query query = Druids.newTimeseriesQueryBuilder() + .dataSource(QueryRunnerTestHelper.dataSource) + .granularity(QueryRunnerTestHelper.dayGran) + .intervals(QueryRunnerTestHelper.fullOnInterval) + .aggregators( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + QueryRunnerTestHelper.indexDoubleSum + ) + ) + .postAggregators(Arrays.asList(QueryRunnerTestHelper.addRowsIndexConstant)) + .build(); + + String json = jsonMapper.writeValueAsString(query); + Query serdeQuery = jsonMapper.readValue(json, Query.class); + + Assert.assertEquals(query, serdeQuery); + } + +} diff --git a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java index c938ef49259..291eab8171a 100644 --- a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java @@ -133,32 +133,32 @@ public class TopNQueryRunnerTest new TopNResultValue( Arrays.>asList( ImmutableMap.builder() - .put("provider", "total_market") - .put("rows", 186L) - .put("index", 215679.82879638672D) - .put("addRowsIndexConstant", 215866.82879638672D) - .put("uniques", QueryRunnerTestHelper.UNIQUES_2) - .put("maxIndex", 1743.9217529296875D) - .put("minIndex", 792.3260498046875D) - .build(), + .put(providerDimension, "total_market") + .put("rows", 186L) + .put("index", 215679.82879638672D) + .put("addRowsIndexConstant", 215866.82879638672D) + .put("uniques", QueryRunnerTestHelper.UNIQUES_2) + .put("maxIndex", 1743.9217529296875D) + .put("minIndex", 792.3260498046875D) + .build(), ImmutableMap.builder() - .put("provider", "upfront") - .put("rows", 186L) - .put("index", 192046.1060180664D) - .put("addRowsIndexConstant", 192233.1060180664D) - .put("uniques", QueryRunnerTestHelper.UNIQUES_2) - .put("maxIndex", 1870.06103515625D) - .put("minIndex", 545.9906005859375D) - .build(), + .put(providerDimension, "upfront") + .put("rows", 186L) + .put("index", 192046.1060180664D) + .put("addRowsIndexConstant", 192233.1060180664D) + .put("uniques", QueryRunnerTestHelper.UNIQUES_2) + .put("maxIndex", 1870.06103515625D) + .put("minIndex", 545.9906005859375D) + .build(), ImmutableMap.builder() - .put("provider", "spot") - .put("rows", 837L) - .put("index", 95606.57232284546D) - .put("addRowsIndexConstant", 96444.57232284546D) - .put("uniques", QueryRunnerTestHelper.UNIQUES_9) - .put("maxIndex", 277.2735290527344D) - .put("minIndex", 59.02102279663086D) - .build() + .put(providerDimension, "spot") + .put("rows", 837L) + .put("index", 95606.57232284546D) 
+ .put("addRowsIndexConstant", 96444.57232284546D) + .put("uniques", QueryRunnerTestHelper.UNIQUES_9) + .put("maxIndex", 277.2735290527344D) + .put("minIndex", 59.02102279663086D) + .build() ) ) ) @@ -197,32 +197,32 @@ public class TopNQueryRunnerTest new TopNResultValue( Arrays.>asList( ImmutableMap.builder() - .put("provider", "total_market") - .put("rows", 186L) - .put("index", 215679.82879638672D) - .put("addRowsIndexConstant", 215866.82879638672D) - .put("uniques", QueryRunnerTestHelper.UNIQUES_2) - .put("maxIndex", 1743.9217529296875D) - .put("minIndex", 792.3260498046875D) - .build(), + .put(providerDimension, "total_market") + .put("rows", 186L) + .put("index", 215679.82879638672D) + .put("addRowsIndexConstant", 215866.82879638672D) + .put("uniques", QueryRunnerTestHelper.UNIQUES_2) + .put("maxIndex", 1743.9217529296875D) + .put("minIndex", 792.3260498046875D) + .build(), ImmutableMap.builder() - .put("provider", "upfront") - .put("rows", 186L) - .put("index", 192046.1060180664D) - .put("addRowsIndexConstant", 192233.1060180664D) - .put("uniques", QueryRunnerTestHelper.UNIQUES_2) - .put("maxIndex", 1870.06103515625D) - .put("minIndex", 545.9906005859375D) - .build(), + .put(providerDimension, "upfront") + .put("rows", 186L) + .put("index", 192046.1060180664D) + .put("addRowsIndexConstant", 192233.1060180664D) + .put("uniques", QueryRunnerTestHelper.UNIQUES_2) + .put("maxIndex", 1870.06103515625D) + .put("minIndex", 545.9906005859375D) + .build(), ImmutableMap.builder() - .put("provider", "spot") - .put("rows", 837L) - .put("index", 95606.57232284546D) - .put("addRowsIndexConstant", 96444.57232284546D) - .put("uniques", QueryRunnerTestHelper.UNIQUES_9) - .put("maxIndex", 277.2735290527344D) - .put("minIndex", 59.02102279663086D) - .build() + .put(providerDimension, "spot") + .put("rows", 837L) + .put("index", 95606.57232284546D) + .put("addRowsIndexConstant", 96444.57232284546D) + .put("uniques", QueryRunnerTestHelper.UNIQUES_9) + .put("maxIndex", 277.2735290527344D) + .put("minIndex", 59.02102279663086D) + .build() ) ) ) @@ -262,32 +262,32 @@ public class TopNQueryRunnerTest new TopNResultValue( Arrays.>asList( ImmutableMap.builder() - .put("provider", "spot") - .put("rows", 837L) - .put("index", 95606.57232284546D) - .put("addRowsIndexConstant", 96444.57232284546D) - .put("uniques", QueryRunnerTestHelper.UNIQUES_9) - .put("maxIndex", 277.2735290527344D) - .put("minIndex", 59.02102279663086D) - .build(), + .put("provider", "spot") + .put("rows", 837L) + .put("index", 95606.57232284546D) + .put("addRowsIndexConstant", 96444.57232284546D) + .put("uniques", QueryRunnerTestHelper.UNIQUES_9) + .put("maxIndex", 277.2735290527344D) + .put("minIndex", 59.02102279663086D) + .build(), ImmutableMap.builder() - .put("provider", "total_market") - .put("rows", 186L) - .put("index", 215679.82879638672D) - .put("addRowsIndexConstant", 215866.82879638672D) - .put("uniques", QueryRunnerTestHelper.UNIQUES_2) - .put("maxIndex", 1743.9217529296875D) - .put("minIndex", 792.3260498046875D) - .build(), + .put("provider", "total_market") + .put("rows", 186L) + .put("index", 215679.82879638672D) + .put("addRowsIndexConstant", 215866.82879638672D) + .put("uniques", QueryRunnerTestHelper.UNIQUES_2) + .put("maxIndex", 1743.9217529296875D) + .put("minIndex", 792.3260498046875D) + .build(), ImmutableMap.builder() - .put("provider", "upfront") - .put("rows", 186L) - .put("index", 192046.1060180664D) - .put("addRowsIndexConstant", 192233.1060180664D) - .put("uniques", QueryRunnerTestHelper.UNIQUES_2) - 
.put("maxIndex", 1870.06103515625D) - .put("minIndex", 545.9906005859375D) - .build() + .put("provider", "upfront") + .put("rows", 186L) + .put("index", 192046.1060180664D) + .put("addRowsIndexConstant", 192233.1060180664D) + .put("uniques", QueryRunnerTestHelper.UNIQUES_2) + .put("maxIndex", 1870.06103515625D) + .put("minIndex", 545.9906005859375D) + .build() ) ) ) @@ -318,21 +318,21 @@ public class TopNQueryRunnerTest new TopNResultValue( Arrays.>asList( ImmutableMap.of( - "provider", "total_market", + providerDimension, "total_market", "rows", 4L, "index", 5351.814697265625D, "addRowsIndexConstant", 5356.814697265625D, "uniques", QueryRunnerTestHelper.UNIQUES_2 ), ImmutableMap.of( - "provider", "upfront", + providerDimension, "upfront", "rows", 4L, "index", 4875.669677734375D, "addRowsIndexConstant", 4880.669677734375D, "uniques", QueryRunnerTestHelper.UNIQUES_2 ), ImmutableMap.of( - "provider", "spot", + providerDimension, "spot", "rows", 18L, "index", 2231.8768157958984D, "addRowsIndexConstant", 2250.8768157958984D, @@ -416,21 +416,21 @@ public class TopNQueryRunnerTest new TopNResultValue( Arrays.>asList( ImmutableMap.of( - "provider", "total_market", + providerDimension, "total_market", "rows", 4L, "index", 5351.814697265625D, "addRowsIndexConstant", 5356.814697265625D, "uniques", QueryRunnerTestHelper.UNIQUES_2 ), ImmutableMap.of( - "provider", "upfront", + providerDimension, "upfront", "rows", 4L, "index", 4875.669677734375D, "addRowsIndexConstant", 4880.669677734375D, "uniques", QueryRunnerTestHelper.UNIQUES_2 ), ImmutableMap.of( - "provider", "spot", + providerDimension, "spot", "rows", 18L, "index", 2231.8768157958984D, "addRowsIndexConstant", 2250.8768157958984D, @@ -465,14 +465,14 @@ public class TopNQueryRunnerTest new TopNResultValue( Arrays.>asList( ImmutableMap.of( - "provider", "total_market", + providerDimension, "total_market", "rows", 4L, "index", 5351.814697265625D, "addRowsIndexConstant", 5356.814697265625D, "uniques", QueryRunnerTestHelper.UNIQUES_2 ), ImmutableMap.of( - "provider", "upfront", + providerDimension, "upfront", "rows", 4L, "index", 4875.669677734375D, "addRowsIndexConstant", 4880.669677734375D, @@ -507,7 +507,7 @@ public class TopNQueryRunnerTest new TopNResultValue( Arrays.>asList( ImmutableMap.of( - "provider", "upfront", + providerDimension, "upfront", "rows", 4L, "index", 4875.669677734375D, "addRowsIndexConstant", 4880.669677734375D, @@ -542,21 +542,21 @@ public class TopNQueryRunnerTest new TopNResultValue( Arrays.>asList( ImmutableMap.of( - "provider", "upfront", + providerDimension, "upfront", "rows", 2L, "index", 2591.68359375D, "addRowsIndexConstant", 2594.68359375D, "uniques", QueryRunnerTestHelper.UNIQUES_1 ), ImmutableMap.of( - "provider", "total_market", + providerDimension, "total_market", "rows", 2L, "index", 2508.39599609375D, "addRowsIndexConstant", 2511.39599609375D, "uniques", QueryRunnerTestHelper.UNIQUES_1 ), ImmutableMap.of( - "provider", "spot", + providerDimension, "spot", "rows", 2L, "index", 220.63774871826172D, "addRowsIndexConstant", 223.63774871826172D, @@ -595,21 +595,21 @@ public class TopNQueryRunnerTest new TopNResultValue( Arrays.>asList( ImmutableMap.of( - "provider", "upfront", + providerDimension, "upfront", "rows", 1L, "index", new Float(1447.341160).doubleValue(), "addRowsIndexConstant", new Float(1449.341160).doubleValue(), "uniques", QueryRunnerTestHelper.UNIQUES_1 ), ImmutableMap.of( - "provider", "total_market", + providerDimension, "total_market", "rows", 1L, "index", new Float(1314.839715).doubleValue(), 
"addRowsIndexConstant", new Float(1316.839715).doubleValue(), "uniques", QueryRunnerTestHelper.UNIQUES_1 ), ImmutableMap.of( - "provider", "spot", + providerDimension, "spot", "rows", 1L, "index", new Float(109.705815).doubleValue(), "addRowsIndexConstant", new Float(111.705815).doubleValue(), @@ -644,14 +644,14 @@ public class TopNQueryRunnerTest new TopNResultValue( Arrays.>asList( ImmutableMap.of( - "provider", "total_market", + providerDimension, "total_market", "rows", 4L, "index", 5351.814697265625D, "addRowsIndexConstant", 5356.814697265625D, "uniques", QueryRunnerTestHelper.UNIQUES_2 ), ImmutableMap.of( - "provider", "upfront", + providerDimension, "upfront", "rows", 4L, "index", 4875.669677734375D, "addRowsIndexConstant", 4880.669677734375D, @@ -695,18 +695,18 @@ public class TopNQueryRunnerTest public void testTopNWithNonExistentFilterMultiDim() { AndDimFilter andDimFilter = Druids.newAndDimFilterBuilder() - .fields( - Lists.newArrayList( - Druids.newSelectorDimFilterBuilder() - .dimension(providerDimension) - .value("billyblank") - .build(), - Druids.newSelectorDimFilterBuilder() - .dimension(QueryRunnerTestHelper.qualityDimension) - .value("mezzanine") - .build() - ) - ).build(); + .fields( + Lists.newArrayList( + Druids.newSelectorDimFilterBuilder() + .dimension(providerDimension) + .value("billyblank") + .build(), + Druids.newSelectorDimFilterBuilder() + .dimension(QueryRunnerTestHelper.qualityDimension) + .value("mezzanine") + .build() + ) + ).build(); TopNQuery query = new TopNQueryBuilder() .dataSource(QueryRunnerTestHelper.dataSource) .granularity(QueryRunnerTestHelper.allGran) @@ -966,21 +966,21 @@ public class TopNQueryRunnerTest new TopNResultValue( Arrays.>asList( ImmutableMap.of( - "provider", "spot", + providerDimension, "spot", "rows", 18L, "index", 2231.8768157958984D, "addRowsIndexConstant", 2250.8768157958984D, "uniques", QueryRunnerTestHelper.UNIQUES_9 ), ImmutableMap.of( - "provider", "total_market", + providerDimension, "total_market", "rows", 4L, "index", 5351.814697265625D, "addRowsIndexConstant", 5356.814697265625D, "uniques", QueryRunnerTestHelper.UNIQUES_2 ), ImmutableMap.of( - "provider", "upfront", + providerDimension, "upfront", "rows", 4L, "index", 4875.669677734375D, "addRowsIndexConstant", 4880.669677734375D, @@ -1014,14 +1014,14 @@ public class TopNQueryRunnerTest new TopNResultValue( Arrays.>asList( ImmutableMap.of( - "provider", "total_market", + providerDimension, "total_market", "rows", 4L, "index", 5351.814697265625D, "addRowsIndexConstant", 5356.814697265625D, "uniques", QueryRunnerTestHelper.UNIQUES_2 ), ImmutableMap.of( - "provider", "upfront", + providerDimension, "upfront", "rows", 4L, "index", 4875.669677734375D, "addRowsIndexConstant", 4880.669677734375D, @@ -1055,14 +1055,14 @@ public class TopNQueryRunnerTest new TopNResultValue( Arrays.>asList( ImmutableMap.of( - "provider", "total_market", + providerDimension, "total_market", "rows", 4L, "index", 5351.814697265625D, "addRowsIndexConstant", 5356.814697265625D, "uniques", QueryRunnerTestHelper.UNIQUES_2 ), ImmutableMap.of( - "provider", "upfront", + providerDimension, "upfront", "rows", 4L, "index", 4875.669677734375D, "addRowsIndexConstant", 4880.669677734375D, @@ -1100,21 +1100,21 @@ public class TopNQueryRunnerTest new TopNResultValue( Arrays.>asList( ImmutableMap.of( - "provider", "s", + providerDimension, "s", "rows", 18L, "index", 2231.8768157958984D, "addRowsIndexConstant", 2250.8768157958984D, "uniques", QueryRunnerTestHelper.UNIQUES_9 ), ImmutableMap.of( - "provider", "t", + 
providerDimension, "t", "rows", 4L, "index", 5351.814697265625D, "addRowsIndexConstant", 5356.814697265625D, "uniques", QueryRunnerTestHelper.UNIQUES_2 ), ImmutableMap.of( - "provider", "u", + providerDimension, "u", "rows", 4L, "index", 4875.669677734375D, "addRowsIndexConstant", 4880.669677734375D, @@ -1149,21 +1149,21 @@ public class TopNQueryRunnerTest new TopNResultValue( Arrays.>asList( ImmutableMap.of( - "provider", "spot", + providerDimension, "spot", "rows", 18L, "index", 2231.8768157958984D, "addRowsIndexConstant", 2250.8768157958984D, "uniques", QueryRunnerTestHelper.UNIQUES_9 ), ImmutableMap.of( - "provider", "upfront", + providerDimension, "upfront", "rows", 4L, "index", 4875.669677734375D, "addRowsIndexConstant", 4880.669677734375D, "uniques", QueryRunnerTestHelper.UNIQUES_2 ), ImmutableMap.of( - "provider", "total_market", + providerDimension, "total_market", "rows", 4L, "index", 5351.814697265625D, "addRowsIndexConstant", 5356.814697265625D, diff --git a/processing/src/test/java/io/druid/query/topn/TopNQueryTest.java b/processing/src/test/java/io/druid/query/topn/TopNQueryTest.java new file mode 100644 index 00000000000..f2f4ac22f3d --- /dev/null +++ b/processing/src/test/java/io/druid/query/topn/TopNQueryTest.java @@ -0,0 +1,81 @@ +/* + * Druid - a distributed column store. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * + * This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project + * under the Druid Corporate Contributor License Agreement. 
+ */ + +package io.druid.query.topn; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.Query; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.MaxAggregatorFactory; +import io.druid.query.aggregation.MinAggregatorFactory; +import io.druid.query.aggregation.PostAggregator; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; + +import static io.druid.query.QueryRunnerTestHelper.addRowsIndexConstant; +import static io.druid.query.QueryRunnerTestHelper.allGran; +import static io.druid.query.QueryRunnerTestHelper.commonAggregators; +import static io.druid.query.QueryRunnerTestHelper.dataSource; +import static io.druid.query.QueryRunnerTestHelper.fullOnInterval; +import static io.druid.query.QueryRunnerTestHelper.indexMetric; +import static io.druid.query.QueryRunnerTestHelper.providerDimension; + +public class TopNQueryTest +{ + private static final ObjectMapper jsonMapper = new DefaultObjectMapper(); + + @Test + public void testQuerySerialization() throws IOException + { + Query query = new TopNQueryBuilder() + .dataSource(dataSource) + .granularity(allGran) + .dimension(providerDimension) + .metric(indexMetric) + .threshold(4) + .intervals(fullOnInterval) + .aggregators( + Lists.newArrayList( + Iterables.concat( + commonAggregators, + Lists.newArrayList( + new MaxAggregatorFactory("maxIndex", "index"), + new MinAggregatorFactory("minIndex", "index") + ) + ) + ) + ) + .postAggregators(Arrays.asList(addRowsIndexConstant)) + .build(); + + String json = jsonMapper.writeValueAsString(query); + Query serdeQuery = jsonMapper.readValue(json, Query.class); + + Assert.assertEquals(query, serdeQuery); + } + +} diff --git a/server/src/main/java/io/druid/client/BrokerServerView.java b/server/src/main/java/io/druid/client/BrokerServerView.java index d09a6d11ec6..be6a6553ae7 100644 --- a/server/src/main/java/io/druid/client/BrokerServerView.java +++ b/server/src/main/java/io/druid/client/BrokerServerView.java @@ -25,13 +25,16 @@ import com.google.common.collect.Ordering; import com.google.inject.Inject; import com.metamx.common.logger.Logger; import com.metamx.http.client.HttpClient; -import io.druid.client.selector.ServerSelector; import io.druid.client.selector.QueryableDruidServer; +import io.druid.client.selector.ServerSelector; import io.druid.client.selector.ServerSelectorStrategy; import io.druid.concurrent.Execs; import io.druid.guice.annotations.Client; +import io.druid.query.DataSource; +import io.druid.query.QueryDataSource; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChestWarehouse; +import io.druid.query.TableDataSource; import io.druid.timeline.DataSegment; import io.druid.timeline.VersionedIntervalTimeline; import io.druid.timeline.partition.PartitionChunk; @@ -232,10 +235,21 @@ public class BrokerServerView implements TimelineServerView @Override - public VersionedIntervalTimeline<String, ServerSelector> getTimeline(String dataSource) + public VersionedIntervalTimeline<String, ServerSelector> getTimeline(DataSource dataSource) { + String table; + while (dataSource instanceof QueryDataSource) { + dataSource = ((QueryDataSource) dataSource).getQuery().getDataSource(); + } + + if (dataSource instanceof TableDataSource) { + table = ((TableDataSource) dataSource).getName(); + } else { + throw new UnsupportedOperationException("Unsupported data source type: " + dataSource.getClass().getSimpleName()); + } + synchronized (lock) { - return timelines.get(dataSource); + return timelines.get(table); } }
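The `getTimeline` change above is the heart of the new `DataSource` handling: a query data source may nest another query, so the broker walks down until it reaches a concrete table. A minimal standalone sketch of that unwrapping pattern, with stand-in types — the `tableNameFor` helper is ours, and this `QueryDataSource` wraps another `DataSource` directly rather than a full `Query` as in the patch:

```java
// Stand-in types; see the BrokerServerView change above for the real shapes.
interface DataSource {}

class TableDataSource implements DataSource
{
  private final String name;
  TableDataSource(String name) { this.name = name; }
  String getName() { return name; }
}

class QueryDataSource implements DataSource
{
  private final DataSource inner; // the patch wraps a Query and calls getQuery().getDataSource()
  QueryDataSource(DataSource inner) { this.inner = inner; }
  DataSource getInnerDataSource() { return inner; }
}

public class DataSourceUnwrapSketch
{
  // Walk through nested query data sources until a concrete table turns up.
  static String tableNameFor(DataSource dataSource)
  {
    while (dataSource instanceof QueryDataSource) {
      dataSource = ((QueryDataSource) dataSource).getInnerDataSource();
    }
    if (dataSource instanceof TableDataSource) {
      return ((TableDataSource) dataSource).getName();
    }
    throw new UnsupportedOperationException(
        "Unsupported data source type: " + dataSource.getClass().getSimpleName()
    );
  }

  public static void main(String[] args)
  {
    DataSource nested = new QueryDataSource(new QueryDataSource(new TableDataSource("wikipedia")));
    System.out.println(tableNameFor(nested)); // prints "wikipedia"
  }
}
```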
diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index c17314c7cf8..a9d6538b200 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -40,8 +40,8 @@ import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import com.metamx.emitter.EmittingLogger; import io.druid.client.cache.Cache; -import io.druid.client.selector.ServerSelector; import io.druid.client.selector.QueryableDruidServer; +import io.druid.client.selector.ServerSelector; import io.druid.guice.annotations.Smile; import io.druid.query.BySegmentResultValueClass; import io.druid.query.CacheStrategy; @@ -124,7 +124,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T> final boolean useCache = Boolean.parseBoolean(query.getContextValue("useCache", "true")) && strategy != null; final boolean populateCache = Boolean.parseBoolean(query.getContextValue("populateCache", "true")) - && strategy != null; + && strategy != null; final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false")); @@ -140,6 +140,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T> final Query<T> rewrittenQuery = query.withOverriddenContext(contextBuilder.build()); + VersionedIntervalTimeline<String, ServerSelector> timeline = serverView.getTimeline(query.getDataSource()); if (timeline == null) { return Sequences.empty(); } @@ -285,8 +286,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T> objectMapper.getFactory().createParser(cachedResult), cacheObjectClazz ); - } - catch (IOException e) { + } catch (IOException e) { throw Throwables.propagate(e); } } @@ -339,7 +339,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T> CachePopulator cachePopulator = cachePopulatorMap.get( String.format("%s_%s", segmentIdentifier, value.getInterval()) ); - if(cachePopulator != null) { + if (cachePopulator != null) { cachePopulator.populate(Iterables.transform(segmentResults, prepareForCache)); } @@ -424,8 +424,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T> } cache.put(key, valueBytes); - } - catch (IOException e) { + } catch (IOException e) { throw Throwables.propagate(e); } } diff --git a/server/src/main/java/io/druid/client/TimelineServerView.java b/server/src/main/java/io/druid/client/TimelineServerView.java index 7082c599c75..0a6a43c8fdb 100644 --- a/server/src/main/java/io/druid/client/TimelineServerView.java +++ b/server/src/main/java/io/druid/client/TimelineServerView.java @@ -20,6 +20,7 @@ package io.druid.client; import io.druid.client.selector.ServerSelector; +import io.druid.query.DataSource; import io.druid.query.QueryRunner; import io.druid.timeline.VersionedIntervalTimeline; @@ -27,6 +28,6 @@ import io.druid.timeline.VersionedIntervalTimeline; */ public interface TimelineServerView extends ServerView { - VersionedIntervalTimeline<String, ServerSelector> getTimeline(String dataSource); + VersionedIntervalTimeline<String, ServerSelector> getTimeline(DataSource dataSource); <T> QueryRunner<T> getQueryRunner(DruidServer server); } diff --git a/server/src/main/java/io/druid/guice/DataSegmentPusherPullerModule.java b/server/src/main/java/io/druid/guice/LocalDataStorageDruidModule.java similarity index 86% rename from server/src/main/java/io/druid/guice/DataSegmentPusherPullerModule.java rename to
server/src/main/java/io/druid/guice/LocalDataStorageDruidModule.java index af45ea18f17..4511359a522 100644 --- a/server/src/main/java/io/druid/guice/DataSegmentPusherPullerModule.java +++ b/server/src/main/java/io/druid/guice/LocalDataStorageDruidModule.java @@ -23,15 +23,17 @@ import com.google.inject.Binder; import com.google.inject.Key; import com.google.inject.Module; import io.druid.segment.loading.DataSegmentPusher; +import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.loading.LocalDataSegmentPuller; import io.druid.segment.loading.LocalDataSegmentPusher; import io.druid.segment.loading.LocalDataSegmentPusherConfig; +import io.druid.segment.loading.LocalDataSegmentKiller; import io.druid.segment.loading.OmniSegmentLoader; import io.druid.segment.loading.SegmentLoader; /** */ -public class DataSegmentPusherPullerModule implements Module +public class LocalDataStorageDruidModule implements Module { @Override public void configure(Binder binder) @@ -52,6 +54,11 @@ public class DataSegmentPusherPullerModule implements Module .to(LocalDataSegmentPuller.class) .in(LazySingleton.class); + PolyBind.optionBinder(binder, Key.get(DataSegmentKiller.class)) + .addBinding("local") + .to(LocalDataSegmentKiller.class) + .in(LazySingleton.class); + PolyBind.optionBinder(binder, Key.get(DataSegmentPusher.class)) .addBinding("local") .to(LocalDataSegmentPusher.class) diff --git a/server/src/main/java/io/druid/initialization/Initialization.java b/server/src/main/java/io/druid/initialization/Initialization.java index 5e8e0461202..37dcc5821b0 100644 --- a/server/src/main/java/io/druid/initialization/Initialization.java +++ b/server/src/main/java/io/druid/initialization/Initialization.java @@ -36,7 +36,7 @@ import io.druid.curator.CuratorModule; import io.druid.curator.discovery.DiscoveryModule; import io.druid.guice.AWSModule; import io.druid.guice.AnnouncerModule; -import io.druid.guice.DataSegmentPusherPullerModule; +import io.druid.guice.LocalDataStorageDruidModule; import io.druid.guice.DbConnectorModule; import io.druid.guice.DruidGuiceExtensions; import io.druid.guice.DruidProcessingModule; @@ -316,7 +316,7 @@ public class Initialization new DbConnectorModule(), new JacksonConfigManagerModule(), new IndexingServiceDiscoveryModule(), - new DataSegmentPusherPullerModule(), + new LocalDataStorageDruidModule(), new FirehoseModule() );
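The new `PolyBind` entry above registers `LocalDataSegmentKiller` under the `"local"` key, next to the existing pusher and puller bindings. Conceptually it is a keyed registry: a configured storage type selects the implementation. A plain-Java sketch of that idea follows — this is not Guice or Druid's `PolyBind`, and the interface, keys, and segment id are stand-ins (in Druid the key would come from a storage-type property such as `druid.storage.type`; treat that exact name as an assumption here):

```java
import java.util.HashMap;
import java.util.Map;

// Not Druid's PolyBind: a plain-Java sketch of selecting a segment killer
// by storage-type key. The interface and keys are stand-ins for illustration.
public class KeyedKillerSketch
{
  interface DataSegmentKiller
  {
    void kill(String segmentId);
  }

  public static void main(String[] args)
  {
    Map<String, DataSegmentKiller> killers = new HashMap<>();
    killers.put("local", id -> System.out.println("deleting local segment files for " + id));

    // In Druid the key comes from configuration; "local" is the option this patch wires up.
    String storageType = "local";
    killers.get(storageType).kill("wikipedia_2014-01-01T00:00:00.000Z_v1");
  }
}
```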
SegmentLoadingException("Couldn't kill segment[%s]", segment.getIdentifier()); + } + + return; + } + + final File[] files = path.listFiles(); + int success = 0; + + for (File file : files) { + if (!file.delete()) { + log.error("Unable to delete file[%s].", file); + } else { + ++success; + } + } + + if (success == 0 && files.length != 0) { + throw new SegmentLoadingException("Couldn't kill segment[%s]", segment.getIdentifier()); + } + + if (success < files.length) { + log.warn("Couldn't completely kill segment[%s]", segment.getIdentifier()); + } else if (!path.delete()) { + log.warn("Unable to delete directory[%s].", path); + log.warn("Couldn't completely kill segment[%s]", segment.getIdentifier()); + } + } + + private File getDirectory(DataSegment segment) throws SegmentLoadingException + { + final Map loadSpec = segment.getLoadSpec(); + final File path = new File(MapUtils.getString(loadSpec, "path")); + + if (!path.exists()) { + throw new SegmentLoadingException("Asked to load path[%s], but it doesn't exist.", path); + } + + return path.getParentFile(); + } +} diff --git a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java index bb2b666db26..e54ff4bcb9c 100644 --- a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java +++ b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java @@ -30,6 +30,7 @@ import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.emitter.EmittingLogger; import io.druid.data.input.Firehose; import io.druid.data.input.InputRow; +import io.druid.query.DataSource; import io.druid.query.FinalizeResultsQueryRunner; import io.druid.query.NoopQueryRunner; import io.druid.query.Query; @@ -39,6 +40,7 @@ import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.query.QuerySegmentWalker; import io.druid.query.QueryToolChest; import io.druid.query.SegmentDescriptor; +import io.druid.query.TableDataSource; import io.druid.segment.realtime.plumber.Plumber; import io.druid.segment.realtime.plumber.Sink; import org.joda.time.DateTime; @@ -96,6 +98,7 @@ public class RealtimeManager implements QuerySegmentWalker Closeables.closeQuietly(chief); } } + public FireDepartmentMetrics getMetrics(String datasource) { FireChief chief = chiefs.get(datasource); @@ -108,7 +111,7 @@ public class RealtimeManager implements QuerySegmentWalker @Override public QueryRunner getQueryRunnerForIntervals(Query query, Iterable intervals) { - final FireChief chief = chiefs.get(query.getDataSource()); + final FireChief chief = chiefs.get(getDataSourceName(query)); return chief == null ? new NoopQueryRunner() : chief.getQueryRunner(query); } @@ -116,11 +119,28 @@ public class RealtimeManager implements QuerySegmentWalker @Override public QueryRunner getQueryRunnerForSegments(Query query, Iterable specs) { - final FireChief chief = chiefs.get(query.getDataSource()); + final FireChief chief = chiefs.get(getDataSourceName(query)); return chief == null ? 
new NoopQueryRunner<T>() : chief.getQueryRunner(query); } + + private String getDataSourceName(Query query) + { + DataSource dataSource = query.getDataSource(); + if (!(dataSource instanceof TableDataSource)) { + throw new UnsupportedOperationException("data source type '" + dataSource.getClass().getName() + "' unsupported; subqueries are only supported in the broker"); + } + return ((TableDataSource) dataSource).getName(); + } + + private class FireChief extends Thread implements Closeable { private final FireDepartment fireDepartment; @@ -152,8 +172,7 @@ public class RealtimeManager implements QuerySegmentWalker log.info("Someone get us a plumber!"); plumber = fireDepartment.findPlumber(); log.info("We have our plumber!"); - } - catch (IOException e) { + } catch (IOException e) { throw Throwables.propagate(e); } } @@ -180,8 +199,7 @@ public class RealtimeManager implements QuerySegmentWalker try { try { inputRow = firehose.nextRow(); - } - catch (Exception e) { + } catch (Exception e) { log.debug(e, "thrown away line due to exception, considering unparseable"); metrics.incrementUnparseable(); continue; @@ -206,8 +224,7 @@ public class RealtimeManager implements QuerySegmentWalker nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); } metrics.incrementProcessed(); - } - catch (FormattedException e) { + } catch (FormattedException e) { log.info(e, "unparseable line: %s", e.getDetails()); metrics.incrementUnparseable(); continue; @@ -215,16 +232,15 @@ } } catch (RuntimeException e) { log.makeAlert(e, "RuntimeException aborted realtime processing[%s]", fireDepartment.getSchema().getDataSource()) - .emit(); + .emit(); normalExit = false; throw e; } catch (Error e) { log.makeAlert(e, "Exception aborted realtime processing[%s]", fireDepartment.getSchema().getDataSource()) - .emit(); + .emit(); normalExit = false; throw e; - } - finally { + } finally { Closeables.closeQuietly(firehose); if (normalExit) { plumber.finishJob(); diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index 97bca835ddb..051a56e4465 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -131,7 +131,7 @@ public class QueryResource emitter.emit( new ServiceMetricEvent.Builder() - .setUser2(query.getDataSource()) + .setUser2(query.getDataSource().toString()) .setUser4(query.getType()) .setUser5(query.getIntervals().get(0).toString()) .setUser6(String.valueOf(query.hasFilters())) diff --git a/server/src/main/java/io/druid/server/coordination/ServerManager.java b/server/src/main/java/io/druid/server/coordination/ServerManager.java index 950be651e86..3ee0c108f8a 100644 --- a/server/src/main/java/io/druid/server/coordination/ServerManager.java +++ b/server/src/main/java/io/druid/server/coordination/ServerManager.java @@ -31,6 +31,7 @@ import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.collections.CountingMap; import io.druid.guice.annotations.Processing; import io.druid.query.BySegmentQueryRunner; +import io.druid.query.DataSource; import io.druid.query.FinalizeResultsQueryRunner; import io.druid.query.MetricsEmittingQueryRunner; import io.druid.query.NoopQueryRunner; @@ -42,6 +43,7 @@ import
io.druid.query.QuerySegmentWalker; import io.druid.query.QueryToolChest; import io.druid.query.ReferenceCountingSegmentQueryRunner; import io.druid.query.SegmentDescriptor; +import io.druid.query.TableDataSource; import io.druid.query.spec.QuerySegmentSpec; import io.druid.query.spec.SpecificSegmentQueryRunner; import io.druid.query.spec.SpecificSegmentSpec; @@ -118,6 +120,7 @@ public class ServerManager implements QuerySegmentWalker /** * Load a single segment. + * * @param segment segment to load * @return true if the segment was newly loaded, false if it was already loaded * @throws SegmentLoadingException if the segment cannot be loaded @@ -127,12 +130,10 @@ public class ServerManager implements QuerySegmentWalker final Segment adapter; try { adapter = segmentLoader.getSegment(segment); - } - catch (SegmentLoadingException e) { + } catch (SegmentLoadingException e) { try { segmentLoader.cleanup(segment); - } - catch (SegmentLoadingException e1) { + } catch (SegmentLoadingException e1) { // ignore } throw e; @@ -204,12 +205,11 @@ public class ServerManager implements QuerySegmentWalker try { log.info("Attempting to close segment %s", segment.getIdentifier()); oldQueryable.close(); - } - catch (IOException e) { + } catch (IOException e) { log.makeAlert(e, "Exception closing segment") - .addData("dataSource", dataSource) - .addData("segmentId", segment.getIdentifier()) - .emit(); + .addData("dataSource", dataSource) + .addData("segmentId", segment.getIdentifier()) + .emit(); } } else { log.info( @@ -233,7 +233,13 @@ public class ServerManager implements QuerySegmentWalker final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest(); - final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = dataSources.get(query.getDataSource()); + final DataSource dataSource = query.getDataSource(); + if (!(dataSource instanceof TableDataSource)) { + throw new UnsupportedOperationException("data source type '" + dataSource.getClass().getName() + "' unsupported; subqueries are only supported in the broker"); + } + + final String dataSourceName = ((TableDataSource) dataSource).getName(); + final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = dataSources.get(dataSourceName); if (timeline == null) { return new NoopQueryRunner<T>(); @@ -294,6 +306,7 @@ public class ServerManager implements QuerySegmentWalker Predicates.<QueryRunner<T>>notNull() ); + return new FinalizeResultsQueryRunner<T>(toolChest.mergeResults(factory.mergeRunners(exec, adapters)), toolChest); } @@ -303,14 +316,20 @@ public class ServerManager implements QuerySegmentWalker final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query); if (factory == null) { log.makeAlert("Unknown query type, [%s]", query.getClass()) - .addData("dataSource", query.getDataSource()) - .emit(); + .addData("dataSource", query.getDataSource()) + .emit(); return new NoopQueryRunner<T>(); } final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest(); - final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = dataSources.get(query.getDataSource()); + final DataSource dataSource = query.getDataSource(); + if (!(dataSource instanceof TableDataSource)) { + throw new UnsupportedOperationException("Subqueries are only supported in the broker"); + } + + final String dataSourceName = ((TableDataSource) dataSource).getName(); + final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = dataSources.get(dataSourceName); if (timeline == null) { return new NoopQueryRunner<T>(); diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java index 78cfa370df1..40045ec8fa7 100644 --- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java @@ -35,11 +35,6 @@ import com.metamx.common.guava.MergeIterable; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import com.metamx.common.guava.nary.TrinaryFn; -import io.druid.query.topn.TopNQuery; -import io.druid.query.topn.TopNQueryBuilder; -import io.druid.query.topn.TopNQueryConfig; -import io.druid.query.topn.TopNQueryQueryToolChest; -import io.druid.query.topn.TopNResultValue; import io.druid.client.cache.Cache; import io.druid.client.cache.MapCache; import io.druid.client.selector.QueryableDruidServer; @@ -49,6 +44,7 @@ import io.druid.granularity.PeriodGranularity; import io.druid.granularity.QueryGranularity; import io.druid.jackson.DefaultObjectMapper; import io.druid.query.BySegmentResultValueClass; +import io.druid.query.DataSource; import io.druid.query.Druids; import io.druid.query.MapQueryToolChestWarehouse; import io.druid.query.Query; @@ -57,6 +53,7 @@ import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.Result; import io.druid.query.SegmentDescriptor; +import io.druid.query.TableDataSource; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; @@ -76,6 +73,11 @@ import io.druid.query.timeboundary.TimeBoundaryResultValue; import io.druid.query.timeseries.TimeseriesQuery; import io.druid.query.timeseries.TimeseriesQueryQueryToolChest; import io.druid.query.timeseries.TimeseriesResultValue; +import io.druid.query.topn.TopNQuery; +import io.druid.query.topn.TopNQueryBuilder; +import io.druid.query.topn.TopNQueryConfig; +import io.druid.query.topn.TopNQueryQueryToolChest; +import io.druid.query.topn.TopNResultValue; import io.druid.segment.TestHelper; import io.druid.timeline.DataSegment; import io.druid.timeline.VersionedIntervalTimeline; @@ -211,13 +213,13 @@ public class CachingClusteredClientTest public void testTimeseriesCaching() throws Exception { final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder() - .dataSource(DATA_SOURCE) - .intervals(SEG_SPEC) - .filters(DIM_FILTER) - .granularity(GRANULARITY) - .aggregators(AGGS) - .postAggregators(POST_AGGS) - .context(CONTEXT); + .dataSource(DATA_SOURCE) + .intervals(SEG_SPEC) + .filters(DIM_FILTER) + .granularity(GRANULARITY) + .aggregators(AGGS) + .postAggregators(POST_AGGS) + .context(CONTEXT); testQueryCaching( builder.build(), @@ -262,9 +264,9 @@ public class CachingClusteredClientTest ), client.run( builder.intervals("2011-01-01/2011-01-10") - .aggregators(RENAMED_AGGS) - .postAggregators(RENAMED_POST_AGGS) - .build() + .aggregators(RENAMED_AGGS) + .postAggregators(RENAMED_POST_AGGS) + .build() ) ); } @@ -274,13 +276,13 @@ public class CachingClusteredClientTest public void testTimeseriesCachingTimeZone() throws Exception { final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder() - .dataSource(DATA_SOURCE) - .intervals(SEG_SPEC) - .filters(DIM_FILTER) - .granularity(PT1H_TZ_GRANULARITY) - .aggregators(AGGS) - .postAggregators(POST_AGGS) - .context(CONTEXT); + .dataSource(DATA_SOURCE) + .intervals(SEG_SPEC) + .filters(DIM_FILTER) + .granularity(PT1H_TZ_GRANULARITY) + .aggregators(AGGS) + .postAggregators(POST_AGGS) + 
.context(CONTEXT); testQueryCaching( builder.build(), @@ -302,9 +304,9 @@ public class CachingClusteredClientTest ), client.run( builder.intervals("2011-11-04/2011-11-08") - .aggregators(RENAMED_AGGS) - .postAggregators(RENAMED_POST_AGGS) - .build() + .aggregators(RENAMED_AGGS) + .postAggregators(RENAMED_POST_AGGS) + .build() ) ); } @@ -313,18 +315,18 @@ public class CachingClusteredClientTest public void testDisableUseCache() throws Exception { final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder() - .dataSource(DATA_SOURCE) - .intervals(SEG_SPEC) - .filters(DIM_FILTER) - .granularity(GRANULARITY) - .aggregators(AGGS) - .postAggregators(POST_AGGS); + .dataSource(DATA_SOURCE) + .intervals(SEG_SPEC) + .filters(DIM_FILTER) + .granularity(GRANULARITY) + .aggregators(AGGS) + .postAggregators(POST_AGGS); testQueryCaching( 1, true, builder.context(ImmutableMap.of("useCache", "false", - "populateCache", "true")).build(), + "populateCache", "true")).build(), new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000) ); @@ -338,7 +340,7 @@ public class CachingClusteredClientTest 1, false, builder.context(ImmutableMap.of("useCache", "false", - "populateCache", "false")).build(), + "populateCache", "false")).build(), new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000) ); @@ -350,7 +352,7 @@ public class CachingClusteredClientTest 1, false, builder.context(ImmutableMap.of("useCache", "true", - "populateCache", "false")).build(), + "populateCache", "false")).build(), new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000) ); @@ -419,10 +421,10 @@ public class CachingClusteredClientTest ), client.run( builder.intervals("2011-01-01/2011-01-10") - .metric("imps") - .aggregators(RENAMED_AGGS) - .postAggregators(RENAMED_POST_AGGS) - .build() + .metric("imps") + .aggregators(RENAMED_AGGS) + .postAggregators(RENAMED_POST_AGGS) + .build() ) ); } @@ -464,10 +466,10 @@ public class CachingClusteredClientTest ), client.run( builder.intervals("2011-11-04/2011-11-08") - .metric("imps") - .aggregators(RENAMED_AGGS) - .postAggregators(RENAMED_POST_AGGS) - .build() + .metric("imps") + .aggregators(RENAMED_AGGS) + .postAggregators(RENAMED_POST_AGGS) + .build() ) ); } @@ -530,10 +532,10 @@ public class CachingClusteredClientTest ), client.run( builder.intervals("2011-01-01/2011-01-10") - .metric("imps") - .aggregators(RENAMED_AGGS) - .postAggregators(RENAMED_POST_AGGS) - .build() + .metric("imps") + .aggregators(RENAMED_AGGS) + .postAggregators(RENAMED_POST_AGGS) + .build() ) ); } @@ -543,7 +545,7 @@ public class CachingClusteredClientTest { testQueryCaching( new SearchQuery( - DATA_SOURCE, + new TableDataSource(DATA_SOURCE), DIM_FILTER, GRANULARITY, 1000, @@ -579,7 +581,8 @@ public class CachingClusteredClientTest ); } - public void testQueryCaching(final Query query, Object... args) { + public void testQueryCaching(final Query query, Object... 
args) + { testQueryCaching(3, true, query, args); } @@ -634,8 +637,8 @@ EasyMock.expect(serverView.getQueryRunner(server)) - .andReturn(expectations.getQueryRunner()) - .once(); + .andReturn(expectations.getQueryRunner()) + .once(); final Capture capture = new Capture(); queryCaptures.add(capture); @@ -652,8 +655,8 @@ } EasyMock.expect(queryable.run(EasyMock.capture(capture))) - .andReturn(toQueryableTimeseriesResults(expectBySegment, segmentIds, intervals, results)) - .once(); + .andReturn(toQueryableTimeseriesResults(expectBySegment, segmentIds, intervals, results)) + .once(); } else if (query instanceof TopNQuery) { List<String> segmentIds = Lists.newArrayList(); @@ -665,8 +668,8 @@ results.add(expectation.getResults()); } EasyMock.expect(queryable.run(EasyMock.capture(capture))) - .andReturn(toQueryableTopNResults(segmentIds, intervals, results)) - .once(); + .andReturn(toQueryableTopNResults(segmentIds, intervals, results)) + .once(); } else if (query instanceof SearchQuery) { List<String> segmentIds = Lists.newArrayList(); List<Interval> intervals = Lists.newArrayList(); @@ -677,8 +680,8 @@ results.add(expectation.getResults()); } EasyMock.expect(queryable.run(EasyMock.capture(capture))) - .andReturn(toQueryableSearchResults(segmentIds, intervals, results)) - .once(); + .andReturn(toQueryableSearchResults(segmentIds, intervals, results)) + .once(); } else if (query instanceof TimeBoundaryQuery) { List<String> segmentIds = Lists.newArrayList(); List<Interval> intervals = Lists.newArrayList(); @@ -689,8 +692,8 @@ results.add(expectation.getResults()); } EasyMock.expect(queryable.run(EasyMock.capture(capture))) - .andReturn(toQueryableTimeBoundaryResults(segmentIds, intervals, results)) - .once(); + .andReturn(toQueryableTimeBoundaryResults(segmentIds, intervals, results)) + .once(); } else { throw new ISE("Unknown query type[%s]", query.getClass()); } @@ -757,13 +760,12 @@ // make sure all the queries were sent down as 'bySegment' for (Capture queryCapture : queryCaptures) { Query capturedQuery = (Query) queryCapture.getValue(); - if(expectBySegment) { + if (expectBySegment) { Assert.assertEquals("true", capturedQuery.getContextValue("bySegment")); - } - else { + } else { Assert.assertTrue( capturedQuery.getContextValue("bySegment") == null || - capturedQuery.getContextValue("bySegment").equals("false") + capturedQuery.getContextValue("bySegment").equals("false") ); } } @@ -818,7 +820,8 @@ } timeline.add(queryIntervals.get(k), String.valueOf(k), chunk); } - } return serverExpectationList; + } + return serverExpectationList; } private Sequence<Result<TimeseriesResultValue>> toQueryableTimeseriesResults( @@ -828,35 +831,35 @@ Iterable<Iterable<Result<TimeseriesResultValue>>> results ) { - if(bySegment) { - return Sequences.simple( - FunctionalIterable - .create(segmentIds) - .trinaryTransform( - intervals, - results, - new TrinaryFn<String, Interval, Iterable<Result<TimeseriesResultValue>>, Result<TimeseriesResultValue>>() - { - @Override - @SuppressWarnings("unchecked") - public Result<TimeseriesResultValue> apply( - final String segmentId, - final Interval interval, - final Iterable<Result<TimeseriesResultValue>> results - ) + if (bySegment) { + return Sequences.simple( + FunctionalIterable + .create(segmentIds) + .trinaryTransform( + intervals, + results, + new TrinaryFn<String, Interval, Iterable<Result<TimeseriesResultValue>>, Result<TimeseriesResultValue>>() { - return new Result( - results.iterator().next().getTimestamp(), - new BySegmentResultValueClass( - Lists.newArrayList(results), - segmentId, - interval - ) - ); + @Override + @SuppressWarnings("unchecked") + public Result<TimeseriesResultValue> apply( + final String segmentId, + final Interval interval, + final Iterable<Result<TimeseriesResultValue>> results + ) + { + return new Result( + results.iterator().next().getTimestamp(), + new BySegmentResultValueClass( + Lists.newArrayList(results), + segmentId, + interval + ) + ); + } } - } - ) - ); + ) + ); } else { return Sequences.simple(Iterables.concat(results)); } @@ -994,36 +997,36 @@ } private Iterable<BySegmentResultValueClass<TimeseriesResultValue>> makeBySegmentTimeResults - (Object... objects) - { - if (objects.length % 5 != 0) { - throw new ISE("makeTimeResults must be passed arguments in groups of 5, got[%d]", objects.length); - } - - List<BySegmentResultValueClass<TimeseriesResultValue>> retVal = Lists.newArrayListWithCapacity(objects.length / 5); - for (int i = 0; i < objects.length; i += 5) { - retVal.add( - new BySegmentResultValueClass( - Lists.newArrayList( - new TimeseriesResultValue( - ImmutableMap.of( - "rows", objects[i + 1], - "imps", objects[i + 2], - "impers", objects[i + 2], - "avg_imps_per_row", - ((Number) objects[i + 2]).doubleValue() / ((Number) objects[i + 1]).doubleValue() - ) - ) - ), - (String)objects[i+3], - (Interval)objects[i+4] - - ) - ); - } - return retVal; + (Object... objects) + { + if (objects.length % 5 != 0) { + throw new ISE("makeTimeResults must be passed arguments in groups of 5, got[%d]", objects.length); } + List<BySegmentResultValueClass<TimeseriesResultValue>> retVal = Lists.newArrayListWithCapacity(objects.length / 5); + for (int i = 0; i < objects.length; i += 5) { + retVal.add( + new BySegmentResultValueClass( + Lists.newArrayList( + new TimeseriesResultValue( + ImmutableMap.of( + "rows", objects[i + 1], + "imps", objects[i + 2], + "impers", objects[i + 2], + "avg_imps_per_row", + ((Number) objects[i + 2]).doubleValue() / ((Number) objects[i + 1]).doubleValue() + ) + ) + ), + (String) objects[i + 3], + (Interval) objects[i + 4] + + ) + ); + } + return retVal; + } + private Iterable<Result<TimeseriesResultValue>> makeRenamedTimeResults (Object... objects) { @@ -1156,13 +1159,13 @@ return new CachingClusteredClient( new MapQueryToolChestWarehouse( ImmutableMap.<Class<? extends Query>, QueryToolChest>builder() - .put( - TimeseriesQuery.class, - new TimeseriesQueryQueryToolChest(new QueryConfig()) - ) - .put(TopNQuery.class, new TopNQueryQueryToolChest(new TopNQueryConfig())) - .put(SearchQuery.class, new SearchQueryQueryToolChest(new SearchQueryConfig())) - .build() + .put( + TimeseriesQuery.class, + new TimeseriesQueryQueryToolChest(new QueryConfig()) + ) + .put(TopNQuery.class, new TopNQueryQueryToolChest(new TopNQueryConfig())) + .put(SearchQuery.class, new SearchQueryQueryToolChest(new SearchQueryConfig())) + .build() ), new TimelineServerView() { @@ -1172,7 +1175,7 @@ } @Override - public VersionedIntervalTimeline<String, ServerSelector> getTimeline(String dataSource) + public VersionedIntervalTimeline<String, ServerSelector> getTimeline(DataSource dataSource) { return timeline; }
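The serde round-trip exercised by TopNQueryTest above depends on Jackson resolving the polymorphic dataSource field from its "type" property. A self-contained sketch of that mechanism, with our own annotations and a single "table" subtype — an illustration of the idea, not Druid's actual class hierarchy:

```java
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.ObjectMapper;

public class DataSourceSerdeSketch
{
  // The "type" property picks the concrete subtype on deserialization.
  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
  @JsonSubTypes({@JsonSubTypes.Type(name = "table", value = Table.class)})
  interface DataSource {}

  static class Table implements DataSource
  {
    private final String name;

    @JsonCreator
    Table(@JsonProperty("name") String name) { this.name = name; }

    @JsonProperty
    public String getName() { return name; }
  }

  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    String json = mapper.writeValueAsString(new Table("wikipedia"));
    System.out.println(json); // {"type":"table","name":"wikipedia"}

    DataSource back = mapper.readValue(json, DataSource.class);
    System.out.println(((Table) back).getName()); // wikipedia
  }
}
```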