From 728a606d32ba31939e6a4d20b47ea72b2d2de49f Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Thu, 1 May 2014 01:54:52 +0530 Subject: [PATCH] Add support for union queries --- .../overlord/ThreadPoolTaskRunner.java | 9 +- .../main/java/io/druid/query/DataSource.java | 9 +- .../src/main/java/io/druid/query/Druids.java | 2 +- .../src/main/java/io/druid/query/Query.java | 2 + .../java/io/druid/query/QueryDataSource.java | 13 ++- .../java/io/druid/query/TableDataSource.java | 15 ++- .../java/io/druid/query/UnionDataSource.java | 101 ++++++++++++++++++ .../java/io/druid/query/UnionQueryRunner.java | 65 +++++++++++ .../io/druid/query/groupby/GroupByQuery.java | 18 ++++ .../groupby/GroupByQueryQueryToolChest.java | 10 +- .../SegmentMetadataQueryQueryToolChest.java | 2 +- .../metadata/SegmentMetadataQuery.java | 11 ++ .../search/SearchQueryQueryToolChest.java | 7 +- .../query/search/search/SearchQuery.java | 16 +++ .../io/druid/query/select/SelectQuery.java | 15 +++ .../select/SelectQueryQueryToolChest.java | 10 +- .../query/timeboundary/TimeBoundaryQuery.java | 10 ++ .../TimeBoundaryQueryQueryToolChest.java | 2 +- .../query/timeseries/TimeseriesQuery.java | 14 +++ .../TimeseriesQueryQueryToolChest.java | 10 +- .../java/io/druid/query/topn/TopNQuery.java | 18 ++++ .../query/topn/TopNQueryQueryToolChest.java | 11 +- .../java/io/druid/query/DataSourceTest.java | 25 +++++ .../metadata/SegmentMetadataQueryTest.java | 3 +- .../io/druid/client/BrokerServerView.java | 13 +-- .../segment/realtime/RealtimeManager.java | 14 +-- .../server/AsyncQueryForwardingServlet.java | 2 +- .../java/io/druid/server/QueryResource.java | 2 +- .../server/coordination/ServerManager.java | 29 +++-- .../router/TieredBrokerHostSelector.java | 3 +- 30 files changed, 393 insertions(+), 68 deletions(-) create mode 100644 processing/src/main/java/io/druid/query/UnionDataSource.java create mode 100644 processing/src/main/java/io/druid/query/UnionQueryRunner.java diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java index 0d69ed5036c..4f13daf3f5b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java @@ -22,6 +22,7 @@ package io.druid.indexing.overlord; import com.google.api.client.repackaged.com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -153,13 +154,7 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker private QueryRunner getQueryRunnerImpl(Query query) { QueryRunner queryRunner = null; - String queryDataSource; - try { - queryDataSource = ((TableDataSource)query.getDataSource()).getName(); - } - catch (ClassCastException e) { - throw new IllegalArgumentException("Subqueries are not welcome here"); - } + final String queryDataSource = Iterables.getOnlyElement(query.getDataSource().getNames()); for (final ThreadPoolTaskRunnerWorkItem taskRunnerWorkItem : ImmutableList.copyOf(runningItems)) { final Task task = taskRunnerWorkItem.getTask(); diff --git a/processing/src/main/java/io/druid/query/DataSource.java b/processing/src/main/java/io/druid/query/DataSource.java index 
a4ef603da1f..436e2e0de02 100644 --- a/processing/src/main/java/io/druid/query/DataSource.java +++ b/processing/src/main/java/io/druid/query/DataSource.java @@ -24,15 +24,20 @@ package io.druid.query; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import java.util.List; + @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type", defaultImpl = LegacyDataSource.class) @JsonSubTypes({ @JsonSubTypes.Type(value = TableDataSource.class, name = "table"), - @JsonSubTypes.Type(value = QueryDataSource.class, name = "query") + @JsonSubTypes.Type(value = QueryDataSource.class, name = "query"), + @JsonSubTypes.Type(value = UnionDataSource.class, name = "union") }) public interface DataSource { - public String getName(); + public Iterable getNames(); + + public String getMetricName(); } diff --git a/processing/src/main/java/io/druid/query/Druids.java b/processing/src/main/java/io/druid/query/Druids.java index 3ab6b0a8ff7..87676414ab0 100644 --- a/processing/src/main/java/io/druid/query/Druids.java +++ b/processing/src/main/java/io/druid/query/Druids.java @@ -537,7 +537,7 @@ public class Druids public SearchQueryBuilder copy(SearchQuery query) { return new SearchQueryBuilder() - .dataSource(((TableDataSource)query.getDataSource()).getName()) + .dataSource(query.getDataSource()) .intervals(query.getQuerySegmentSpec()) .filters(query.getDimensionsFilter()) .granularity(query.getGranularity()) diff --git a/processing/src/main/java/io/druid/query/Query.java b/processing/src/main/java/io/druid/query/Query.java index 9b9c9e373f9..04c581152ad 100644 --- a/processing/src/main/java/io/druid/query/Query.java +++ b/processing/src/main/java/io/druid/query/Query.java @@ -88,4 +88,6 @@ public interface Query public Query withId(String id); public String getId(); + + Query withDataSource(DataSource dataSource); } diff --git a/processing/src/main/java/io/druid/query/QueryDataSource.java b/processing/src/main/java/io/druid/query/QueryDataSource.java index 3f0c397f6d4..de3ff3982cb 100644 --- a/processing/src/main/java/io/druid/query/QueryDataSource.java +++ b/processing/src/main/java/io/druid/query/QueryDataSource.java @@ -24,6 +24,9 @@ package io.druid.query; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.collect.Lists; + +import java.util.List; @JsonTypeName("query") public class QueryDataSource implements DataSource @@ -38,9 +41,15 @@ public class QueryDataSource implements DataSource } @Override - public String getName() + public Iterable getNames() { - return query.getDataSource().getName(); + return query.getDataSource().getNames(); + } + + @Override + public String getMetricName() + { + return query.getDataSource().getMetricName(); } @JsonProperty diff --git a/processing/src/main/java/io/druid/query/TableDataSource.java b/processing/src/main/java/io/druid/query/TableDataSource.java index b658454cbc1..047295733ec 100644 --- a/processing/src/main/java/io/druid/query/TableDataSource.java +++ b/processing/src/main/java/io/druid/query/TableDataSource.java @@ -23,6 +23,9 @@ package io.druid.query; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.collect.Lists; + +import java.util.List; @JsonTypeName("table") public class TableDataSource implements 
DataSource @@ -37,8 +40,18 @@ public class TableDataSource implements DataSource } @JsonProperty + public String getName(){ + return name; + } + @Override - public String getName() + public List getNames() + { + return Lists.newArrayList(name); + } + + @Override + public String getMetricName() { return name; } diff --git a/processing/src/main/java/io/druid/query/UnionDataSource.java b/processing/src/main/java/io/druid/query/UnionDataSource.java new file mode 100644 index 00000000000..a6034aca7b9 --- /dev/null +++ b/processing/src/main/java/io/druid/query/UnionDataSource.java @@ -0,0 +1,101 @@ +/* + * Druid - a distributed column store. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * + * This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project + * under the Druid Corporate Contributor License Agreement. + */ + +package io.druid.query; + + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; + +import javax.annotation.Nullable; +import java.util.List; +import java.util.SortedSet; +import java.util.TreeSet; + +public class UnionDataSource implements DataSource +{ + @JsonProperty + private final List dataSources; + + @JsonCreator + public UnionDataSource(@JsonProperty("dataSources") List dataSources) + { + Preconditions.checkNotNull(dataSources, "datasources cannot be null for uniondatasource"); + this.dataSources = dataSources; + } + + @Override + public Iterable getNames() + { + return Iterables.concat(Iterables.transform(dataSources, new Function>() + { + @Override + public Iterable apply(DataSource input) + { + return input.getNames(); + } + })); + } + + @Override + public String getMetricName() + { + SortedSet str = new TreeSet<>(); + for(DataSource ds : dataSources){ + str.add(ds.getMetricName()); + } + return str.toString(); + } + + @JsonProperty + public List getDataSources(){ + return dataSources; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + UnionDataSource that = (UnionDataSource) o; + + if (!dataSources.equals(that.dataSources)) { + return false; + } + + return true; + } + + @Override + public int hashCode() + { + return dataSources.hashCode(); + } +} diff --git a/processing/src/main/java/io/druid/query/UnionQueryRunner.java b/processing/src/main/java/io/druid/query/UnionQueryRunner.java new file mode 100644 index 00000000000..fc1534ebc92 --- /dev/null +++ b/processing/src/main/java/io/druid/query/UnionQueryRunner.java @@ -0,0 +1,65 @@ +/* + * Druid - a distributed column store. 
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ *
+ * This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project
+ * under the Druid Corporate Contributor License Agreement.
+ */
+
+package io.druid.query;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.metamx.common.guava.Sequence;
+import com.metamx.common.guava.Sequences;
+
+public class UnionQueryRunner<T> implements QueryRunner<T>
+{
+  private final QueryRunner<T> baseRunner;
+
+  public UnionQueryRunner(QueryRunner<T> baseRunner)
+  {
+    this.baseRunner = baseRunner;
+  }
+
+  @Override
+  public Sequence<T> run(final Query<T> query)
+  {
+    DataSource dataSource = query.getDataSource();
+    if (dataSource instanceof UnionDataSource) {
+      // fan the union out: run the query once per underlying datasource
+      return Sequences.concat(
+          Iterables.transform(
+              ((UnionDataSource) dataSource).getDataSources(),
+              new Function<DataSource, Sequence<T>>()
+              {
+                @Override
+                public Sequence<T> apply(DataSource singleSource)
+                {
+                  // rewrite the query against this single datasource and delegate to the base runner
+                  return baseRunner.run(
+                      query.withDataSource(singleSource)
+                  );
+                }
+              }
+          )
+      );
+    } else {
+      return baseRunner.run(query);
+    }
+  }
+
+}
diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java
index 3f04f30b8aa..0a23b5704b7 100644
--- a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java
+++ b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java
@@ -257,6 +257,24 @@ public class GroupByQuery extends BaseQuery<Row>
     );
   }
 
+  @Override
+  public Query<Row> withDataSource(DataSource dataSource)
+  {
+    return new GroupByQuery(
+        dataSource,
+        getQuerySegmentSpec(),
+        dimFilter,
+        granularity,
+        dimensions,
+        aggregatorSpecs,
+        postAggregatorSpecs,
+        havingSpec,
+        limitSpec,
+        orderByLimitFn,
+        getContext()
+    );
+  }
+
   public static class Builder
   {
     private DataSource dataSource;
diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java
index ab70b34db05..ef34b3b81e1 100644
--- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java
+++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java
@@ -41,6 +41,7 @@ import io.druid.query.QueryDataSource;
 import io.druid.query.QueryRunner;
 import io.druid.query.QueryToolChest;
 import io.druid.query.SubqueryQueryRunner;
+import io.druid.query.UnionQueryRunner;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.MetricManipulationFn;
 import io.druid.segment.incremental.IncrementalIndex;
@@ -163,7 +164,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery>
 
   public QueryRunner<Row> preMergeQueryDecoration(QueryRunner<Row>
runner) { - return new SubqueryQueryRunner( - new IntervalChunkingQueryRunner(runner, configSupplier.get().getChunkPeriod())); + return new UnionQueryRunner( + new SubqueryQueryRunner( + new IntervalChunkingQueryRunner(runner, configSupplier.get().getChunkPeriod()) + ) + ); } } diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java index 98eb896476d..ee8610eb57e 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java @@ -147,7 +147,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest spec, toInclude, merge, getContext()); } + @Override + public Query withDataSource(DataSource dataSource) + { + return new SegmentMetadataQuery( + dataSource, + getQuerySegmentSpec(), + toInclude, + merge, + getContext()); + } + @Override public boolean equals(Object o) { diff --git a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java index a8429ad2726..2a1b6500aaa 100644 --- a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java @@ -45,6 +45,7 @@ import io.druid.query.QueryToolChest; import io.druid.query.Result; import io.druid.query.ResultGranularTimestampComparator; import io.druid.query.ResultMergeQueryRunner; +import io.druid.query.UnionQueryRunner; import io.druid.query.aggregation.MetricManipulationFn; import io.druid.query.filter.DimFilter; import io.druid.query.search.search.SearchHit; @@ -121,7 +122,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest> preMergeQueryDecoration(QueryRunner> runner) { return new SearchThresholdAdjustingQueryRunner( - new IntervalChunkingQueryRunner>(runner, config.getChunkPeriod()), + new UnionQueryRunner>( + new IntervalChunkingQueryRunner>(runner, config.getChunkPeriod()) + ), config ); } diff --git a/processing/src/main/java/io/druid/query/search/search/SearchQuery.java b/processing/src/main/java/io/druid/query/search/search/SearchQuery.java index 7c2901ea997..19536cd9f24 100644 --- a/processing/src/main/java/io/druid/query/search/search/SearchQuery.java +++ b/processing/src/main/java/io/druid/query/search/search/SearchQuery.java @@ -111,6 +111,22 @@ public class SearchQuery extends BaseQuery> ); } + @Override + public Query> withDataSource(DataSource dataSource) + { + return new SearchQuery( + dataSource, + dimFilter, + granularity, + limit, + getQuerySegmentSpec(), + dimensions, + querySpec, + sortSpec, + getContext() + ); + } + @Override public SearchQuery withOverriddenContext(Map contextOverrides) { diff --git a/processing/src/main/java/io/druid/query/select/SelectQuery.java b/processing/src/main/java/io/druid/query/select/SelectQuery.java index 7556006734f..16ae163e693 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQuery.java +++ b/processing/src/main/java/io/druid/query/select/SelectQuery.java @@ -120,6 +120,21 @@ public class SelectQuery extends BaseQuery> ); } + @Override + public Query> withDataSource(DataSource dataSource) + { + return new SelectQuery( + dataSource, + getQuerySegmentSpec(), + dimFilter, + granularity, + dimensions, + metrics, + pagingSpec, + getContext() + ); + } + public SelectQuery 
withOverriddenContext(Map contextOverrides) { return new SelectQuery( diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java index f3bbe028ed8..d40cb48af0d 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java @@ -42,6 +42,7 @@ import io.druid.query.QueryToolChest; import io.druid.query.Result; import io.druid.query.ResultGranularTimestampComparator; import io.druid.query.ResultMergeQueryRunner; +import io.druid.query.UnionQueryRunner; import io.druid.query.aggregation.MetricManipulationFn; import io.druid.query.filter.DimFilter; import org.joda.time.DateTime; @@ -123,7 +124,7 @@ public class SelectQueryQueryToolChest extends QueryToolChest> preMergeQueryDecoration(QueryRunner> runner) { - return new IntervalChunkingQueryRunner>(runner, config.getChunkPeriod()); + return new UnionQueryRunner>( + new IntervalChunkingQueryRunner>( + runner, + config.getChunkPeriod() + ) + ); } public Ordering> getOrdering() diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java index 357854f1958..6f3e70b9851 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java @@ -97,6 +97,16 @@ public class TimeBoundaryQuery extends BaseQuery ); } + @Override + public Query> withDataSource(DataSource dataSource) + { + return new TimeBoundaryQuery( + dataSource, + getQuerySegmentSpec(), + getContext() + ); + } + public byte[] getCacheKey() { return ByteBuffer.allocate(1) diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java index 7186b1a6e38..bfba85353b3 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java @@ -117,7 +117,7 @@ public class TimeBoundaryQueryQueryToolChest public ServiceMetricEvent.Builder makeMetricBuilder(TimeBoundaryQuery query) { return new ServiceMetricEvent.Builder() - .setUser2(query.getDataSource().toString()) + .setUser2(query.getDataSource().getMetricName()) .setUser4(query.getType()) .setUser6("false"); } diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java index 3a03018a63e..96a94227b36 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java @@ -116,6 +116,20 @@ public class TimeseriesQuery extends BaseQuery> ); } + @Override + public Query> withDataSource(DataSource dataSource) + { + return new TimeseriesQuery( + dataSource, + getQuerySegmentSpec(), + dimFilter, + granularity, + aggregatorSpecs, + postAggregatorSpecs, + getContext() + ); + } + public TimeseriesQuery withOverriddenContext(Map contextOverrides) { return new TimeseriesQuery( diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index 
482e92adc3e..3ff92321cae 100644
--- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
+++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
@@ -42,6 +42,7 @@ import io.druid.query.QueryToolChest;
 import io.druid.query.Result;
 import io.druid.query.ResultGranularTimestampComparator;
 import io.druid.query.ResultMergeQueryRunner;
+import io.druid.query.UnionQueryRunner;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.MetricManipulationFn;
 import io.druid.query.aggregation.PostAggregator;
@@ -122,7 +123,7 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<TimeseriesResultValue>, TimeseriesQuery>
 
   public QueryRunner<Result<TimeseriesResultValue>> preMergeQueryDecoration(QueryRunner<Result<TimeseriesResultValue>> runner)
   {
-    return new IntervalChunkingQueryRunner<Result<TimeseriesResultValue>>(runner, config.getChunkPeriod());
+    return new UnionQueryRunner<Result<TimeseriesResultValue>>(
+        new IntervalChunkingQueryRunner<Result<TimeseriesResultValue>>(
+            runner,
+            config.getChunkPeriod()
+        )
+    );
   }
 
   public Ordering<Result<TimeseriesResultValue>> getOrdering()
diff --git a/processing/src/main/java/io/druid/query/topn/TopNQuery.java b/processing/src/main/java/io/druid/query/topn/TopNQuery.java
index 0e7a796d045..92e3a18cfa5 100644
--- a/processing/src/main/java/io/druid/query/topn/TopNQuery.java
+++ b/processing/src/main/java/io/druid/query/topn/TopNQuery.java
@@ -27,6 +27,7 @@ import io.druid.granularity.QueryGranularity;
 import io.druid.query.BaseQuery;
 import io.druid.query.DataSource;
 import io.druid.query.Queries;
+import io.druid.query.Query;
 import io.druid.query.Result;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.PostAggregator;
@@ -162,6 +163,23 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
     );
   }
 
+  @Override
+  public Query<Result<TopNResultValue>> withDataSource(DataSource dataSource)
+  {
+    return new TopNQuery(
+        dataSource,
+        dimensionSpec,
+        topNMetricSpec,
+        threshold,
+        getQuerySegmentSpec(),
+        dimFilter,
+        granularity,
+        aggregatorSpecs,
+        postAggregatorSpecs,
+        getContext()
+    );
+  }
+
   public TopNQuery withThreshold(int threshold)
   {
     return new TopNQuery(
diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java
index 5db416f1d0b..cbce09c1f9c 100644
--- a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java
+++ b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java
@@ -45,6 +45,7 @@ import io.druid.query.QueryToolChest;
 import io.druid.query.Result;
 import io.druid.query.ResultGranularTimestampComparator;
 import io.druid.query.ResultMergeQueryRunner;
+import io.druid.query.UnionQueryRunner;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.AggregatorUtil;
 import io.druid.query.aggregation.MetricManipulationFn;
@@ -131,7 +132,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultValue>, TopNQuery>
 
   public QueryRunner<Result<TopNResultValue>> preMergeQueryDecoration(QueryRunner<Result<TopNResultValue>> runner)
   {
-    return new IntervalChunkingQueryRunner<Result<TopNResultValue>>(runner, config.getChunkPeriod());
+    return new UnionQueryRunner<Result<TopNResultValue>>(
+        new IntervalChunkingQueryRunner<Result<TopNResultValue>>(
+            runner,
+            config
+                .getChunkPeriod()
+        )
+    );
   }
 
   @Override
diff --git a/processing/src/test/java/io/druid/query/DataSourceTest.java b/processing/src/test/java/io/druid/query/DataSourceTest.java
index 5819fc49701..be6577138da 100644
--- a/processing/src/test/java/io/druid/query/DataSourceTest.java
+++ b/processing/src/test/java/io/druid/query/DataSourceTest.java
@@ -22,7 +22,10 @@ package io.druid.query;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Function;
+import
com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import com.metamx.common.guava.Sequence; import io.druid.jackson.DefaultObjectMapper; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; @@ -85,4 +88,26 @@ public class DataSourceTest Assert.assertEquals(new QueryDataSource(query), dataSource); } + @Test + public void testUnionDataSource() throws Exception + { + DataSource dataSource = jsonMapper.readValue( + "{\"type\":\"union\", \"dataSources\":[\"ds1\", \"ds2\"]}", + DataSource.class + ); + Assert.assertTrue(dataSource instanceof UnionDataSource); + Assert.assertEquals( + Lists.newArrayList(new TableDataSource("ds1"), new TableDataSource("ds2")), + Lists.newArrayList(((UnionDataSource) dataSource).getDataSources()) + ); + Assert.assertEquals( + Lists.newArrayList("ds1", "ds2"), + Lists.newArrayList(dataSource.getNames()) + ); + Assert.assertEquals(Lists.newArrayList("ds1", "ds2").toString(), dataSource.getMetricName()); + + final DataSource serde = jsonMapper.readValue(jsonMapper.writeValueAsString(dataSource), DataSource.class); + Assert.assertEquals(dataSource, serde); + } + } diff --git a/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java b/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java index 0948ec06a37..6e6dd9ce275 100644 --- a/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java +++ b/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java @@ -20,6 +20,7 @@ package io.druid.query.metadata; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Iterables; import io.druid.jackson.DefaultObjectMapper; import io.druid.query.Query; import io.druid.query.metadata.metadata.SegmentMetadataQuery; @@ -41,7 +42,7 @@ public class SegmentMetadataQueryTest + "}"; Query query = mapper.readValue(queryStr, Query.class); Assert.assertTrue(query instanceof SegmentMetadataQuery); - Assert.assertEquals("test_ds", query.getDataSource().getName()); + Assert.assertEquals("test_ds", Iterables.getOnlyElement(query.getDataSource().getNames())); Assert.assertEquals(new Interval("2013-12-04T00:00:00.000Z/2013-12-05T00:00:00.000Z"), query.getIntervals().get(0)); // test serialize and deserialize diff --git a/server/src/main/java/io/druid/client/BrokerServerView.java b/server/src/main/java/io/druid/client/BrokerServerView.java index be6a6553ae7..597a41a481a 100644 --- a/server/src/main/java/io/druid/client/BrokerServerView.java +++ b/server/src/main/java/io/druid/client/BrokerServerView.java @@ -20,6 +20,7 @@ package io.druid.client; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import com.google.inject.Inject; @@ -237,17 +238,7 @@ public class BrokerServerView implements TimelineServerView @Override public VersionedIntervalTimeline getTimeline(DataSource dataSource) { - String table; - while (dataSource instanceof QueryDataSource) { - dataSource = ((QueryDataSource) dataSource).getQuery().getDataSource(); - } - - if (dataSource instanceof TableDataSource) { - table = ((TableDataSource) dataSource).getName(); - } else { - throw new UnsupportedOperationException("Unsupported data source type: " + dataSource.getClass().getSimpleName()); - } - + String table = Iterables.getOnlyElement(dataSource.getNames()); synchronized (lock) { return timelines.get(table); 
} diff --git a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java index e54ff4bcb9c..d041406ad71 100644 --- a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java +++ b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java @@ -21,6 +21,7 @@ package io.druid.segment.realtime; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; +import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.io.Closeables; import com.google.inject.Inject; @@ -126,18 +127,7 @@ public class RealtimeManager implements QuerySegmentWalker private String getDataSourceName(Query query) { - DataSource dataSource = query.getDataSource(); - if (!(dataSource instanceof TableDataSource)) { - throw new UnsupportedOperationException("data source type '" + dataSource.getClass().getName() + "' unsupported"); - } - - String dataSourceName; - try { - dataSourceName = ((TableDataSource) query.getDataSource()).getName(); - } catch (ClassCastException e) { - throw new UnsupportedOperationException("Subqueries are only supported in the broker"); - } - return dataSourceName; + return Iterables.getOnlyElement(query.getDataSource().getNames()); } diff --git a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java index 7147ab9e87c..fcf5d9d8f46 100644 --- a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java +++ b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java @@ -282,7 +282,7 @@ public class AsyncQueryForwardingServlet extends HttpServlet emitter.emit( new ServiceMetricEvent.Builder() - .setUser2(theQuery.getDataSource().getName()) + .setUser2(theQuery.getDataSource().getMetricName()) .setUser4(theQuery.getType()) .setUser5(COMMA_JOIN.join(theQuery.getIntervals())) .setUser6(String.valueOf(theQuery.hasFilters())) diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index 77d0c1bf04c..02b2c95ac2a 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -131,7 +131,7 @@ public class QueryResource emitter.emit( new ServiceMetricEvent.Builder() - .setUser2(query.getDataSource().toString()) + .setUser2(query.getDataSource().getMetricName()) .setUser4(query.getType()) .setUser5(COMMA_JOIN.join(query.getIntervals())) .setUser6(String.valueOf(query.hasFilters())) diff --git a/server/src/main/java/io/druid/server/coordination/ServerManager.java b/server/src/main/java/io/druid/server/coordination/ServerManager.java index 2869237a8c7..0676430c3d5 100644 --- a/server/src/main/java/io/druid/server/coordination/ServerManager.java +++ b/server/src/main/java/io/druid/server/coordination/ServerManager.java @@ -21,7 +21,9 @@ package io.druid.server.coordination; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; +import com.google.common.base.Preconditions; import com.google.common.base.Predicates; +import com.google.common.collect.Iterables; import com.google.common.collect.Ordering; import com.google.inject.Inject; import com.metamx.common.ISE; @@ -252,14 +254,7 @@ public class ServerManager implements QuerySegmentWalker if (!(dataSource instanceof TableDataSource)) { throw new UnsupportedOperationException("data source type '" + 
dataSource.getClass().getName() + "' unsupported");
     }
-
-    String dataSourceName;
-    try {
-      dataSourceName = ((TableDataSource) query.getDataSource()).getName();
-    }
-    catch (ClassCastException e) {
-      throw new UnsupportedOperationException("Subqueries are only supported in the broker");
-    }
+    String dataSourceName = getDataSourceName(dataSource);
 
     final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = dataSources.get(dataSourceName);
@@ -325,6 +320,16 @@ public class ServerManager implements QuerySegmentWalker
     return new FinalizeResultsQueryRunner<T>(toolChest.mergeResults(factory.mergeRunners(exec, adapters)), toolChest);
   }
 
+  private String getDataSourceName(DataSource dataSource)
+  {
+    Preconditions.checkArgument(
+        dataSource instanceof TableDataSource,
+        "Subqueries and Unions are only supported in the broker"
+    );
+
+    return Iterables.getOnlyElement(dataSource.getNames());
+  }
+
   @Override
   public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs)
   {
@@ -338,13 +343,7 @@ public class ServerManager implements QuerySegmentWalker
 
     final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
 
-    String dataSourceName;
-    try {
-      dataSourceName = ((TableDataSource) query.getDataSource()).getName();
-    }
-    catch (ClassCastException e) {
-      throw new UnsupportedOperationException("Subqueries are only supported in the broker");
-    }
+    String dataSourceName = getDataSourceName(query.getDataSource());
 
     final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = dataSources.get(dataSourceName);
diff --git a/server/src/main/java/io/druid/server/router/TieredBrokerHostSelector.java b/server/src/main/java/io/druid/server/router/TieredBrokerHostSelector.java
index 681acd815b1..8ebe2e55050 100644
--- a/server/src/main/java/io/druid/server/router/TieredBrokerHostSelector.java
+++ b/server/src/main/java/io/druid/server/router/TieredBrokerHostSelector.java
@@ -137,7 +137,8 @@ public class TieredBrokerHostSelector implements HostSelector
     }
 
     if (brokerServiceName == null) {
-      List<Rule> rules = ruleManager.getRulesWithDefault((query.getDataSource()).getName());
+      // For union queries, the broker tier is chosen from the rules for the first dataSource.
+      List<Rule> rules = ruleManager.getRulesWithDefault(Iterables.getFirst(query.getDataSource().getNames(), null));
 
       // find the rule that can apply to the entire set of intervals
       DateTime now = new DateTime();
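
Reviewer note, not part of the patch: the JSON shape accepted by the new UnionDataSource serde (exercised in DataSourceTest above) can be dropped into any existing query type. Below is a minimal, self-contained sketch of a full timeseries query over a union of two tables; the class name, datasource names, aggregator, granularity and interval are illustrative assumptions, not anything mandated by the patch.

import com.google.common.collect.Lists;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.Query;
import io.druid.query.UnionDataSource;
import org.junit.Assert;
import org.junit.Test;

public class UnionQuerySerdeSketch
{
  private final DefaultObjectMapper jsonMapper = new DefaultObjectMapper();

  @Test
  public void testTimeseriesQueryOverUnion() throws Exception
  {
    final String queryJson =
        "{\"queryType\":\"timeseries\","
        + " \"dataSource\":{\"type\":\"union\", \"dataSources\":[\"ds1\", \"ds2\"]},"
        + " \"granularity\":\"all\","
        + " \"aggregations\":[{\"type\":\"count\", \"name\":\"rows\"}],"
        + " \"intervals\":[\"2014-01-01/2014-01-02\"]}";

    // The union datasource deserializes like any other datasource on the query.
    final Query query = jsonMapper.readValue(queryJson, Query.class);
    Assert.assertTrue(query.getDataSource() instanceof UnionDataSource);

    // getNames() flattens the union into the underlying table names.
    Assert.assertEquals(
        Lists.newArrayList("ds1", "ds2"),
        Lists.newArrayList(query.getDataSource().getNames())
    );
  }
}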
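
Also not part of the patch: a sketch of what UnionQueryRunner does with such a query. The stub base runner below is hypothetical and only records which datasource it receives; in a real server it would be the decorated runner that preMergeQueryDecoration would otherwise return. A TimeBoundaryQuery is used purely because its constructor is the simplest to call directly, and passing null for the segment spec and context is an assumption about its defaults.

import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.DataSource;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.Result;
import io.druid.query.UnionQueryRunner;
import io.druid.query.timeboundary.TimeBoundaryQuery;
import io.druid.query.timeboundary.TimeBoundaryResultValue;

public class UnionQueryRunnerSketch
{
  public static void main(String[] args) throws Exception
  {
    // Stub per-datasource runner: prints which datasource it is asked for and
    // returns an empty result sequence.
    final QueryRunner<Result<TimeBoundaryResultValue>> baseRunner =
        new QueryRunner<Result<TimeBoundaryResultValue>>()
        {
          @Override
          public Sequence<Result<TimeBoundaryResultValue>> run(Query<Result<TimeBoundaryResultValue>> query)
          {
            System.out.println("base runner sees datasource: " + query.getDataSource().getNames());
            return Sequences.<Result<TimeBoundaryResultValue>>empty();
          }
        };

    // A query whose datasource is the union of two tables, built from the same
    // JSON shape the new UnionDataSource serde accepts.
    final DataSource union = new DefaultObjectMapper().readValue(
        "{\"type\":\"union\", \"dataSources\":[\"ds1\", \"ds2\"]}",
        DataSource.class
    );
    final TimeBoundaryQuery query = new TimeBoundaryQuery(union, null, null);

    // UnionQueryRunner rewrites the query once per underlying datasource (via
    // withDataSource) and concatenates the per-datasource sequences.
    Sequences.toList(
        new UnionQueryRunner<Result<TimeBoundaryResultValue>>(baseRunner).run(query),
        Lists.<Result<TimeBoundaryResultValue>>newArrayList()
    );
  }
}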