From ddc36c89cc5a53f6a201c43cbf839f5c960b21a2 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Thu, 15 May 2014 23:10:40 +0530 Subject: [PATCH 1/3] union query fix and a broken test --- .../collections/OrderedMergeSequenceTest.java | 67 ++++++++++++++++--- .../java/io/druid/query/UnionQueryRunner.java | 39 ++++++----- .../server/ClientQuerySegmentWalker.java | 3 +- 3 files changed, 84 insertions(+), 25 deletions(-) diff --git a/common/src/test/java/io/druid/collections/OrderedMergeSequenceTest.java b/common/src/test/java/io/druid/collections/OrderedMergeSequenceTest.java index 257043ca0d8..073dcfd4375 100644 --- a/common/src/test/java/io/druid/collections/OrderedMergeSequenceTest.java +++ b/common/src/test/java/io/druid/collections/OrderedMergeSequenceTest.java @@ -23,6 +23,7 @@ import com.google.common.base.Function; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; import com.metamx.common.guava.BaseSequence; +import com.metamx.common.guava.MergeSequence; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.SequenceTestHelper; import com.metamx.common.guava.Sequences; @@ -230,17 +231,65 @@ public class OrderedMergeSequenceTest ordering, Sequences.simple( Lists.transform( // OMG WTF, the java type system really annoys me at times... - seqs, - new Function, Sequence>() - { - @Override - public Sequence apply(@Nullable TestSequence input) - { - return input; - } - } + seqs, + new Function, Sequence>() + { + @Override + public Sequence apply(@Nullable TestSequence input) + { + return input; + } + } ) ) ); } + + private MergeSequence makeUnorderedMergedSequence( + Ordering ordering, + List> seqs + ) + { + return new MergeSequence( + ordering, + Sequences.simple( + Lists.transform( // OMG WTF, the java type system really annoys me at times... + seqs, + new Function, Sequence>() + { + @Override + public Sequence apply(@Nullable TestSequence input) + { + return input; + } + } + ) + ) + ); + } + + @Test + public void testHierarchicalMerge() throws Exception + { + final Sequence seq1 = makeUnorderedMergedSequence( + Ordering.natural(), Lists.newArrayList( + TestSequence.create(1) + ) + ); + + + final Sequence seq2 = makeUnorderedMergedSequence( + Ordering.natural(), Lists.newArrayList( + TestSequence.create(1) + ) + ); + final OrderedMergeSequence finalMerged = new OrderedMergeSequence( + Ordering.natural(), + Sequences.simple( + Lists.>newArrayList(seq1, seq2) + ) + ); + + SequenceTestHelper.testAll(finalMerged, Arrays.asList(1, 1)); + } } diff --git a/processing/src/main/java/io/druid/query/UnionQueryRunner.java b/processing/src/main/java/io/druid/query/UnionQueryRunner.java index 0e3088c4847..6679e6d7c77 100644 --- a/processing/src/main/java/io/druid/query/UnionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/UnionQueryRunner.java @@ -22,17 +22,24 @@ package io.druid.query; import com.google.common.base.Function; -import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; +import java.util.ArrayList; + public class UnionQueryRunner implements QueryRunner { private final QueryRunner baseRunner; + private final QueryToolChest> toolChest; - public UnionQueryRunner(QueryRunner baseRunner) + public UnionQueryRunner( + QueryRunner baseRunner, + QueryToolChest> toolChest + ) { this.baseRunner = baseRunner; + this.toolChest = toolChest; } @Override @@ -40,19 +47,21 @@ public class UnionQueryRunner implements QueryRunner { DataSource dataSource = query.getDataSource(); if (dataSource instanceof UnionDataSource) { - return Sequences.concat( - Iterables.transform( - ((UnionDataSource) dataSource).getDataSources(), - new Function>() - { - @Override - public Sequence apply(DataSource singleSource) - { - return baseRunner.run( - query.withDataSource(singleSource) - ); - } - } + return toolChest.mergeSequences( + Sequences.simple( + Lists.transform( + ((UnionDataSource) dataSource).getDataSources(), + new Function>() + { + @Override + public Sequence apply(DataSource singleSource) + { + return baseRunner.run( + query.withDataSource(singleSource) + ); + } + } + ) ) ); } else { diff --git a/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java index d500c9f70e2..f0e8aa01153 100644 --- a/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java +++ b/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java @@ -87,7 +87,8 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker } }, toolChest.preMergeQueryDecoration(baseClient) - ).withWaitMeasuredFromNow() + ).withWaitMeasuredFromNow(), + toolChest ) ) ), From e06b29edb9be870ea5d0611e87d0685a8415580f Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Fri, 16 May 2014 03:15:35 +0530 Subject: [PATCH 2/3] add sanity unit tests --- .../io/druid/query/QueryRunnerTestHelper.java | 60 ++++++ .../TimeSeriesUnionQueryRunnerTest.java | 88 +++++++++ .../druid/query/topn/TopNUnionQueryTest.java | 179 ++++++++++++++++++ 3 files changed, 327 insertions(+) create mode 100644 processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java create mode 100644 processing/src/test/java/io/druid/query/topn/TopNUnionQueryTest.java diff --git a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java index 1e2b6b4601d..d72c97594f4 100644 --- a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java +++ b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java @@ -19,6 +19,7 @@ package io.druid.query; +import com.google.common.base.Function; import com.google.common.collect.Lists; import io.druid.granularity.QueryGranularity; import io.druid.query.aggregation.AggregatorFactory; @@ -41,6 +42,7 @@ import io.druid.segment.incremental.IncrementalIndex; import org.joda.time.DateTime; import org.joda.time.Interval; +import javax.annotation.Nullable; import java.io.IOException; import java.util.Arrays; import java.util.Collection; @@ -52,6 +54,19 @@ public class QueryRunnerTestHelper { public static final String segmentId = "testSegment"; public static final String dataSource = "testing"; + public static final UnionDataSource unionDataSource = new UnionDataSource( + Lists.transform( + Lists.newArrayList(dataSource, dataSource, dataSource, dataSource), new Function() + { + @Nullable + @Override + public TableDataSource apply(@Nullable String input) + { + return new TableDataSource(input); + } + } + ) + ); public static final QueryGranularity dayGran = QueryGranularity.DAY; public static final QueryGranularity allGran = QueryGranularity.ALL; public static final String providerDimension = "proVider"; @@ -165,6 +180,30 @@ public class QueryRunnerTestHelper ); } + @SuppressWarnings("unchecked") + public static Collection makeUnionQueryRunners( + QueryRunnerFactory factory + ) + throws IOException + { + final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex(); + final QueryableIndex mMappedTestIndex = TestIndex.getMMappedTestIndex(); + final QueryableIndex mergedRealtimeIndex = TestIndex.mergedRealtimeIndex(); + return Arrays.asList( + new Object[][]{ + { + makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndex, segmentId)) + }, + { + makeUnionQueryRunner(factory, new QueryableIndexSegment(segmentId, mMappedTestIndex)) + }, + { + makeUnionQueryRunner(factory, new QueryableIndexSegment(segmentId, mergedRealtimeIndex)) + } + } + ); + } + public static QueryRunner makeQueryRunner( QueryRunnerFactory> factory, Segment adapter @@ -178,4 +217,25 @@ public class QueryRunnerTestHelper factory.getToolchest() ); } + + public static QueryRunner makeUnionQueryRunner( + QueryRunnerFactory> factory, + Segment adapter + ) + { + return new FinalizeResultsQueryRunner( + factory.getToolchest().postMergeQueryDecoration( + factory.getToolchest().mergeResults( + new UnionQueryRunner( + new BySegmentQueryRunner( + segmentId, adapter.getDataInterval().getStart(), + factory.createRunner(adapter) + ), + factory.getToolchest() + ) + ) + ), + factory.getToolchest() + ); + } } diff --git a/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java new file mode 100644 index 00000000000..e56f53b9e55 --- /dev/null +++ b/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java @@ -0,0 +1,88 @@ + +package io.druid.query.timeseries; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.metamx.common.guava.Sequences; +import io.druid.query.Druids; +import io.druid.query.QueryRunner; +import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.Result; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.segment.TestHelper; +import org.joda.time.DateTime; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +@RunWith(Parameterized.class) +public class TimeSeriesUnionQueryRunnerTest +{ + @Parameterized.Parameters + public static Collection constructorFeeder() throws IOException + { + return QueryRunnerTestHelper.makeUnionQueryRunners( + TimeseriesQueryRunnerFactory.create() + ); + } + + private final QueryRunner runner; + + public TimeSeriesUnionQueryRunnerTest( + QueryRunner runner + ) + { + this.runner = runner; + } + + @Test + public void testUnionTimeseries() + { + TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() + .dataSource(QueryRunnerTestHelper.unionDataSource) + .granularity(QueryRunnerTestHelper.dayGran) + .intervals(QueryRunnerTestHelper.firstToThird) + .aggregators( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory( + "idx", + "index" + ), + QueryRunnerTestHelper.qualityUniques + ) + ) + .build(); + + List> expectedResults = Arrays.asList( + new Result( + new DateTime("2011-04-01"), + new TimeseriesResultValue( + ImmutableMap.of("rows", 52L, "idx", 26476L, "uniques", QueryRunnerTestHelper.UNIQUES_9) + ) + ), + new Result( + new DateTime("2011-04-02"), + new TimeseriesResultValue( + ImmutableMap.of("rows", 52L, "idx", 23308L, "uniques", QueryRunnerTestHelper.UNIQUES_9) + ) + ) + ); + + Iterable> results = Sequences.toList( + runner.run(query), + Lists.>newArrayList() + ); + + TestHelper.assertExpectedResults(expectedResults, results); + } + + + +} diff --git a/processing/src/test/java/io/druid/query/topn/TopNUnionQueryTest.java b/processing/src/test/java/io/druid/query/topn/TopNUnionQueryTest.java new file mode 100644 index 00000000000..1fdc3b11cf5 --- /dev/null +++ b/processing/src/test/java/io/druid/query/topn/TopNUnionQueryTest.java @@ -0,0 +1,179 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.topn; + +import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import io.druid.collections.StupidPool; +import io.druid.query.QueryRunner; +import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.Result; +import io.druid.query.TestQueryRunners; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.MaxAggregatorFactory; +import io.druid.query.aggregation.MinAggregatorFactory; +import io.druid.query.aggregation.PostAggregator; +import io.druid.segment.TestHelper; +import org.joda.time.DateTime; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +@RunWith(Parameterized.class) +public class TopNUnionQueryTest +{ + private final QueryRunner runner; + + public TopNUnionQueryTest( + QueryRunner runner + ) + { + this.runner = runner; + } + + @Parameterized.Parameters + public static Collection constructorFeeder() throws IOException + { + List retVal = Lists.newArrayList(); + retVal.addAll( + QueryRunnerTestHelper.makeUnionQueryRunners( + new TopNQueryRunnerFactory( + TestQueryRunners.getPool(), + new TopNQueryQueryToolChest(new TopNQueryConfig()) + ) + ) + ); + retVal.addAll( + QueryRunnerTestHelper.makeUnionQueryRunners( + new TopNQueryRunnerFactory( + new StupidPool( + new Supplier() + { + @Override + public ByteBuffer get() + { + return ByteBuffer.allocate(2000); + } + } + ), + new TopNQueryQueryToolChest(new TopNQueryConfig()) + ) + ) + ); + + return retVal; + } + + @Test + public void testTopNUnionQuery() + { + TopNQuery query = new TopNQueryBuilder() + .dataSource(QueryRunnerTestHelper.unionDataSource) + .granularity(QueryRunnerTestHelper.allGran) + .dimension(QueryRunnerTestHelper.providerDimension) + .metric(QueryRunnerTestHelper.dependentPostAggMetric) + .threshold(4) + .intervals(QueryRunnerTestHelper.fullOnInterval) + .aggregators( + Lists.newArrayList( + Iterables.concat( + QueryRunnerTestHelper.commonAggregators, + Lists.newArrayList( + new MaxAggregatorFactory("maxIndex", "index"), + new MinAggregatorFactory("minIndex", "index") + ) + ) + ) + ) + .postAggregators( + Arrays.asList( + QueryRunnerTestHelper.addRowsIndexConstant, + QueryRunnerTestHelper.dependentPostAgg, + QueryRunnerTestHelper.hyperUniqueFinalizingPostAgg + ) + ) + .build(); + + List> expectedResults = Arrays.asList( + new Result( + new DateTime("2011-01-12T00:00:00.000Z"), + new TopNResultValue( + Arrays.>asList( + ImmutableMap.builder() + .put(QueryRunnerTestHelper.providerDimension, "total_market") + .put("rows", 744L) + .put("index", 862719.3151855469D) + .put("addRowsIndexConstant", 863464.3151855469D) + .put(QueryRunnerTestHelper.dependentPostAggMetric, 864209.3151855469D) + .put("uniques", QueryRunnerTestHelper.UNIQUES_2) + .put("maxIndex", 1743.9217529296875D) + .put("minIndex", 792.3260498046875D) + .put( + QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric, + QueryRunnerTestHelper.UNIQUES_2 + 1.0 + ) + .build(), + ImmutableMap.builder() + .put(QueryRunnerTestHelper.providerDimension, "upfront") + .put("rows", 744L) + .put("index", 768184.4240722656D) + .put("addRowsIndexConstant", 768929.4240722656D) + .put(QueryRunnerTestHelper.dependentPostAggMetric, 769674.4240722656D) + .put("uniques", QueryRunnerTestHelper.UNIQUES_2) + .put("maxIndex", 1870.06103515625D) + .put("minIndex", 545.9906005859375D) + .put( + QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric, + QueryRunnerTestHelper.UNIQUES_2 + 1.0 + ) + .build(), + ImmutableMap.builder() + .put(QueryRunnerTestHelper.providerDimension, "spot") + .put("rows", 3348L) + .put("index", 382426.28929138184D) + .put("addRowsIndexConstant", 385775.28929138184D) + .put(QueryRunnerTestHelper.dependentPostAggMetric, 389124.28929138184D) + .put("uniques", QueryRunnerTestHelper.UNIQUES_9) + .put( + QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric, + QueryRunnerTestHelper.UNIQUES_9 + 1.0 + ) + .put("maxIndex", 277.2735290527344D) + .put("minIndex", 59.02102279663086D) + .build() + ) + ) + ) + ); + + TestHelper.assertExpectedResults(expectedResults, runner.run(query)); + } + + +} From ce3d461e7d0e752d6d24ff5cec675f26232b2099 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Fri, 16 May 2014 04:30:58 +0530 Subject: [PATCH 3/3] add header --- .../TimeSeriesUnionQueryRunnerTest.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java index e56f53b9e55..36e1fc13955 100644 --- a/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java @@ -1,3 +1,21 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ package io.druid.query.timeseries;