diff --git a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java index 1e2b6b4601d..d72c97594f4 100644 --- a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java +++ b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java @@ -19,6 +19,7 @@ package io.druid.query; +import com.google.common.base.Function; import com.google.common.collect.Lists; import io.druid.granularity.QueryGranularity; import io.druid.query.aggregation.AggregatorFactory; @@ -41,6 +42,7 @@ import io.druid.segment.incremental.IncrementalIndex; import org.joda.time.DateTime; import org.joda.time.Interval; +import javax.annotation.Nullable; import java.io.IOException; import java.util.Arrays; import java.util.Collection; @@ -52,6 +54,19 @@ public class QueryRunnerTestHelper { public static final String segmentId = "testSegment"; public static final String dataSource = "testing"; + public static final UnionDataSource unionDataSource = new UnionDataSource( + Lists.transform( + Lists.newArrayList(dataSource, dataSource, dataSource, dataSource), new Function() + { + @Nullable + @Override + public TableDataSource apply(@Nullable String input) + { + return new TableDataSource(input); + } + } + ) + ); public static final QueryGranularity dayGran = QueryGranularity.DAY; public static final QueryGranularity allGran = QueryGranularity.ALL; public static final String providerDimension = "proVider"; @@ -165,6 +180,30 @@ public class QueryRunnerTestHelper ); } + @SuppressWarnings("unchecked") + public static Collection makeUnionQueryRunners( + QueryRunnerFactory factory + ) + throws IOException + { + final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex(); + final QueryableIndex mMappedTestIndex = TestIndex.getMMappedTestIndex(); + final QueryableIndex mergedRealtimeIndex = TestIndex.mergedRealtimeIndex(); + return Arrays.asList( + new Object[][]{ + { + makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndex, segmentId)) + }, + { + makeUnionQueryRunner(factory, new QueryableIndexSegment(segmentId, mMappedTestIndex)) + }, + { + makeUnionQueryRunner(factory, new QueryableIndexSegment(segmentId, mergedRealtimeIndex)) + } + } + ); + } + public static QueryRunner makeQueryRunner( QueryRunnerFactory> factory, Segment adapter @@ -178,4 +217,25 @@ public class QueryRunnerTestHelper factory.getToolchest() ); } + + public static QueryRunner makeUnionQueryRunner( + QueryRunnerFactory> factory, + Segment adapter + ) + { + return new FinalizeResultsQueryRunner( + factory.getToolchest().postMergeQueryDecoration( + factory.getToolchest().mergeResults( + new UnionQueryRunner( + new BySegmentQueryRunner( + segmentId, adapter.getDataInterval().getStart(), + factory.createRunner(adapter) + ), + factory.getToolchest() + ) + ) + ), + factory.getToolchest() + ); + } } diff --git a/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java new file mode 100644 index 00000000000..e56f53b9e55 --- /dev/null +++ b/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java @@ -0,0 +1,88 @@ + +package io.druid.query.timeseries; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.metamx.common.guava.Sequences; +import io.druid.query.Druids; +import io.druid.query.QueryRunner; +import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.Result; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.segment.TestHelper; +import org.joda.time.DateTime; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +@RunWith(Parameterized.class) +public class TimeSeriesUnionQueryRunnerTest +{ + @Parameterized.Parameters + public static Collection constructorFeeder() throws IOException + { + return QueryRunnerTestHelper.makeUnionQueryRunners( + TimeseriesQueryRunnerFactory.create() + ); + } + + private final QueryRunner runner; + + public TimeSeriesUnionQueryRunnerTest( + QueryRunner runner + ) + { + this.runner = runner; + } + + @Test + public void testUnionTimeseries() + { + TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() + .dataSource(QueryRunnerTestHelper.unionDataSource) + .granularity(QueryRunnerTestHelper.dayGran) + .intervals(QueryRunnerTestHelper.firstToThird) + .aggregators( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory( + "idx", + "index" + ), + QueryRunnerTestHelper.qualityUniques + ) + ) + .build(); + + List> expectedResults = Arrays.asList( + new Result( + new DateTime("2011-04-01"), + new TimeseriesResultValue( + ImmutableMap.of("rows", 52L, "idx", 26476L, "uniques", QueryRunnerTestHelper.UNIQUES_9) + ) + ), + new Result( + new DateTime("2011-04-02"), + new TimeseriesResultValue( + ImmutableMap.of("rows", 52L, "idx", 23308L, "uniques", QueryRunnerTestHelper.UNIQUES_9) + ) + ) + ); + + Iterable> results = Sequences.toList( + runner.run(query), + Lists.>newArrayList() + ); + + TestHelper.assertExpectedResults(expectedResults, results); + } + + + +} diff --git a/processing/src/test/java/io/druid/query/topn/TopNUnionQueryTest.java b/processing/src/test/java/io/druid/query/topn/TopNUnionQueryTest.java new file mode 100644 index 00000000000..1fdc3b11cf5 --- /dev/null +++ b/processing/src/test/java/io/druid/query/topn/TopNUnionQueryTest.java @@ -0,0 +1,179 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.topn; + +import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import io.druid.collections.StupidPool; +import io.druid.query.QueryRunner; +import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.Result; +import io.druid.query.TestQueryRunners; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.MaxAggregatorFactory; +import io.druid.query.aggregation.MinAggregatorFactory; +import io.druid.query.aggregation.PostAggregator; +import io.druid.segment.TestHelper; +import org.joda.time.DateTime; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +@RunWith(Parameterized.class) +public class TopNUnionQueryTest +{ + private final QueryRunner runner; + + public TopNUnionQueryTest( + QueryRunner runner + ) + { + this.runner = runner; + } + + @Parameterized.Parameters + public static Collection constructorFeeder() throws IOException + { + List retVal = Lists.newArrayList(); + retVal.addAll( + QueryRunnerTestHelper.makeUnionQueryRunners( + new TopNQueryRunnerFactory( + TestQueryRunners.getPool(), + new TopNQueryQueryToolChest(new TopNQueryConfig()) + ) + ) + ); + retVal.addAll( + QueryRunnerTestHelper.makeUnionQueryRunners( + new TopNQueryRunnerFactory( + new StupidPool( + new Supplier() + { + @Override + public ByteBuffer get() + { + return ByteBuffer.allocate(2000); + } + } + ), + new TopNQueryQueryToolChest(new TopNQueryConfig()) + ) + ) + ); + + return retVal; + } + + @Test + public void testTopNUnionQuery() + { + TopNQuery query = new TopNQueryBuilder() + .dataSource(QueryRunnerTestHelper.unionDataSource) + .granularity(QueryRunnerTestHelper.allGran) + .dimension(QueryRunnerTestHelper.providerDimension) + .metric(QueryRunnerTestHelper.dependentPostAggMetric) + .threshold(4) + .intervals(QueryRunnerTestHelper.fullOnInterval) + .aggregators( + Lists.newArrayList( + Iterables.concat( + QueryRunnerTestHelper.commonAggregators, + Lists.newArrayList( + new MaxAggregatorFactory("maxIndex", "index"), + new MinAggregatorFactory("minIndex", "index") + ) + ) + ) + ) + .postAggregators( + Arrays.asList( + QueryRunnerTestHelper.addRowsIndexConstant, + QueryRunnerTestHelper.dependentPostAgg, + QueryRunnerTestHelper.hyperUniqueFinalizingPostAgg + ) + ) + .build(); + + List> expectedResults = Arrays.asList( + new Result( + new DateTime("2011-01-12T00:00:00.000Z"), + new TopNResultValue( + Arrays.>asList( + ImmutableMap.builder() + .put(QueryRunnerTestHelper.providerDimension, "total_market") + .put("rows", 744L) + .put("index", 862719.3151855469D) + .put("addRowsIndexConstant", 863464.3151855469D) + .put(QueryRunnerTestHelper.dependentPostAggMetric, 864209.3151855469D) + .put("uniques", QueryRunnerTestHelper.UNIQUES_2) + .put("maxIndex", 1743.9217529296875D) + .put("minIndex", 792.3260498046875D) + .put( + QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric, + QueryRunnerTestHelper.UNIQUES_2 + 1.0 + ) + .build(), + ImmutableMap.builder() + .put(QueryRunnerTestHelper.providerDimension, "upfront") + .put("rows", 744L) + .put("index", 768184.4240722656D) + .put("addRowsIndexConstant", 768929.4240722656D) + .put(QueryRunnerTestHelper.dependentPostAggMetric, 769674.4240722656D) + .put("uniques", QueryRunnerTestHelper.UNIQUES_2) + .put("maxIndex", 1870.06103515625D) + .put("minIndex", 545.9906005859375D) + .put( + QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric, + QueryRunnerTestHelper.UNIQUES_2 + 1.0 + ) + .build(), + ImmutableMap.builder() + .put(QueryRunnerTestHelper.providerDimension, "spot") + .put("rows", 3348L) + .put("index", 382426.28929138184D) + .put("addRowsIndexConstant", 385775.28929138184D) + .put(QueryRunnerTestHelper.dependentPostAggMetric, 389124.28929138184D) + .put("uniques", QueryRunnerTestHelper.UNIQUES_9) + .put( + QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric, + QueryRunnerTestHelper.UNIQUES_9 + 1.0 + ) + .put("maxIndex", 277.2735290527344D) + .put("minIndex", 59.02102279663086D) + .build() + ) + ) + ) + ); + + TestHelper.assertExpectedResults(expectedResults, runner.run(query)); + } + + +}