From 728a606d32ba31939e6a4d20b47ea72b2d2de49f Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Thu, 1 May 2014 01:54:52 +0530 Subject: [PATCH 1/7] Add support for union queries --- .../overlord/ThreadPoolTaskRunner.java | 9 +- .../main/java/io/druid/query/DataSource.java | 9 +- .../src/main/java/io/druid/query/Druids.java | 2 +- .../src/main/java/io/druid/query/Query.java | 2 + .../java/io/druid/query/QueryDataSource.java | 13 ++- .../java/io/druid/query/TableDataSource.java | 15 ++- .../java/io/druid/query/UnionDataSource.java | 101 ++++++++++++++++++ .../java/io/druid/query/UnionQueryRunner.java | 65 +++++++++++ .../io/druid/query/groupby/GroupByQuery.java | 18 ++++ .../groupby/GroupByQueryQueryToolChest.java | 10 +- .../SegmentMetadataQueryQueryToolChest.java | 2 +- .../metadata/SegmentMetadataQuery.java | 11 ++ .../search/SearchQueryQueryToolChest.java | 7 +- .../query/search/search/SearchQuery.java | 16 +++ .../io/druid/query/select/SelectQuery.java | 15 +++ .../select/SelectQueryQueryToolChest.java | 10 +- .../query/timeboundary/TimeBoundaryQuery.java | 10 ++ .../TimeBoundaryQueryQueryToolChest.java | 2 +- .../query/timeseries/TimeseriesQuery.java | 14 +++ .../TimeseriesQueryQueryToolChest.java | 10 +- .../java/io/druid/query/topn/TopNQuery.java | 18 ++++ .../query/topn/TopNQueryQueryToolChest.java | 11 +- .../java/io/druid/query/DataSourceTest.java | 25 +++++ .../metadata/SegmentMetadataQueryTest.java | 3 +- .../io/druid/client/BrokerServerView.java | 13 +-- .../segment/realtime/RealtimeManager.java | 14 +-- .../server/AsyncQueryForwardingServlet.java | 2 +- .../java/io/druid/server/QueryResource.java | 2 +- .../server/coordination/ServerManager.java | 29 +++-- .../router/TieredBrokerHostSelector.java | 3 +- 30 files changed, 393 insertions(+), 68 deletions(-) create mode 100644 processing/src/main/java/io/druid/query/UnionDataSource.java create mode 100644 processing/src/main/java/io/druid/query/UnionQueryRunner.java diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java index 0d69ed5036c..4f13daf3f5b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java @@ -22,6 +22,7 @@ package io.druid.indexing.overlord; import com.google.api.client.repackaged.com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -153,13 +154,7 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker private QueryRunner getQueryRunnerImpl(Query query) { QueryRunner queryRunner = null; - String queryDataSource; - try { - queryDataSource = ((TableDataSource)query.getDataSource()).getName(); - } - catch (ClassCastException e) { - throw new IllegalArgumentException("Subqueries are not welcome here"); - } + final String queryDataSource = Iterables.getOnlyElement(query.getDataSource().getNames()); for (final ThreadPoolTaskRunnerWorkItem taskRunnerWorkItem : ImmutableList.copyOf(runningItems)) { final Task task = taskRunnerWorkItem.getTask(); diff --git a/processing/src/main/java/io/druid/query/DataSource.java b/processing/src/main/java/io/druid/query/DataSource.java index a4ef603da1f..436e2e0de02 100644 --- a/processing/src/main/java/io/druid/query/DataSource.java +++ b/processing/src/main/java/io/druid/query/DataSource.java @@ -24,15 +24,20 @@ package io.druid.query; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import java.util.List; + @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type", defaultImpl = LegacyDataSource.class) @JsonSubTypes({ @JsonSubTypes.Type(value = TableDataSource.class, name = "table"), - @JsonSubTypes.Type(value = QueryDataSource.class, name = "query") + @JsonSubTypes.Type(value = QueryDataSource.class, name = "query"), + @JsonSubTypes.Type(value = UnionDataSource.class, name = "union") }) public interface DataSource { - public String getName(); + public Iterable getNames(); + + public String getMetricName(); } diff --git a/processing/src/main/java/io/druid/query/Druids.java b/processing/src/main/java/io/druid/query/Druids.java index 3ab6b0a8ff7..87676414ab0 100644 --- a/processing/src/main/java/io/druid/query/Druids.java +++ b/processing/src/main/java/io/druid/query/Druids.java @@ -537,7 +537,7 @@ public class Druids public SearchQueryBuilder copy(SearchQuery query) { return new SearchQueryBuilder() - .dataSource(((TableDataSource)query.getDataSource()).getName()) + .dataSource(query.getDataSource()) .intervals(query.getQuerySegmentSpec()) .filters(query.getDimensionsFilter()) .granularity(query.getGranularity()) diff --git a/processing/src/main/java/io/druid/query/Query.java b/processing/src/main/java/io/druid/query/Query.java index 9b9c9e373f9..04c581152ad 100644 --- a/processing/src/main/java/io/druid/query/Query.java +++ b/processing/src/main/java/io/druid/query/Query.java @@ -88,4 +88,6 @@ public interface Query public Query withId(String id); public String getId(); + + Query withDataSource(DataSource dataSource); } diff --git a/processing/src/main/java/io/druid/query/QueryDataSource.java b/processing/src/main/java/io/druid/query/QueryDataSource.java index 3f0c397f6d4..de3ff3982cb 100644 --- a/processing/src/main/java/io/druid/query/QueryDataSource.java +++ b/processing/src/main/java/io/druid/query/QueryDataSource.java @@ -24,6 +24,9 @@ package io.druid.query; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.collect.Lists; + +import java.util.List; @JsonTypeName("query") public class QueryDataSource implements DataSource @@ -38,9 +41,15 @@ public class QueryDataSource implements DataSource } @Override - public String getName() + public Iterable getNames() { - return query.getDataSource().getName(); + return query.getDataSource().getNames(); + } + + @Override + public String getMetricName() + { + return query.getDataSource().getMetricName(); } @JsonProperty diff --git a/processing/src/main/java/io/druid/query/TableDataSource.java b/processing/src/main/java/io/druid/query/TableDataSource.java index b658454cbc1..047295733ec 100644 --- a/processing/src/main/java/io/druid/query/TableDataSource.java +++ b/processing/src/main/java/io/druid/query/TableDataSource.java @@ -23,6 +23,9 @@ package io.druid.query; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.collect.Lists; + +import java.util.List; @JsonTypeName("table") public class TableDataSource implements DataSource @@ -37,8 +40,18 @@ public class TableDataSource implements DataSource } @JsonProperty + public String getName(){ + return name; + } + @Override - public String getName() + public List getNames() + { + return Lists.newArrayList(name); + } + + @Override + public String getMetricName() { return name; } diff --git a/processing/src/main/java/io/druid/query/UnionDataSource.java b/processing/src/main/java/io/druid/query/UnionDataSource.java new file mode 100644 index 00000000000..a6034aca7b9 --- /dev/null +++ b/processing/src/main/java/io/druid/query/UnionDataSource.java @@ -0,0 +1,101 @@ +/* + * Druid - a distributed column store. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * + * This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project + * under the Druid Corporate Contributor License Agreement. + */ + +package io.druid.query; + + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; + +import javax.annotation.Nullable; +import java.util.List; +import java.util.SortedSet; +import java.util.TreeSet; + +public class UnionDataSource implements DataSource +{ + @JsonProperty + private final List dataSources; + + @JsonCreator + public UnionDataSource(@JsonProperty("dataSources") List dataSources) + { + Preconditions.checkNotNull(dataSources, "datasources cannot be null for uniondatasource"); + this.dataSources = dataSources; + } + + @Override + public Iterable getNames() + { + return Iterables.concat(Iterables.transform(dataSources, new Function>() + { + @Override + public Iterable apply(DataSource input) + { + return input.getNames(); + } + })); + } + + @Override + public String getMetricName() + { + SortedSet str = new TreeSet<>(); + for(DataSource ds : dataSources){ + str.add(ds.getMetricName()); + } + return str.toString(); + } + + @JsonProperty + public List getDataSources(){ + return dataSources; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + UnionDataSource that = (UnionDataSource) o; + + if (!dataSources.equals(that.dataSources)) { + return false; + } + + return true; + } + + @Override + public int hashCode() + { + return dataSources.hashCode(); + } +} diff --git a/processing/src/main/java/io/druid/query/UnionQueryRunner.java b/processing/src/main/java/io/druid/query/UnionQueryRunner.java new file mode 100644 index 00000000000..fc1534ebc92 --- /dev/null +++ b/processing/src/main/java/io/druid/query/UnionQueryRunner.java @@ -0,0 +1,65 @@ +/* + * Druid - a distributed column store. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * + * This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project + * under the Druid Corporate Contributor License Agreement. + */ + +package io.druid.query; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import com.metamx.common.guava.Sequence; +import com.metamx.common.guava.Sequences; + +public class UnionQueryRunner implements QueryRunner +{ + private final QueryRunner baseRunner; + + public UnionQueryRunner(QueryRunner baseRunner) + { + this.baseRunner = baseRunner; + } + + @Override + public Sequence run(final Query query) + { + DataSource dataSource = query.getDataSource(); + if (dataSource instanceof UnionDataSource) { + System.out.println("Breaking into Single Source" + dataSource.getMetricName()); + return Sequences.concat( + Iterables.transform( + ((UnionDataSource) dataSource).getDataSources(), + new Function>() + { + @Override + public Sequence apply(DataSource singleSource) + { + System.out.println("Running with Single Source" + singleSource.getNames()); + return baseRunner.run( + query.withDataSource(singleSource) + ); + } + } + ) + ); + } else { + return baseRunner.run(query); + } + } + +} diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java index 3f04f30b8aa..0a23b5704b7 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java @@ -257,6 +257,24 @@ public class GroupByQuery extends BaseQuery ); } + @Override + public Query withDataSource(DataSource dataSource) + { + return new GroupByQuery( + dataSource, + getQuerySegmentSpec(), + dimFilter, + granularity, + dimensions, + aggregatorSpecs, + postAggregatorSpecs, + havingSpec, + limitSpec, + orderByLimitFn, + getContext() + ); + } + public static class Builder { private DataSource dataSource; diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index ab70b34db05..ef34b3b81e1 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -41,6 +41,7 @@ import io.druid.query.QueryDataSource; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.SubqueryQueryRunner; +import io.druid.query.UnionQueryRunner; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.MetricManipulationFn; import io.druid.segment.incremental.IncrementalIndex; @@ -163,7 +164,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest preMergeQueryDecoration(QueryRunner runner) { - return new SubqueryQueryRunner( - new IntervalChunkingQueryRunner(runner, configSupplier.get().getChunkPeriod())); + return new UnionQueryRunner( + new SubqueryQueryRunner( + new IntervalChunkingQueryRunner(runner, configSupplier.get().getChunkPeriod()) + ) + ); } } diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java index 98eb896476d..ee8610eb57e 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java @@ -147,7 +147,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest spec, toInclude, merge, getContext()); } + @Override + public Query withDataSource(DataSource dataSource) + { + return new SegmentMetadataQuery( + dataSource, + getQuerySegmentSpec(), + toInclude, + merge, + getContext()); + } + @Override public boolean equals(Object o) { diff --git a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java index a8429ad2726..2a1b6500aaa 100644 --- a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java @@ -45,6 +45,7 @@ import io.druid.query.QueryToolChest; import io.druid.query.Result; import io.druid.query.ResultGranularTimestampComparator; import io.druid.query.ResultMergeQueryRunner; +import io.druid.query.UnionQueryRunner; import io.druid.query.aggregation.MetricManipulationFn; import io.druid.query.filter.DimFilter; import io.druid.query.search.search.SearchHit; @@ -121,7 +122,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest> preMergeQueryDecoration(QueryRunner> runner) { return new SearchThresholdAdjustingQueryRunner( - new IntervalChunkingQueryRunner>(runner, config.getChunkPeriod()), + new UnionQueryRunner>( + new IntervalChunkingQueryRunner>(runner, config.getChunkPeriod()) + ), config ); } diff --git a/processing/src/main/java/io/druid/query/search/search/SearchQuery.java b/processing/src/main/java/io/druid/query/search/search/SearchQuery.java index 7c2901ea997..19536cd9f24 100644 --- a/processing/src/main/java/io/druid/query/search/search/SearchQuery.java +++ b/processing/src/main/java/io/druid/query/search/search/SearchQuery.java @@ -111,6 +111,22 @@ public class SearchQuery extends BaseQuery> ); } + @Override + public Query> withDataSource(DataSource dataSource) + { + return new SearchQuery( + dataSource, + dimFilter, + granularity, + limit, + getQuerySegmentSpec(), + dimensions, + querySpec, + sortSpec, + getContext() + ); + } + @Override public SearchQuery withOverriddenContext(Map contextOverrides) { diff --git a/processing/src/main/java/io/druid/query/select/SelectQuery.java b/processing/src/main/java/io/druid/query/select/SelectQuery.java index 7556006734f..16ae163e693 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQuery.java +++ b/processing/src/main/java/io/druid/query/select/SelectQuery.java @@ -120,6 +120,21 @@ public class SelectQuery extends BaseQuery> ); } + @Override + public Query> withDataSource(DataSource dataSource) + { + return new SelectQuery( + dataSource, + getQuerySegmentSpec(), + dimFilter, + granularity, + dimensions, + metrics, + pagingSpec, + getContext() + ); + } + public SelectQuery withOverriddenContext(Map contextOverrides) { return new SelectQuery( diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java index f3bbe028ed8..d40cb48af0d 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java @@ -42,6 +42,7 @@ import io.druid.query.QueryToolChest; import io.druid.query.Result; import io.druid.query.ResultGranularTimestampComparator; import io.druid.query.ResultMergeQueryRunner; +import io.druid.query.UnionQueryRunner; import io.druid.query.aggregation.MetricManipulationFn; import io.druid.query.filter.DimFilter; import org.joda.time.DateTime; @@ -123,7 +124,7 @@ public class SelectQueryQueryToolChest extends QueryToolChest> preMergeQueryDecoration(QueryRunner> runner) { - return new IntervalChunkingQueryRunner>(runner, config.getChunkPeriod()); + return new UnionQueryRunner>( + new IntervalChunkingQueryRunner>( + runner, + config.getChunkPeriod() + ) + ); } public Ordering> getOrdering() diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java index 357854f1958..6f3e70b9851 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java @@ -97,6 +97,16 @@ public class TimeBoundaryQuery extends BaseQuery ); } + @Override + public Query> withDataSource(DataSource dataSource) + { + return new TimeBoundaryQuery( + dataSource, + getQuerySegmentSpec(), + getContext() + ); + } + public byte[] getCacheKey() { return ByteBuffer.allocate(1) diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java index 7186b1a6e38..bfba85353b3 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java @@ -117,7 +117,7 @@ public class TimeBoundaryQueryQueryToolChest public ServiceMetricEvent.Builder makeMetricBuilder(TimeBoundaryQuery query) { return new ServiceMetricEvent.Builder() - .setUser2(query.getDataSource().toString()) + .setUser2(query.getDataSource().getMetricName()) .setUser4(query.getType()) .setUser6("false"); } diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java index 3a03018a63e..96a94227b36 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java @@ -116,6 +116,20 @@ public class TimeseriesQuery extends BaseQuery> ); } + @Override + public Query> withDataSource(DataSource dataSource) + { + return new TimeseriesQuery( + dataSource, + getQuerySegmentSpec(), + dimFilter, + granularity, + aggregatorSpecs, + postAggregatorSpecs, + getContext() + ); + } + public TimeseriesQuery withOverriddenContext(Map contextOverrides) { return new TimeseriesQuery( diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index 482e92adc3e..3ff92321cae 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java @@ -42,6 +42,7 @@ import io.druid.query.QueryToolChest; import io.druid.query.Result; import io.druid.query.ResultGranularTimestampComparator; import io.druid.query.ResultMergeQueryRunner; +import io.druid.query.UnionQueryRunner; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.MetricManipulationFn; import io.druid.query.aggregation.PostAggregator; @@ -122,7 +123,7 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest> preMergeQueryDecoration(QueryRunner> runner) { - return new IntervalChunkingQueryRunner>(runner, config.getChunkPeriod()); + return new UnionQueryRunner>( + new IntervalChunkingQueryRunner>( + runner, + config.getChunkPeriod() + ) + ); } public Ordering> getOrdering() diff --git a/processing/src/main/java/io/druid/query/topn/TopNQuery.java b/processing/src/main/java/io/druid/query/topn/TopNQuery.java index 0e7a796d045..92e3a18cfa5 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQuery.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQuery.java @@ -27,6 +27,7 @@ import io.druid.granularity.QueryGranularity; import io.druid.query.BaseQuery; import io.druid.query.DataSource; import io.druid.query.Queries; +import io.druid.query.Query; import io.druid.query.Result; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.PostAggregator; @@ -162,6 +163,23 @@ public class TopNQuery extends BaseQuery> ); } + @Override + public Query> withDataSource(DataSource dataSource) + { + return new TopNQuery( + getDataSource(), + dimensionSpec, + topNMetricSpec, + threshold, + getQuerySegmentSpec(), + dimFilter, + granularity, + aggregatorSpecs, + postAggregatorSpecs, + getContext() + ); + } + public TopNQuery withThreshold(int threshold) { return new TopNQuery( diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java index 5db416f1d0b..cbce09c1f9c 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java @@ -45,6 +45,7 @@ import io.druid.query.QueryToolChest; import io.druid.query.Result; import io.druid.query.ResultGranularTimestampComparator; import io.druid.query.ResultMergeQueryRunner; +import io.druid.query.UnionQueryRunner; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorUtil; import io.druid.query.aggregation.MetricManipulationFn; @@ -131,7 +132,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest> preMergeQueryDecoration(QueryRunner> runner) { - return new IntervalChunkingQueryRunner>(runner, config.getChunkPeriod()); + return new UnionQueryRunner>( + new IntervalChunkingQueryRunner>( + runner, + config + .getChunkPeriod() + ) + ); } @Override diff --git a/processing/src/test/java/io/druid/query/DataSourceTest.java b/processing/src/test/java/io/druid/query/DataSourceTest.java index 5819fc49701..be6577138da 100644 --- a/processing/src/test/java/io/druid/query/DataSourceTest.java +++ b/processing/src/test/java/io/druid/query/DataSourceTest.java @@ -22,7 +22,10 @@ package io.druid.query; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Function; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import com.metamx.common.guava.Sequence; import io.druid.jackson.DefaultObjectMapper; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; @@ -85,4 +88,26 @@ public class DataSourceTest Assert.assertEquals(new QueryDataSource(query), dataSource); } + @Test + public void testUnionDataSource() throws Exception + { + DataSource dataSource = jsonMapper.readValue( + "{\"type\":\"union\", \"dataSources\":[\"ds1\", \"ds2\"]}", + DataSource.class + ); + Assert.assertTrue(dataSource instanceof UnionDataSource); + Assert.assertEquals( + Lists.newArrayList(new TableDataSource("ds1"), new TableDataSource("ds2")), + Lists.newArrayList(((UnionDataSource) dataSource).getDataSources()) + ); + Assert.assertEquals( + Lists.newArrayList("ds1", "ds2"), + Lists.newArrayList(dataSource.getNames()) + ); + Assert.assertEquals(Lists.newArrayList("ds1", "ds2").toString(), dataSource.getMetricName()); + + final DataSource serde = jsonMapper.readValue(jsonMapper.writeValueAsString(dataSource), DataSource.class); + Assert.assertEquals(dataSource, serde); + } + } diff --git a/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java b/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java index 0948ec06a37..6e6dd9ce275 100644 --- a/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java +++ b/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java @@ -20,6 +20,7 @@ package io.druid.query.metadata; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Iterables; import io.druid.jackson.DefaultObjectMapper; import io.druid.query.Query; import io.druid.query.metadata.metadata.SegmentMetadataQuery; @@ -41,7 +42,7 @@ public class SegmentMetadataQueryTest + "}"; Query query = mapper.readValue(queryStr, Query.class); Assert.assertTrue(query instanceof SegmentMetadataQuery); - Assert.assertEquals("test_ds", query.getDataSource().getName()); + Assert.assertEquals("test_ds", Iterables.getOnlyElement(query.getDataSource().getNames())); Assert.assertEquals(new Interval("2013-12-04T00:00:00.000Z/2013-12-05T00:00:00.000Z"), query.getIntervals().get(0)); // test serialize and deserialize diff --git a/server/src/main/java/io/druid/client/BrokerServerView.java b/server/src/main/java/io/druid/client/BrokerServerView.java index be6a6553ae7..597a41a481a 100644 --- a/server/src/main/java/io/druid/client/BrokerServerView.java +++ b/server/src/main/java/io/druid/client/BrokerServerView.java @@ -20,6 +20,7 @@ package io.druid.client; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import com.google.inject.Inject; @@ -237,17 +238,7 @@ public class BrokerServerView implements TimelineServerView @Override public VersionedIntervalTimeline getTimeline(DataSource dataSource) { - String table; - while (dataSource instanceof QueryDataSource) { - dataSource = ((QueryDataSource) dataSource).getQuery().getDataSource(); - } - - if (dataSource instanceof TableDataSource) { - table = ((TableDataSource) dataSource).getName(); - } else { - throw new UnsupportedOperationException("Unsupported data source type: " + dataSource.getClass().getSimpleName()); - } - + String table = Iterables.getOnlyElement(dataSource.getNames()); synchronized (lock) { return timelines.get(table); } diff --git a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java index e54ff4bcb9c..d041406ad71 100644 --- a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java +++ b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java @@ -21,6 +21,7 @@ package io.druid.segment.realtime; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; +import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.io.Closeables; import com.google.inject.Inject; @@ -126,18 +127,7 @@ public class RealtimeManager implements QuerySegmentWalker private String getDataSourceName(Query query) { - DataSource dataSource = query.getDataSource(); - if (!(dataSource instanceof TableDataSource)) { - throw new UnsupportedOperationException("data source type '" + dataSource.getClass().getName() + "' unsupported"); - } - - String dataSourceName; - try { - dataSourceName = ((TableDataSource) query.getDataSource()).getName(); - } catch (ClassCastException e) { - throw new UnsupportedOperationException("Subqueries are only supported in the broker"); - } - return dataSourceName; + return Iterables.getOnlyElement(query.getDataSource().getNames()); } diff --git a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java index 7147ab9e87c..fcf5d9d8f46 100644 --- a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java +++ b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java @@ -282,7 +282,7 @@ public class AsyncQueryForwardingServlet extends HttpServlet emitter.emit( new ServiceMetricEvent.Builder() - .setUser2(theQuery.getDataSource().getName()) + .setUser2(theQuery.getDataSource().getMetricName()) .setUser4(theQuery.getType()) .setUser5(COMMA_JOIN.join(theQuery.getIntervals())) .setUser6(String.valueOf(theQuery.hasFilters())) diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index 77d0c1bf04c..02b2c95ac2a 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -131,7 +131,7 @@ public class QueryResource emitter.emit( new ServiceMetricEvent.Builder() - .setUser2(query.getDataSource().toString()) + .setUser2(query.getDataSource().getMetricName()) .setUser4(query.getType()) .setUser5(COMMA_JOIN.join(query.getIntervals())) .setUser6(String.valueOf(query.hasFilters())) diff --git a/server/src/main/java/io/druid/server/coordination/ServerManager.java b/server/src/main/java/io/druid/server/coordination/ServerManager.java index 2869237a8c7..0676430c3d5 100644 --- a/server/src/main/java/io/druid/server/coordination/ServerManager.java +++ b/server/src/main/java/io/druid/server/coordination/ServerManager.java @@ -21,7 +21,9 @@ package io.druid.server.coordination; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; +import com.google.common.base.Preconditions; import com.google.common.base.Predicates; +import com.google.common.collect.Iterables; import com.google.common.collect.Ordering; import com.google.inject.Inject; import com.metamx.common.ISE; @@ -252,14 +254,7 @@ public class ServerManager implements QuerySegmentWalker if (!(dataSource instanceof TableDataSource)) { throw new UnsupportedOperationException("data source type '" + dataSource.getClass().getName() + "' unsupported"); } - - String dataSourceName; - try { - dataSourceName = ((TableDataSource) query.getDataSource()).getName(); - } - catch (ClassCastException e) { - throw new UnsupportedOperationException("Subqueries are only supported in the broker"); - } + String dataSourceName = getDataSourceName(dataSource); final VersionedIntervalTimeline timeline = dataSources.get(dataSourceName); @@ -325,6 +320,16 @@ public class ServerManager implements QuerySegmentWalker return new FinalizeResultsQueryRunner(toolChest.mergeResults(factory.mergeRunners(exec, adapters)), toolChest); } + private String getDataSourceName(DataSource dataSource) + { + Preconditions.checkArgument( + dataSource instanceof TableDataSource, + "Subqueries and Unions are only supported in the broker" + ); + + return Iterables.getOnlyElement(dataSource.getNames()); + } + @Override public QueryRunner getQueryRunnerForSegments(Query query, Iterable specs) { @@ -338,13 +343,7 @@ public class ServerManager implements QuerySegmentWalker final QueryToolChest> toolChest = factory.getToolchest(); - String dataSourceName; - try { - dataSourceName = ((TableDataSource) query.getDataSource()).getName(); - } - catch (ClassCastException e) { - throw new UnsupportedOperationException("Subqueries are only supported in the broker"); - } + String dataSourceName = getDataSourceName(query.getDataSource()); final VersionedIntervalTimeline timeline = dataSources.get(dataSourceName); diff --git a/server/src/main/java/io/druid/server/router/TieredBrokerHostSelector.java b/server/src/main/java/io/druid/server/router/TieredBrokerHostSelector.java index 681acd815b1..8ebe2e55050 100644 --- a/server/src/main/java/io/druid/server/router/TieredBrokerHostSelector.java +++ b/server/src/main/java/io/druid/server/router/TieredBrokerHostSelector.java @@ -137,7 +137,8 @@ public class TieredBrokerHostSelector implements HostSelector } if (brokerServiceName == null) { - List rules = ruleManager.getRulesWithDefault((query.getDataSource()).getName()); + // For Union Queries tier will be selected on the rules for first dataSource. + List rules = ruleManager.getRulesWithDefault(Iterables.getFirst(query.getDataSource().getNames(), null)); // find the rule that can apply to the entire set of intervals DateTime now = new DateTime(); From 1d67e399492e90beebadab76df947492f3a49e97 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Thu, 1 May 2014 21:04:47 +0530 Subject: [PATCH 2/7] Fixes fix NPE in select query, calculating cache key remove unwanted logging fix ds in topNQuery --- .../src/main/java/io/druid/indexer/DeterminePartitionsJob.java | 2 -- processing/src/main/java/io/druid/query/UnionDataSource.java | 3 +++ processing/src/main/java/io/druid/query/select/PagingSpec.java | 2 +- processing/src/main/java/io/druid/query/topn/TopNQuery.java | 2 +- 4 files changed, 5 insertions(+), 4 deletions(-) diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java index ddcb691ef09..7a685650528 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java @@ -762,8 +762,6 @@ public class DeterminePartitionsJob implements Jobby log.info(" %s", HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(shardSpec)); } - System.out.println(HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(chosenShardSpecs)); - try { HadoopDruidIndexerConfig.jsonMapper .writerWithType( diff --git a/processing/src/main/java/io/druid/query/UnionDataSource.java b/processing/src/main/java/io/druid/query/UnionDataSource.java index a6034aca7b9..83b9f0acbe7 100644 --- a/processing/src/main/java/io/druid/query/UnionDataSource.java +++ b/processing/src/main/java/io/druid/query/UnionDataSource.java @@ -43,6 +43,9 @@ public class UnionDataSource implements DataSource public UnionDataSource(@JsonProperty("dataSources") List dataSources) { Preconditions.checkNotNull(dataSources, "datasources cannot be null for uniondatasource"); + for(DataSource ds : dataSources){ + Preconditions.checkArgument(ds instanceof TableDataSource, "Union DataSource only supports TableDatasource"); + } this.dataSources = dataSources; } diff --git a/processing/src/main/java/io/druid/query/select/PagingSpec.java b/processing/src/main/java/io/druid/query/select/PagingSpec.java index 7be4cf62746..0ad5b56635d 100644 --- a/processing/src/main/java/io/druid/query/select/PagingSpec.java +++ b/processing/src/main/java/io/druid/query/select/PagingSpec.java @@ -40,7 +40,7 @@ public class PagingSpec @JsonProperty("threshold") int threshold ) { - this.pagingIdentifiers = pagingIdentifiers; + this.pagingIdentifiers = pagingIdentifiers == null ? new LinkedHashMap() : pagingIdentifiers; this.threshold = threshold; } diff --git a/processing/src/main/java/io/druid/query/topn/TopNQuery.java b/processing/src/main/java/io/druid/query/topn/TopNQuery.java index 92e3a18cfa5..b456e487039 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQuery.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQuery.java @@ -167,7 +167,7 @@ public class TopNQuery extends BaseQuery> public Query> withDataSource(DataSource dataSource) { return new TopNQuery( - getDataSource(), + dataSource, dimensionSpec, topNMetricSpec, threshold, From 92ebe69fd9f5df3b76a4264969f98a4013d5cb1a Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Thu, 1 May 2014 21:16:42 +0530 Subject: [PATCH 3/7] remove unwanted loggings --- processing/src/main/java/io/druid/query/UnionQueryRunner.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/processing/src/main/java/io/druid/query/UnionQueryRunner.java b/processing/src/main/java/io/druid/query/UnionQueryRunner.java index fc1534ebc92..0e3088c4847 100644 --- a/processing/src/main/java/io/druid/query/UnionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/UnionQueryRunner.java @@ -40,7 +40,6 @@ public class UnionQueryRunner implements QueryRunner { DataSource dataSource = query.getDataSource(); if (dataSource instanceof UnionDataSource) { - System.out.println("Breaking into Single Source" + dataSource.getMetricName()); return Sequences.concat( Iterables.transform( ((UnionDataSource) dataSource).getDataSources(), @@ -49,7 +48,6 @@ public class UnionQueryRunner implements QueryRunner @Override public Sequence apply(DataSource singleSource) { - System.out.println("Running with Single Source" + singleSource.getNames()); return baseRunner.run( query.withDataSource(singleSource) ); From 1b6137504c92cb31b750f72746fd64e58a9fe38d Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Thu, 1 May 2014 21:32:27 +0530 Subject: [PATCH 4/7] Add toString for better logging --- .../src/main/java/io/druid/query/UnionDataSource.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/processing/src/main/java/io/druid/query/UnionDataSource.java b/processing/src/main/java/io/druid/query/UnionDataSource.java index 83b9f0acbe7..003ce86be47 100644 --- a/processing/src/main/java/io/druid/query/UnionDataSource.java +++ b/processing/src/main/java/io/druid/query/UnionDataSource.java @@ -101,4 +101,12 @@ public class UnionDataSource implements DataSource { return dataSources.hashCode(); } + + @Override + public String toString() + { + return "UnionDataSource{" + + "dataSources=" + dataSources + + '}'; + } } From 477e01daf73930bfbbcd1a90e73175eb0a1350cb Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Fri, 2 May 2014 15:21:06 +0530 Subject: [PATCH 5/7] review comments --- .../indexing/wikipedia_hadoop_config.json | 2 +- .../java/io/druid/query/TableDataSource.java | 2 +- .../java/io/druid/query/UnionDataSource.java | 11 +++---- .../groupby/GroupByQueryQueryToolChest.java | 7 ++--- .../search/SearchQueryQueryToolChest.java | 5 +-- .../select/SelectQueryQueryToolChest.java | 10 +++--- .../TimeseriesQueryQueryToolChest.java | 10 +++--- .../query/topn/TopNQueryQueryToolChest.java | 27 +++++++--------- .../server/ClientQuerySegmentWalker.java | 31 ++++++++++--------- 9 files changed, 45 insertions(+), 60 deletions(-) diff --git a/examples/bin/examples/indexing/wikipedia_hadoop_config.json b/examples/bin/examples/indexing/wikipedia_hadoop_config.json index 3b268056222..bb1767a1ede 100644 --- a/examples/bin/examples/indexing/wikipedia_hadoop_config.json +++ b/examples/bin/examples/indexing/wikipedia_hadoop_config.json @@ -2,7 +2,7 @@ "dataSource": "wikipedia", "timestampSpec" : { "column": "timestamp", - "format": "iso", + "format": "iso" }, "dataSpec": { "format": "json", diff --git a/processing/src/main/java/io/druid/query/TableDataSource.java b/processing/src/main/java/io/druid/query/TableDataSource.java index 047295733ec..bf2512dce53 100644 --- a/processing/src/main/java/io/druid/query/TableDataSource.java +++ b/processing/src/main/java/io/druid/query/TableDataSource.java @@ -45,7 +45,7 @@ public class TableDataSource implements DataSource } @Override - public List getNames() + public Iterable getNames() { return Lists.newArrayList(name); } diff --git a/processing/src/main/java/io/druid/query/UnionDataSource.java b/processing/src/main/java/io/druid/query/UnionDataSource.java index 003ce86be47..ac62a7e3991 100644 --- a/processing/src/main/java/io/druid/query/UnionDataSource.java +++ b/processing/src/main/java/io/druid/query/UnionDataSource.java @@ -37,15 +37,12 @@ import java.util.TreeSet; public class UnionDataSource implements DataSource { @JsonProperty - private final List dataSources; + private final List dataSources; @JsonCreator - public UnionDataSource(@JsonProperty("dataSources") List dataSources) + public UnionDataSource(@JsonProperty("dataSources") List dataSources) { - Preconditions.checkNotNull(dataSources, "datasources cannot be null for uniondatasource"); - for(DataSource ds : dataSources){ - Preconditions.checkArgument(ds instanceof TableDataSource, "Union DataSource only supports TableDatasource"); - } + Preconditions.checkNotNull(dataSources, "dataSources cannot be null for unionDataSource"); this.dataSources = dataSources; } @@ -73,7 +70,7 @@ public class UnionDataSource implements DataSource } @JsonProperty - public List getDataSources(){ + public List getDataSources(){ return dataSources; } diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index ef34b3b81e1..5be40a9dbbf 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -41,7 +41,6 @@ import io.druid.query.QueryDataSource; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.SubqueryQueryRunner; -import io.druid.query.UnionQueryRunner; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.MetricManipulationFn; import io.druid.segment.incremental.IncrementalIndex; @@ -203,10 +202,8 @@ public class GroupByQueryQueryToolChest extends QueryToolChest preMergeQueryDecoration(QueryRunner runner) { - return new UnionQueryRunner( - new SubqueryQueryRunner( - new IntervalChunkingQueryRunner(runner, configSupplier.get().getChunkPeriod()) - ) + return new SubqueryQueryRunner( + new IntervalChunkingQueryRunner(runner, configSupplier.get().getChunkPeriod()) ); } } diff --git a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java index 2a1b6500aaa..cefd92bb94f 100644 --- a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java @@ -45,7 +45,6 @@ import io.druid.query.QueryToolChest; import io.druid.query.Result; import io.druid.query.ResultGranularTimestampComparator; import io.druid.query.ResultMergeQueryRunner; -import io.druid.query.UnionQueryRunner; import io.druid.query.aggregation.MetricManipulationFn; import io.druid.query.filter.DimFilter; import io.druid.query.search.search.SearchHit; @@ -259,9 +258,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest> preMergeQueryDecoration(QueryRunner> runner) { return new SearchThresholdAdjustingQueryRunner( - new UnionQueryRunner>( - new IntervalChunkingQueryRunner>(runner, config.getChunkPeriod()) - ), + new IntervalChunkingQueryRunner>(runner, config.getChunkPeriod()), config ); } diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java index d40cb48af0d..aad589a2a7a 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java @@ -42,7 +42,6 @@ import io.druid.query.QueryToolChest; import io.druid.query.Result; import io.druid.query.ResultGranularTimestampComparator; import io.druid.query.ResultMergeQueryRunner; -import io.druid.query.UnionQueryRunner; import io.druid.query.aggregation.MetricManipulationFn; import io.druid.query.filter.DimFilter; import org.joda.time.DateTime; @@ -278,11 +277,10 @@ public class SelectQueryQueryToolChest extends QueryToolChest> preMergeQueryDecoration(QueryRunner> runner) { - return new UnionQueryRunner>( - new IntervalChunkingQueryRunner>( - runner, - config.getChunkPeriod() - ) + return new IntervalChunkingQueryRunner>( + runner, + config.getChunkPeriod() + ); } diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index 3ff92321cae..441373a56ed 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java @@ -42,7 +42,6 @@ import io.druid.query.QueryToolChest; import io.druid.query.Result; import io.druid.query.ResultGranularTimestampComparator; import io.druid.query.ResultMergeQueryRunner; -import io.druid.query.UnionQueryRunner; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.MetricManipulationFn; import io.druid.query.aggregation.PostAggregator; @@ -230,11 +229,10 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest> preMergeQueryDecoration(QueryRunner> runner) { - return new UnionQueryRunner>( - new IntervalChunkingQueryRunner>( - runner, - config.getChunkPeriod() - ) + return new IntervalChunkingQueryRunner>( + runner, + config.getChunkPeriod() + ); } diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java index cbce09c1f9c..15ddca402fd 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java @@ -45,7 +45,6 @@ import io.druid.query.QueryToolChest; import io.druid.query.Result; import io.druid.query.ResultGranularTimestampComparator; import io.druid.query.ResultMergeQueryRunner; -import io.druid.query.UnionQueryRunner; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorUtil; import io.druid.query.aggregation.MetricManipulationFn; @@ -55,7 +54,6 @@ import org.joda.time.DateTime; import org.joda.time.Interval; import org.joda.time.Minutes; -import javax.annotation.Nullable; import java.nio.ByteBuffer; import java.util.Iterator; import java.util.List; @@ -83,6 +81,14 @@ public class TopNQueryQueryToolChest extends QueryToolChest prunePostAggregators(TopNQuery query) + { + return AggregatorUtil.pruneDependentPostAgg( + query.getPostAggregatorSpecs(), + query.getTopNMetricSpec().getMetricName(query.getDimensionSpec()) + ); + } + @Override public QueryRunner> mergeResults(QueryRunner> runner) { @@ -371,12 +377,9 @@ public class TopNQueryQueryToolChest extends QueryToolChest> preMergeQueryDecoration(QueryRunner> runner) { - return new UnionQueryRunner>( - new IntervalChunkingQueryRunner>( - runner, - config - .getChunkPeriod() - ) + return new IntervalChunkingQueryRunner>( + runner, + config.getChunkPeriod() ); } @@ -475,12 +478,4 @@ public class TopNQueryQueryToolChest extends QueryToolChest prunePostAggregators(TopNQuery query) - { - return AggregatorUtil.pruneDependentPostAgg( - query.getPostAggregatorSpecs(), - query.getTopNMetricSpec().getMetricName(query.getDimensionSpec()) - ); - } } diff --git a/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java index 76d9d1aa838..d500c9f70e2 100644 --- a/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java +++ b/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java @@ -32,12 +32,13 @@ import io.druid.query.QuerySegmentWalker; import io.druid.query.QueryToolChest; import io.druid.query.QueryToolChestWarehouse; import io.druid.query.SegmentDescriptor; +import io.druid.query.UnionQueryRunner; import org.joda.time.Interval; import javax.annotation.Nullable; /** -*/ + */ public class ClientQuerySegmentWalker implements QuerySegmentWalker { private final ServiceEmitter emitter; @@ -70,22 +71,24 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker private FinalizeResultsQueryRunner makeRunner(final Query query) { - final QueryToolChest> toolChest = warehouse.getToolChest(query); + final QueryToolChest> toolChest = warehouse.getToolChest(query); return new FinalizeResultsQueryRunner( toolChest.postMergeQueryDecoration( toolChest.mergeResults( - new MetricsEmittingQueryRunner( - emitter, - new Function, ServiceMetricEvent.Builder>() - { - @Override - public ServiceMetricEvent.Builder apply(@Nullable Query input) - { - return toolChest.makeMetricBuilder(query); - } - }, - toolChest.preMergeQueryDecoration(baseClient) - ).withWaitMeasuredFromNow() + new UnionQueryRunner( + new MetricsEmittingQueryRunner( + emitter, + new Function, ServiceMetricEvent.Builder>() + { + @Override + public ServiceMetricEvent.Builder apply(@Nullable Query input) + { + return toolChest.makeMetricBuilder(query); + } + }, + toolChest.preMergeQueryDecoration(baseClient) + ).withWaitMeasuredFromNow() + ) ) ), toolChest From f2725994b75fc5b0a407db051bd9dd1b16864382 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Mon, 5 May 2014 19:57:32 +0530 Subject: [PATCH 6/7] refactor method --- .../main/java/io/druid/indexer/DeterminePartitionsJob.java | 2 ++ processing/src/main/java/io/druid/query/DataSource.java | 2 +- .../src/main/java/io/druid/query/QueryDataSource.java | 7 ++----- .../src/main/java/io/druid/query/TableDataSource.java | 2 +- .../src/main/java/io/druid/query/UnionDataSource.java | 6 ++---- .../io/druid/query/groupby/GroupByQueryQueryToolChest.java | 2 +- .../query/metadata/SegmentMetadataQueryQueryToolChest.java | 2 +- .../io/druid/query/search/SearchQueryQueryToolChest.java | 2 +- .../io/druid/query/select/SelectQueryQueryToolChest.java | 2 +- .../timeboundary/TimeBoundaryQueryQueryToolChest.java | 2 +- .../query/timeseries/TimeseriesQueryQueryToolChest.java | 2 +- .../java/io/druid/query/topn/TopNQueryQueryToolChest.java | 2 +- .../src/test/java/io/druid/query/DataSourceTest.java | 5 +---- .../java/io/druid/server/AsyncQueryForwardingServlet.java | 2 +- server/src/main/java/io/druid/server/QueryResource.java | 2 +- 15 files changed, 18 insertions(+), 24 deletions(-) diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java index 7a685650528..ddcb691ef09 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java @@ -762,6 +762,8 @@ public class DeterminePartitionsJob implements Jobby log.info(" %s", HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(shardSpec)); } + System.out.println(HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(chosenShardSpecs)); + try { HadoopDruidIndexerConfig.jsonMapper .writerWithType( diff --git a/processing/src/main/java/io/druid/query/DataSource.java b/processing/src/main/java/io/druid/query/DataSource.java index 436e2e0de02..693e80b1ce1 100644 --- a/processing/src/main/java/io/druid/query/DataSource.java +++ b/processing/src/main/java/io/druid/query/DataSource.java @@ -39,5 +39,5 @@ public interface DataSource { public Iterable getNames(); - public String getMetricName(); + public String toShortString(); } diff --git a/processing/src/main/java/io/druid/query/QueryDataSource.java b/processing/src/main/java/io/druid/query/QueryDataSource.java index de3ff3982cb..d0f4c4ae9c1 100644 --- a/processing/src/main/java/io/druid/query/QueryDataSource.java +++ b/processing/src/main/java/io/druid/query/QueryDataSource.java @@ -24,9 +24,6 @@ package io.druid.query; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; -import com.google.common.collect.Lists; - -import java.util.List; @JsonTypeName("query") public class QueryDataSource implements DataSource @@ -47,9 +44,9 @@ public class QueryDataSource implements DataSource } @Override - public String getMetricName() + public String toShortString() { - return query.getDataSource().getMetricName(); + return query.getDataSource().toShortString(); } @JsonProperty diff --git a/processing/src/main/java/io/druid/query/TableDataSource.java b/processing/src/main/java/io/druid/query/TableDataSource.java index bf2512dce53..34eb7237f71 100644 --- a/processing/src/main/java/io/druid/query/TableDataSource.java +++ b/processing/src/main/java/io/druid/query/TableDataSource.java @@ -51,7 +51,7 @@ public class TableDataSource implements DataSource } @Override - public String getMetricName() + public String toShortString() { return name; } diff --git a/processing/src/main/java/io/druid/query/UnionDataSource.java b/processing/src/main/java/io/druid/query/UnionDataSource.java index ac62a7e3991..d30f44976b7 100644 --- a/processing/src/main/java/io/druid/query/UnionDataSource.java +++ b/processing/src/main/java/io/druid/query/UnionDataSource.java @@ -27,9 +27,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import javax.annotation.Nullable; import java.util.List; import java.util.SortedSet; import java.util.TreeSet; @@ -60,11 +58,11 @@ public class UnionDataSource implements DataSource } @Override - public String getMetricName() + public String toShortString() { SortedSet str = new TreeSet<>(); for(DataSource ds : dataSources){ - str.add(ds.getMetricName()); + str.add(ds.toShortString()); } return str.toString(); } diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index 5be40a9dbbf..6e30561bc0c 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -163,7 +163,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest Date: Wed, 7 May 2014 00:54:17 +0530 Subject: [PATCH 7/7] review comments remove toShortString minor other changes --- .../main/java/io/druid/query/DataSource.java | 4 +- .../java/io/druid/query/DataSourceUtil.java | 31 ++++++++++++++++ .../java/io/druid/query/QueryDataSource.java | 10 ++--- .../java/io/druid/query/TableDataSource.java | 11 ++---- .../java/io/druid/query/UnionDataSource.java | 37 ++++++++----------- .../groupby/GroupByQueryQueryToolChest.java | 3 +- .../SegmentMetadataQueryQueryToolChest.java | 3 +- .../search/SearchQueryQueryToolChest.java | 3 +- .../select/SelectQueryQueryToolChest.java | 3 +- .../TimeBoundaryQueryQueryToolChest.java | 3 +- .../TimeseriesQueryQueryToolChest.java | 3 +- .../query/topn/TopNQueryQueryToolChest.java | 3 +- .../java/io/druid/query/DataSourceTest.java | 1 - .../server/AsyncQueryForwardingServlet.java | 3 +- .../java/io/druid/server/QueryResource.java | 3 +- .../server/coordination/ServerManager.java | 5 --- 16 files changed, 71 insertions(+), 55 deletions(-) create mode 100644 processing/src/main/java/io/druid/query/DataSourceUtil.java diff --git a/processing/src/main/java/io/druid/query/DataSource.java b/processing/src/main/java/io/druid/query/DataSource.java index 693e80b1ce1..ee0c24dac1e 100644 --- a/processing/src/main/java/io/druid/query/DataSource.java +++ b/processing/src/main/java/io/druid/query/DataSource.java @@ -37,7 +37,5 @@ import java.util.List; }) public interface DataSource { - public Iterable getNames(); - - public String toShortString(); + public List getNames(); } diff --git a/processing/src/main/java/io/druid/query/DataSourceUtil.java b/processing/src/main/java/io/druid/query/DataSourceUtil.java new file mode 100644 index 00000000000..109db95ee04 --- /dev/null +++ b/processing/src/main/java/io/druid/query/DataSourceUtil.java @@ -0,0 +1,31 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query; + +import java.util.List; + +public class DataSourceUtil +{ + public static String getMetricName(DataSource dataSource) + { + final List names = dataSource.getNames(); + return names.size() == 1 ? names.get(0) : names.toString(); + } +} diff --git a/processing/src/main/java/io/druid/query/QueryDataSource.java b/processing/src/main/java/io/druid/query/QueryDataSource.java index d0f4c4ae9c1..d81633d5fc4 100644 --- a/processing/src/main/java/io/druid/query/QueryDataSource.java +++ b/processing/src/main/java/io/druid/query/QueryDataSource.java @@ -25,6 +25,8 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; +import java.util.List; + @JsonTypeName("query") public class QueryDataSource implements DataSource { @@ -38,17 +40,11 @@ public class QueryDataSource implements DataSource } @Override - public Iterable getNames() + public List getNames() { return query.getDataSource().getNames(); } - @Override - public String toShortString() - { - return query.getDataSource().toShortString(); - } - @JsonProperty public Query getQuery() { diff --git a/processing/src/main/java/io/druid/query/TableDataSource.java b/processing/src/main/java/io/druid/query/TableDataSource.java index 34eb7237f71..9fbb26cbe0f 100644 --- a/processing/src/main/java/io/druid/query/TableDataSource.java +++ b/processing/src/main/java/io/druid/query/TableDataSource.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.collect.Lists; +import java.util.Arrays; import java.util.List; @JsonTypeName("table") @@ -45,15 +46,9 @@ public class TableDataSource implements DataSource } @Override - public Iterable getNames() + public List getNames() { - return Lists.newArrayList(name); - } - - @Override - public String toShortString() - { - return name; + return Arrays.asList(name); } public String toString() { return name; } diff --git a/processing/src/main/java/io/druid/query/UnionDataSource.java b/processing/src/main/java/io/druid/query/UnionDataSource.java index d30f44976b7..a861637e318 100644 --- a/processing/src/main/java/io/druid/query/UnionDataSource.java +++ b/processing/src/main/java/io/druid/query/UnionDataSource.java @@ -27,10 +27,9 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import java.util.List; -import java.util.SortedSet; -import java.util.TreeSet; public class UnionDataSource implements DataSource { @@ -45,30 +44,24 @@ public class UnionDataSource implements DataSource } @Override - public Iterable getNames() + public List getNames() { - return Iterables.concat(Iterables.transform(dataSources, new Function>() - { - @Override - public Iterable apply(DataSource input) - { - return input.getNames(); - } - })); - } - - @Override - public String toShortString() - { - SortedSet str = new TreeSet<>(); - for(DataSource ds : dataSources){ - str.add(ds.toShortString()); - } - return str.toString(); + return Lists.transform( + dataSources, + new Function() + { + @Override + public String apply(TableDataSource input) + { + return Iterables.getOnlyElement(input.getNames()); + } + } + ); } @JsonProperty - public List getDataSources(){ + public List getDataSources() + { return dataSources; } diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index 6e30561bc0c..0e00ceae46d 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -35,6 +35,7 @@ import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; import io.druid.query.DataSource; +import io.druid.query.DataSourceUtil; import io.druid.query.IntervalChunkingQueryRunner; import io.druid.query.Query; import io.druid.query.QueryDataSource; @@ -163,7 +164,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest