Merge pull request #528 from metamx/union-query-source

Union query source
fjy 2014-05-06 14:13:33 -06:00
commit 79e6d4eb56
34 changed files with 420 additions and 93 deletions

View File

@@ -2,7 +2,7 @@
   "dataSource": "wikipedia",
   "timestampSpec" : {
     "column": "timestamp",
-    "format": "iso",
+    "format": "iso"
   },
   "dataSpec": {
     "format": "json",

View File

@@ -22,6 +22,7 @@ package io.druid.indexing.overlord;
 import com.google.api.client.repackaged.com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -153,13 +154,7 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
   private <T> QueryRunner<T> getQueryRunnerImpl(Query<T> query)
   {
     QueryRunner<T> queryRunner = null;
-    String queryDataSource;
-    try {
-      queryDataSource = ((TableDataSource)query.getDataSource()).getName();
-    }
-    catch (ClassCastException e) {
-      throw new IllegalArgumentException("Subqueries are not welcome here");
-    }
+    final String queryDataSource = Iterables.getOnlyElement(query.getDataSource().getNames());
     for (final ThreadPoolTaskRunnerWorkItem taskRunnerWorkItem : ImmutableList.copyOf(runningItems)) {
       final Task task = taskRunnerWorkItem.getTask();

View File

@@ -24,15 +24,18 @@ package io.druid.query;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+import java.util.List;
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME,
               include = JsonTypeInfo.As.PROPERTY,
               property = "type",
               defaultImpl = LegacyDataSource.class)
 @JsonSubTypes({
     @JsonSubTypes.Type(value = TableDataSource.class, name = "table"),
-    @JsonSubTypes.Type(value = QueryDataSource.class, name = "query")
+    @JsonSubTypes.Type(value = QueryDataSource.class, name = "query"),
+    @JsonSubTypes.Type(value = UnionDataSource.class, name = "union")
 })
 public interface DataSource
 {
-  public String getName();
+  public List<String> getNames();
 }
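
A quick sketch of the new contract from the caller's side (illustrative only, not part of this diff; it assumes Druid's DefaultObjectMapper, which the tests further down use):

    // Sketch: every registered datasource type now answers getNames().
    ObjectMapper jsonMapper = new DefaultObjectMapper();

    DataSource table = jsonMapper.readValue("\"wikipedia\"", DataSource.class);
    table.getNames();  // ["wikipedia"] - a table reports its single name

    DataSource union = jsonMapper.readValue(
        "{\"type\":\"union\", \"dataSources\":[\"ds1\", \"ds2\"]}",
        DataSource.class
    );
    union.getNames();  // ["ds1", "ds2"] - a union reports every member table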

View File

@@ -0,0 +1,31 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query;

import java.util.List;

public class DataSourceUtil
{
public static String getMetricName(DataSource dataSource)
{
final List<String> names = dataSource.getNames();
return names.size() == 1 ? names.get(0) : names.toString();
}
}
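
This helper keeps the metric dimension stable for single-table queries while still producing something readable for unions; expected values sketched below (datasource names invented):

    // Sketch of expected DataSourceUtil.getMetricName output.
    DataSourceUtil.getMetricName(new TableDataSource("wikipedia"));
    // -> "wikipedia"      (one name: returned as-is)

    DataSourceUtil.getMetricName(new UnionDataSource(Arrays.asList(
        new TableDataSource("ds1"), new TableDataSource("ds2")
    )));
    // -> "[ds1, ds2]"     (several names: List.toString())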

View File

@@ -537,7 +537,7 @@ public class Druids
     public SearchQueryBuilder copy(SearchQuery query)
     {
       return new SearchQueryBuilder()
-          .dataSource(((TableDataSource)query.getDataSource()).getName())
+          .dataSource(query.getDataSource())
           .intervals(query.getQuerySegmentSpec())
           .filters(query.getDimensionsFilter())
           .granularity(query.getGranularity())

View File

@@ -88,4 +88,6 @@ public interface Query<T>
   public Query<T> withId(String id);
   public String getId();
+
+  Query<T> withDataSource(DataSource dataSource);
 }

View File

@@ -25,6 +25,8 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+
+import java.util.List;
 @JsonTypeName("query")
 public class QueryDataSource implements DataSource
 {
@@ -38,9 +40,9 @@ public class QueryDataSource implements DataSource
   }
   @Override
-  public String getName()
+  public List<String> getNames()
   {
-    return query.getDataSource().getName();
+    return query.getDataSource().getNames();
   }
   @JsonProperty

View File

@@ -23,6 +23,10 @@ package io.druid.query;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.collect.Lists;
+
+import java.util.Arrays;
+import java.util.List;
 @JsonTypeName("table")
 public class TableDataSource implements DataSource
@@ -37,12 +41,16 @@ public class TableDataSource implements DataSource
   }
   @JsonProperty
-  @Override
-  public String getName()
-  {
+  public String getName(){
     return name;
   }
+
+  @Override
+  public List<String> getNames()
+  {
+    return Arrays.asList(name);
+  }
   public String toString() { return name; }
   @Override

View File

@@ -0,0 +1,100 @@
/*
* Druid - a distributed column store.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project
* under the Druid Corporate Contributor License Agreement.
*/
package io.druid.query;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;

import java.util.List;

public class UnionDataSource implements DataSource
{
@JsonProperty
private final List<TableDataSource> dataSources;
@JsonCreator
public UnionDataSource(@JsonProperty("dataSources") List<TableDataSource> dataSources)
{
Preconditions.checkNotNull(dataSources, "dataSources cannot be null for unionDataSource");
this.dataSources = dataSources;
}
@Override
public List<String> getNames()
{
return Lists.transform(
dataSources,
new Function<TableDataSource, String>()
{
@Override
public String apply(TableDataSource input)
{
return Iterables.getOnlyElement(input.getNames());
}
}
);
}
@JsonProperty
public List<TableDataSource> getDataSources()
{
return dataSources;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
UnionDataSource that = (UnionDataSource) o;
if (!dataSources.equals(that.dataSources)) {
return false;
}
return true;
}
@Override
public int hashCode()
{
return dataSources.hashCode();
}
@Override
public String toString()
{
return "UnionDataSource{" +
"dataSources=" + dataSources +
'}';
}
}
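
For reference, a union datasource round-trips through JSON exactly as the new DataSourceTest below exercises; a minimal sketch (table names invented, jsonMapper hypothetical):

    // Sketch: deserializing a union datasource (mirrors the new test below).
    DataSource ds = jsonMapper.readValue(
        "{\"type\":\"union\", \"dataSources\":[\"ds1\", \"ds2\"]}",
        DataSource.class
    );
    // ds is a UnionDataSource; ds.getNames() -> ["ds1", "ds2"]. Note the
    // member list is typed List<TableDataSource>: union members are plain
    // tables, not nested unions or query datasources.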

View File

@@ -0,0 +1,63 @@
/*
* Druid - a distributed column store.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project
* under the Druid Corporate Contributor License Agreement.
*/
package io.druid.query;

import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;

public class UnionQueryRunner<T> implements QueryRunner<T>
{
private final QueryRunner<T> baseRunner;
public UnionQueryRunner(QueryRunner<T> baseRunner)
{
this.baseRunner = baseRunner;
}
@Override
public Sequence<T> run(final Query<T> query)
{
DataSource dataSource = query.getDataSource();
if (dataSource instanceof UnionDataSource) {
return Sequences.concat(
Iterables.transform(
((UnionDataSource) dataSource).getDataSources(),
new Function<DataSource, Sequence<T>>()
{
@Override
public Sequence<T> apply(DataSource singleSource)
{
return baseRunner.run(
query.withDataSource(singleSource)
);
}
}
)
);
} else {
return baseRunner.run(query);
}
}
}
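
The rewrite leans on each query type's new withDataSource. For a union over two tables, run() above behaves, in effect, like this sketch (not additional code in the PR):

    // Sketch: for a query whose datasource is union(ds1, ds2), run(query)
    // concatenates the two single-table runs in member order:
    Sequence<T> result = Sequences.concat(ImmutableList.of(
        baseRunner.run(query.withDataSource(new TableDataSource("ds1"))),
        baseRunner.run(query.withDataSource(new TableDataSource("ds2")))
    ));
    // Any non-union datasource falls through to baseRunner.run(query) as-is.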

View File

@@ -257,6 +257,24 @@ public class GroupByQuery extends BaseQuery<Row>
     );
   }
+
+  @Override
+  public Query<Row> withDataSource(DataSource dataSource)
+  {
+    return new GroupByQuery(
+        dataSource,
+        getQuerySegmentSpec(),
+        dimFilter,
+        granularity,
+        dimensions,
+        aggregatorSpecs,
+        postAggregatorSpecs,
+        havingSpec,
+        limitSpec,
+        orderByLimitFn,
+        getContext()
+    );
+  }
+
   public static class Builder
   {
     private DataSource dataSource;

View File

@@ -35,6 +35,7 @@ import com.metamx.emitter.service.ServiceMetricEvent;
 import io.druid.data.input.MapBasedRow;
 import io.druid.data.input.Row;
 import io.druid.query.DataSource;
+import io.druid.query.DataSourceUtil;
 import io.druid.query.IntervalChunkingQueryRunner;
 import io.druid.query.Query;
 import io.druid.query.QueryDataSource;
@@ -163,7 +164,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
     }
     return new ServiceMetricEvent.Builder()
-        .setUser2(query.getDataSource().toString())
+        .setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
         .setUser3(String.format("%,d dims", query.getDimensions().size()))
         .setUser4("groupBy")
         .setUser5(Joiner.on(",").join(query.getIntervals()))
@@ -203,6 +204,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
   public QueryRunner<Row> preMergeQueryDecoration(QueryRunner<Row> runner)
   {
     return new SubqueryQueryRunner<Row>(
-        new IntervalChunkingQueryRunner<Row>(runner, configSupplier.get().getChunkPeriod()));
+        new IntervalChunkingQueryRunner<Row>(runner, configSupplier.get().getChunkPeriod())
+    );
   }
 }

View File

@@ -35,6 +35,7 @@ import com.metamx.emitter.service.ServiceMetricEvent;
 import io.druid.collections.OrderedMergeSequence;
 import io.druid.common.utils.JodaUtils;
 import io.druid.query.CacheStrategy;
+import io.druid.query.DataSourceUtil;
 import io.druid.query.Query;
 import io.druid.query.QueryRunner;
 import io.druid.query.QueryToolChest;
@@ -147,7 +148,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
     }
     return new ServiceMetricEvent.Builder()
-        .setUser2(query.getDataSource().toString())
+        .setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
         .setUser4(query.getType())
         .setUser5(Joiner.on(",").join(query.getIntervals()))
         .setUser6(String.valueOf(query.hasFilters()))

View File

@@ -92,6 +92,17 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
         spec, toInclude, merge, getContext());
   }
+
+  @Override
+  public Query<SegmentAnalysis> withDataSource(DataSource dataSource)
+  {
+    return new SegmentMetadataQuery(
+        dataSource,
+        getQuerySegmentSpec(),
+        toInclude,
+        merge,
+        getContext());
+  }
+
   @Override
   public boolean equals(Object o)
   {

View File

@@ -38,6 +38,7 @@ import com.metamx.common.guava.nary.BinaryFn;
 import com.metamx.emitter.service.ServiceMetricEvent;
 import io.druid.collections.OrderedMergeSequence;
 import io.druid.query.CacheStrategy;
+import io.druid.query.DataSourceUtil;
 import io.druid.query.IntervalChunkingQueryRunner;
 import io.druid.query.Query;
 import io.druid.query.QueryRunner;
@@ -121,7 +122,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
     }
     return new ServiceMetricEvent.Builder()
-        .setUser2(query.getDataSource().toString())
+        .setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
         .setUser4("search")
         .setUser5(COMMA_JOIN.join(query.getIntervals()))
         .setUser6(String.valueOf(query.hasFilters()))

View File

@@ -111,6 +111,22 @@ public class SearchQuery extends BaseQuery<Result<SearchResultValue>>
     );
   }
+
+  @Override
+  public Query<Result<SearchResultValue>> withDataSource(DataSource dataSource)
+  {
+    return new SearchQuery(
+        dataSource,
+        dimFilter,
+        granularity,
+        limit,
+        getQuerySegmentSpec(),
+        dimensions,
+        querySpec,
+        sortSpec,
+        getContext()
+    );
+  }
+
   @Override
   public SearchQuery withOverriddenContext(Map<String, Object> contextOverrides)
   {

View File

@@ -40,7 +40,7 @@ public class PagingSpec
       @JsonProperty("threshold") int threshold
   )
   {
-    this.pagingIdentifiers = pagingIdentifiers;
+    this.pagingIdentifiers = pagingIdentifiers == null ? new LinkedHashMap<String, Integer>() : pagingIdentifiers;
     this.threshold = threshold;
   }

View File

@@ -120,6 +120,21 @@ public class SelectQuery extends BaseQuery<Result<SelectResultValue>>
     );
   }
+
+  @Override
+  public Query<Result<SelectResultValue>> withDataSource(DataSource dataSource)
+  {
+    return new SelectQuery(
+        dataSource,
+        getQuerySegmentSpec(),
+        dimFilter,
+        granularity,
+        dimensions,
+        metrics,
+        pagingSpec,
+        getContext()
+    );
+  }
+
   public SelectQuery withOverriddenContext(Map<String, Object> contextOverrides)
   {
     return new SelectQuery(

View File

@@ -34,6 +34,7 @@ import com.metamx.emitter.service.ServiceMetricEvent;
 import io.druid.collections.OrderedMergeSequence;
 import io.druid.granularity.QueryGranularity;
 import io.druid.query.CacheStrategy;
+import io.druid.query.DataSourceUtil;
 import io.druid.query.IntervalChunkingQueryRunner;
 import io.druid.query.Query;
 import io.druid.query.QueryConfig;
@@ -123,7 +124,7 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
     }
     return new ServiceMetricEvent.Builder()
-        .setUser2(query.getDataSource().toString())
+        .setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
         .setUser4("Select")
         .setUser5(COMMA_JOIN.join(query.getIntervals()))
         .setUser6(String.valueOf(query.hasFilters()))
@@ -277,7 +278,11 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
   @Override
   public QueryRunner<Result<SelectResultValue>> preMergeQueryDecoration(QueryRunner<Result<SelectResultValue>> runner)
   {
-    return new IntervalChunkingQueryRunner<Result<SelectResultValue>>(runner, config.getChunkPeriod());
+    return new IntervalChunkingQueryRunner<Result<SelectResultValue>>(
+        runner,
+        config.getChunkPeriod()
+    );
   }

   public Ordering<Result<SelectResultValue>> getOrdering()

View File

@@ -97,6 +97,16 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>
     );
   }
+
+  @Override
+  public Query<Result<TimeBoundaryResultValue>> withDataSource(DataSource dataSource)
+  {
+    return new TimeBoundaryQuery(
+        dataSource,
+        getQuerySegmentSpec(),
+        getContext()
+    );
+  }
+
   public byte[] getCacheKey()
   {
     return ByteBuffer.allocate(1)

View File

@@ -33,6 +33,7 @@ import com.metamx.emitter.service.ServiceMetricEvent;
 import io.druid.collections.OrderedMergeSequence;
 import io.druid.query.BySegmentSkippingQueryRunner;
 import io.druid.query.CacheStrategy;
+import io.druid.query.DataSourceUtil;
 import io.druid.query.Query;
 import io.druid.query.QueryRunner;
 import io.druid.query.QueryToolChest;
@@ -117,7 +118,7 @@ public class TimeBoundaryQueryQueryToolChest
   public ServiceMetricEvent.Builder makeMetricBuilder(TimeBoundaryQuery query)
   {
     return new ServiceMetricEvent.Builder()
-        .setUser2(query.getDataSource().toString())
+        .setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
         .setUser4(query.getType())
         .setUser6("false");
   }

View File

@@ -116,6 +116,20 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
     );
   }
+
+  @Override
+  public Query<Result<TimeseriesResultValue>> withDataSource(DataSource dataSource)
+  {
+    return new TimeseriesQuery(
+        dataSource,
+        getQuerySegmentSpec(),
+        dimFilter,
+        granularity,
+        aggregatorSpecs,
+        postAggregatorSpecs,
+        getContext()
+    );
+  }
+
   public TimeseriesQuery withOverriddenContext(Map<String, Object> contextOverrides)
   {
     return new TimeseriesQuery(

View File

@@ -33,6 +33,7 @@ import com.metamx.emitter.service.ServiceMetricEvent;
 import io.druid.collections.OrderedMergeSequence;
 import io.druid.granularity.QueryGranularity;
 import io.druid.query.CacheStrategy;
+import io.druid.query.DataSourceUtil;
 import io.druid.query.IntervalChunkingQueryRunner;
 import io.druid.query.Query;
 import io.druid.query.QueryCacheHelper;
@@ -122,7 +123,7 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
     }
     return new ServiceMetricEvent.Builder()
-        .setUser2(query.getDataSource().toString())
+        .setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
         .setUser4("timeseries")
         .setUser5(COMMA_JOIN.join(query.getIntervals()))
         .setUser6(String.valueOf(query.hasFilters()))
@@ -229,7 +230,11 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
   @Override
   public QueryRunner<Result<TimeseriesResultValue>> preMergeQueryDecoration(QueryRunner<Result<TimeseriesResultValue>> runner)
   {
-    return new IntervalChunkingQueryRunner<Result<TimeseriesResultValue>>(runner, config.getChunkPeriod());
+    return new IntervalChunkingQueryRunner<Result<TimeseriesResultValue>>(
+        runner,
+        config.getChunkPeriod()
+    );
   }

   public Ordering<Result<TimeseriesResultValue>> getOrdering()

View File

@@ -27,6 +27,7 @@ import io.druid.granularity.QueryGranularity;
 import io.druid.query.BaseQuery;
 import io.druid.query.DataSource;
 import io.druid.query.Queries;
+import io.druid.query.Query;
 import io.druid.query.Result;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.PostAggregator;
@@ -162,6 +163,23 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
     );
   }
+
+  @Override
+  public Query<Result<TopNResultValue>> withDataSource(DataSource dataSource)
+  {
+    return new TopNQuery(
+        dataSource,
+        dimensionSpec,
+        topNMetricSpec,
+        threshold,
+        getQuerySegmentSpec(),
+        dimFilter,
+        granularity,
+        aggregatorSpecs,
+        postAggregatorSpecs,
+        getContext()
+    );
+  }
+
   public TopNQuery withThreshold(int threshold)
   {
     return new TopNQuery(

View File

@@ -37,6 +37,7 @@ import com.metamx.emitter.service.ServiceMetricEvent;
 import io.druid.collections.OrderedMergeSequence;
 import io.druid.granularity.QueryGranularity;
 import io.druid.query.CacheStrategy;
+import io.druid.query.DataSourceUtil;
 import io.druid.query.IntervalChunkingQueryRunner;
 import io.druid.query.Query;
 import io.druid.query.QueryCacheHelper;
@@ -54,7 +55,6 @@ import org.joda.time.DateTime;
 import org.joda.time.Interval;
 import org.joda.time.Minutes;
-import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.List;
@@ -82,6 +82,14 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
     this.config = config;
   }
+
+  private static List<PostAggregator> prunePostAggregators(TopNQuery query)
+  {
+    return AggregatorUtil.pruneDependentPostAgg(
+        query.getPostAggregatorSpecs(),
+        query.getTopNMetricSpec().getMetricName(query.getDimensionSpec())
+    );
+  }
+
   @Override
   public QueryRunner<Result<TopNResultValue>> mergeResults(QueryRunner<Result<TopNResultValue>> runner)
   {
@@ -131,7 +139,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
     }
     return new ServiceMetricEvent.Builder()
-        .setUser2(query.getDataSource().toString())
+        .setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
         .setUser4(String.format("topN/%s/%s", query.getThreshold(), query.getDimensionSpec().getDimension()))
         .setUser5(COMMA_JOIN.join(query.getIntervals()))
         .setUser6(String.valueOf(query.hasFilters()))
@@ -370,7 +378,10 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
   @Override
   public QueryRunner<Result<TopNResultValue>> preMergeQueryDecoration(QueryRunner<Result<TopNResultValue>> runner)
   {
-    return new IntervalChunkingQueryRunner<Result<TopNResultValue>>(runner, config.getChunkPeriod());
+    return new IntervalChunkingQueryRunner<Result<TopNResultValue>>(
+        runner,
+        config.getChunkPeriod()
+    );
   }

   @Override
@@ -468,12 +479,4 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
     );
   }
   }
-
-  private static List<PostAggregator> prunePostAggregators(TopNQuery query)
-  {
-    return AggregatorUtil.pruneDependentPostAgg(
-        query.getPostAggregatorSpecs(),
-        query.getTopNMetricSpec().getMetricName(query.getDimensionSpec())
-    );
-  }
 }

View File

@@ -85,4 +85,25 @@ public class DataSourceTest
     Assert.assertEquals(new QueryDataSource(query), dataSource);
   }
+
+  @Test
+  public void testUnionDataSource() throws Exception
+  {
+    DataSource dataSource = jsonMapper.readValue(
+        "{\"type\":\"union\", \"dataSources\":[\"ds1\", \"ds2\"]}",
+        DataSource.class
+    );
+    Assert.assertTrue(dataSource instanceof UnionDataSource);
+    Assert.assertEquals(
+        Lists.newArrayList(new TableDataSource("ds1"), new TableDataSource("ds2")),
+        Lists.newArrayList(((UnionDataSource) dataSource).getDataSources())
+    );
+    Assert.assertEquals(
+        Lists.newArrayList("ds1", "ds2"),
+        Lists.newArrayList(dataSource.getNames())
+    );
+
+    final DataSource serde = jsonMapper.readValue(jsonMapper.writeValueAsString(dataSource), DataSource.class);
+    Assert.assertEquals(dataSource, serde);
+  }
 }

View File

@@ -20,6 +20,7 @@
 package io.druid.query.metadata;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Iterables;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.query.Query;
 import io.druid.query.metadata.metadata.SegmentMetadataQuery;
@@ -41,7 +42,7 @@ public class SegmentMetadataQueryTest
         + "}";
     Query query = mapper.readValue(queryStr, Query.class);
     Assert.assertTrue(query instanceof SegmentMetadataQuery);
-    Assert.assertEquals("test_ds", query.getDataSource().getName());
+    Assert.assertEquals("test_ds", Iterables.getOnlyElement(query.getDataSource().getNames()));
     Assert.assertEquals(new Interval("2013-12-04T00:00:00.000Z/2013-12-05T00:00:00.000Z"), query.getIntervals().get(0));

     // test serialize and deserialize

View File

@@ -20,6 +20,7 @@
 package io.druid.client;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
 import com.google.inject.Inject;
@@ -237,17 +238,7 @@ public class BrokerServerView implements TimelineServerView
   @Override
   public VersionedIntervalTimeline<String, ServerSelector> getTimeline(DataSource dataSource)
   {
-    String table;
-    while (dataSource instanceof QueryDataSource) {
-      dataSource = ((QueryDataSource) dataSource).getQuery().getDataSource();
-    }
-
-    if (dataSource instanceof TableDataSource) {
-      table = ((TableDataSource) dataSource).getName();
-    } else {
-      throw new UnsupportedOperationException("Unsupported data source type: " + dataSource.getClass().getSimpleName());
-    }
+    String table = Iterables.getOnlyElement(dataSource.getNames());

     synchronized (lock) {
       return timelines.get(table);
     }

View File

@@ -21,6 +21,7 @@ package io.druid.segment.realtime;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.io.Closeables;
 import com.google.inject.Inject;
@@ -126,18 +127,7 @@ public class RealtimeManager implements QuerySegmentWalker
   private <T> String getDataSourceName(Query<T> query)
   {
-    DataSource dataSource = query.getDataSource();
-    if (!(dataSource instanceof TableDataSource)) {
-      throw new UnsupportedOperationException("data source type '" + dataSource.getClass().getName() + "' unsupported");
-    }
-
-    String dataSourceName;
-    try {
-      dataSourceName = ((TableDataSource) query.getDataSource()).getName();
-    } catch (ClassCastException e) {
-      throw new UnsupportedOperationException("Subqueries are only supported in the broker");
-    }
-    return dataSourceName;
+    return Iterables.getOnlyElement(query.getDataSource().getNames());
   }

View File

@@ -31,6 +31,7 @@ import com.metamx.http.client.response.HttpResponseHandler;
 import io.druid.client.RoutingDruidClient;
 import io.druid.guice.annotations.Json;
 import io.druid.guice.annotations.Smile;
+import io.druid.query.DataSourceUtil;
 import io.druid.query.Query;
 import io.druid.server.log.RequestLogger;
 import io.druid.server.router.QueryHostFinder;
@@ -282,7 +283,7 @@ public class AsyncQueryForwardingServlet extends HttpServlet
             emitter.emit(
                 new ServiceMetricEvent.Builder()
-                    .setUser2(theQuery.getDataSource().getName())
+                    .setUser2(DataSourceUtil.getMetricName(theQuery.getDataSource()))
                     .setUser4(theQuery.getType())
                     .setUser5(COMMA_JOIN.join(theQuery.getIntervals()))
                     .setUser6(String.valueOf(theQuery.hasFilters()))

View File

@@ -32,12 +32,13 @@ import io.druid.query.QuerySegmentWalker;
 import io.druid.query.QueryToolChest;
 import io.druid.query.QueryToolChestWarehouse;
 import io.druid.query.SegmentDescriptor;
+import io.druid.query.UnionQueryRunner;
 import org.joda.time.Interval;
 import javax.annotation.Nullable;
 /**
  */
 public class ClientQuerySegmentWalker implements QuerySegmentWalker
 {
   private final ServiceEmitter emitter;
@@ -70,22 +71,24 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
   private <T> FinalizeResultsQueryRunner<T> makeRunner(final Query<T> query)
   {
-    final QueryToolChest<T,Query<T>> toolChest = warehouse.getToolChest(query);
+    final QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
     return new FinalizeResultsQueryRunner<T>(
         toolChest.postMergeQueryDecoration(
            toolChest.mergeResults(
-                new MetricsEmittingQueryRunner<T>(
-                    emitter,
-                    new Function<Query<T>, ServiceMetricEvent.Builder>()
-                    {
-                      @Override
-                      public ServiceMetricEvent.Builder apply(@Nullable Query<T> input)
-                      {
-                        return toolChest.makeMetricBuilder(query);
-                      }
-                    },
-                    toolChest.preMergeQueryDecoration(baseClient)
-                ).withWaitMeasuredFromNow()
+                new UnionQueryRunner<T>(
+                    new MetricsEmittingQueryRunner<T>(
+                        emitter,
+                        new Function<Query<T>, ServiceMetricEvent.Builder>()
+                        {
+                          @Override
+                          public ServiceMetricEvent.Builder apply(@Nullable Query<T> input)
+                          {
+                            return toolChest.makeMetricBuilder(query);
+                          }
+                        },
+                        toolChest.preMergeQueryDecoration(baseClient)
+                    ).withWaitMeasuredFromNow()
+                )
            )
        ),
        toolChest
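
Worth noting (a reading of the nesting above, not text from the patch): the union fan-out sits inside mergeResults, so the concatenated per-table sequences are merged exactly like partial results from a single datasource would be.

    // Decoration order established above, outermost first:
    // FinalizeResultsQueryRunner
    //   postMergeQueryDecoration
    //     mergeResults                  <- merges across union members too
    //       UnionQueryRunner            <- one rewritten query per member table
    //         MetricsEmittingQueryRunner
    //           preMergeQueryDecoration(baseClient)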

View File

@@ -34,6 +34,7 @@ import com.metamx.emitter.service.ServiceEmitter;
 import com.metamx.emitter.service.ServiceMetricEvent;
 import io.druid.guice.annotations.Json;
 import io.druid.guice.annotations.Smile;
+import io.druid.query.DataSourceUtil;
 import io.druid.query.Query;
 import io.druid.query.QuerySegmentWalker;
 import io.druid.server.log.RequestLogger;
@@ -131,7 +132,7 @@ public class QueryResource
             emitter.emit(
                 new ServiceMetricEvent.Builder()
-                    .setUser2(query.getDataSource().toString())
+                    .setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
                     .setUser4(query.getType())
                     .setUser5(COMMA_JOIN.join(query.getIntervals()))
                     .setUser6(String.valueOf(query.hasFilters()))

View File

@@ -21,7 +21,9 @@ package io.druid.server.coordination;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Predicates;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Ordering;
 import com.google.inject.Inject;
 import com.metamx.common.ISE;
@@ -252,14 +254,7 @@ public class ServerManager implements QuerySegmentWalker
     if (!(dataSource instanceof TableDataSource)) {
       throw new UnsupportedOperationException("data source type '" + dataSource.getClass().getName() + "' unsupported");
     }
-
-    String dataSourceName;
-    try {
-      dataSourceName = ((TableDataSource) query.getDataSource()).getName();
-    }
-    catch (ClassCastException e) {
-      throw new UnsupportedOperationException("Subqueries are only supported in the broker");
-    }
+    String dataSourceName = getDataSourceName(dataSource);

     final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = dataSources.get(dataSourceName);
@@ -325,6 +320,11 @@ public class ServerManager implements QuerySegmentWalker
     return new FinalizeResultsQueryRunner<T>(toolChest.mergeResults(factory.mergeRunners(exec, adapters)), toolChest);
   }
+
+  private String getDataSourceName(DataSource dataSource)
+  {
+    return Iterables.getOnlyElement(dataSource.getNames());
+  }
+
   @Override
   public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs)
   {
@@ -338,13 +338,7 @@ public class ServerManager implements QuerySegmentWalker
     final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
-
-    String dataSourceName;
-    try {
-      dataSourceName = ((TableDataSource) query.getDataSource()).getName();
-    }
-    catch (ClassCastException e) {
-      throw new UnsupportedOperationException("Subqueries are only supported in the broker");
-    }
+    String dataSourceName = getDataSourceName(query.getDataSource());

     final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = dataSources.get(dataSourceName);

View File

@@ -137,7 +137,8 @@ public class TieredBrokerHostSelector<T> implements HostSelector<T>
     }

     if (brokerServiceName == null) {
-      List<Rule> rules = ruleManager.getRulesWithDefault((query.getDataSource()).getName());
+      // For union queries, the tier is selected based on the rules for the first dataSource.
+      List<Rule> rules = ruleManager.getRulesWithDefault(Iterables.getFirst(query.getDataSource().getNames(), null));

       // find the rule that can apply to the entire set of intervals
       DateTime now = new DateTime();
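
End to end, a client can now target several tables with one query. A hypothetical example (table names invented, jsonMapper assumed to be a DefaultObjectMapper), using the timeBoundary type since its withDataSource is part of this diff:

    // Hypothetical union query; names are illustrative only.
    Query<?> query = jsonMapper.readValue(
        "{\"queryType\":\"timeBoundary\","
        + " \"dataSource\":{\"type\":\"union\", \"dataSources\":[\"ds1\", \"ds2\"]}}",
        Query.class
    );
    // On the broker, UnionQueryRunner splits this into a timeBoundary query
    // per table (via TimeBoundaryQuery.withDataSource above), concatenates
    // the per-table sequences, and mergeResults folds them into one answer.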