diff --git a/processing/src/main/java/io/druid/query/QueryToolChest.java b/processing/src/main/java/io/druid/query/QueryToolChest.java index 7d42eda6792..fb8c73bb4c6 100644 --- a/processing/src/main/java/io/druid/query/QueryToolChest.java +++ b/processing/src/main/java/io/druid/query/QueryToolChest.java @@ -46,6 +46,9 @@ public abstract class QueryToolChest mergeSequences(Sequence> seqOfSequences); + public abstract Sequence mergeSequencesUnordered(Sequence> seqOfSequences); + + public abstract ServiceMetricEvent.Builder makeMetricBuilder(QueryType query); public abstract Function makePreComputeManipulatorFn( diff --git a/processing/src/main/java/io/druid/query/UnionQueryRunner.java b/processing/src/main/java/io/druid/query/UnionQueryRunner.java index 2610cdfc20e..f14bb180f62 100644 --- a/processing/src/main/java/io/druid/query/UnionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/UnionQueryRunner.java @@ -45,7 +45,7 @@ public class UnionQueryRunner implements QueryRunner { DataSource dataSource = query.getDataSource(); if (dataSource instanceof UnionDataSource) { - return toolChest.mergeSequences( + return toolChest.mergeSequencesUnordered( Sequences.simple( Lists.transform( ((UnionDataSource) dataSource).getDataSources(), diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index 123bd230287..e2ddc18da5c 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -193,7 +193,18 @@ public class GroupByQueryQueryToolChest extends QueryToolChest mergeSequences(Sequence> seqOfSequences) { - return new OrderedMergeSequence<>(Ordering.natural().nullsFirst(), seqOfSequences); + return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); + } + + @Override + public Sequence mergeSequencesUnordered(Sequence> seqOfSequences) + { + return new MergeSequence<>(getOrdering(), seqOfSequences); + } + + private Ordering getOrdering() + { + return Ordering.natural().nullsFirst(); } @Override @@ -372,7 +383,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest mergeSequences(Sequence> seqOfSequences) { - return new MergeSequence<>(Ordering.natural().nullsFirst(), seqOfSequences); + return new MergeSequence<>(getOrdering(), seqOfSequences); } }; } diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java index f21665b1a38..21f02145988 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java @@ -146,7 +146,13 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest mergeSequences(Sequence> seqOfSequences) { - return new OrderedMergeSequence(getOrdering(), seqOfSequences); + return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); + } + + @Override + public Sequence mergeSequencesUnordered(Sequence> seqOfSequences) + { + return new MergeSequence<>(getOrdering(), seqOfSequences); } @Override diff --git a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java index e24239f8896..558d7ee3f13 100644 --- a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java @@ -111,7 +111,13 @@ public class SearchQueryQueryToolChest extends QueryToolChest> mergeSequences(Sequence>> seqOfSequences) { - return new OrderedMergeSequence>(getOrdering(), seqOfSequences); + return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); + } + + @Override + public Sequence> mergeSequencesUnordered(Sequence>> seqOfSequences) + { + return new MergeSequence<>(getOrdering(), seqOfSequences); } @Override diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java index 95480302336..345aeff1d5b 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java @@ -113,7 +113,13 @@ public class SelectQueryQueryToolChest extends QueryToolChest> mergeSequences(Sequence>> seqOfSequences) { - return new OrderedMergeSequence>(getOrdering(), seqOfSequences); + return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); + } + + @Override + public Sequence> mergeSequencesUnordered(Sequence>> seqOfSequences) + { + return new MergeSequence<>(getOrdering(), seqOfSequences); } @Override diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java index 6e0f41db855..a45a63e031d 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java @@ -113,6 +113,12 @@ public class TimeBoundaryQueryQueryToolChest return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); } + @Override + public Sequence> mergeSequencesUnordered(Sequence>> seqOfSequences) + { + return new MergeSequence<>(getOrdering(), seqOfSequences); + } + @Override public ServiceMetricEvent.Builder makeMetricBuilder(TimeBoundaryQuery query) { diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index e7c7e0849b5..3ed7e84149d 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java @@ -112,7 +112,13 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest> mergeSequences(Sequence>> seqOfSequences) { - return new OrderedMergeSequence>(getOrdering(), seqOfSequences); + return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); + } + + @Override + public Sequence> mergeSequencesUnordered(Sequence>> seqOfSequences) + { + return new MergeSequence<>(getOrdering(), seqOfSequences); } @Override diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java index ab1faebf67e..cc00ea3292e 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java @@ -72,7 +72,6 @@ public class TopNQueryQueryToolChest extends QueryToolChest OBJECT_TYPE_REFERENCE = new TypeReference() { }; - private final TopNQueryConfig config; @Inject @@ -128,7 +127,13 @@ public class TopNQueryQueryToolChest extends QueryToolChest> mergeSequences(Sequence>> seqOfSequences) { - return new OrderedMergeSequence>(getOrdering(), seqOfSequences); + return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); + } + + @Override + public Sequence> mergeSequencesUnordered(Sequence>> seqOfSequences) + { + return new MergeSequence<>(getOrdering(), seqOfSequences); } @Override diff --git a/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java index 17d61908c3c..84ace0a798f 100644 --- a/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java @@ -21,12 +21,18 @@ package io.druid.query.timeseries; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import io.druid.query.Druids; +import io.druid.query.Query; import io.druid.query.QueryConfig; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.QueryToolChest; import io.druid.query.Result; +import io.druid.query.TableDataSource; +import io.druid.query.UnionDataSource; +import io.druid.query.UnionQueryRunner; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.segment.TestHelper; @@ -43,6 +49,15 @@ import java.util.List; @RunWith(Parameterized.class) public class TimeSeriesUnionQueryRunnerTest { + private final QueryRunner runner; + + public TimeSeriesUnionQueryRunnerTest( + QueryRunner runner + ) + { + this.runner = runner; + } + @Parameterized.Parameters public static Collection constructorFeeder() throws IOException { @@ -55,15 +70,6 @@ public class TimeSeriesUnionQueryRunnerTest ); } - private final QueryRunner runner; - - public TimeSeriesUnionQueryRunnerTest( - QueryRunner runner - ) - { - this.runner = runner; - } - @Test public void testUnionTimeseries() { @@ -106,6 +112,145 @@ public class TimeSeriesUnionQueryRunnerTest TestHelper.assertExpectedResults(expectedResults, results); } + @Test + public void testUnionResultMerging() + { + TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() + .dataSource( + new UnionDataSource( + Lists.newArrayList( + new TableDataSource("ds1"), + new TableDataSource("ds2") + ) + ) + ) + .granularity(QueryRunnerTestHelper.dayGran) + .intervals(QueryRunnerTestHelper.firstToThird) + .aggregators( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory( + "idx", + "index" + ) + ) + ) + .build(); + QueryToolChest toolChest = new TimeseriesQueryQueryToolChest(new QueryConfig()); + QueryRunner mergingrunner = toolChest.mergeResults( + new UnionQueryRunner>( + new QueryRunner>() + { + @Override + public Sequence> run(Query> query) + { + if (query.getDataSource().equals(new TableDataSource("ds1"))) { + return Sequences.simple( + Lists.newArrayList( + new Result( + new DateTime("2011-04-02"), + new TimeseriesResultValue( + ImmutableMap.of( + "rows", + 1L, + "idx", + 2L + ) + ) + ), + new Result( + new DateTime("2011-04-03"), + new TimeseriesResultValue( + ImmutableMap.of( + "rows", + 3L, + "idx", + 4L + ) + ) + ) + ) + ); + } else { + return Sequences.simple( + Lists.newArrayList( + new Result( + new DateTime("2011-04-01"), + new TimeseriesResultValue( + ImmutableMap.of( + "rows", + 5L, + "idx", + 6L + ) + ) + ), + new Result( + new DateTime("2011-04-02"), + new TimeseriesResultValue( + ImmutableMap.of( + "rows", + 7L, + "idx", + 8L + ) + ) + ), + new Result( + new DateTime("2011-04-04"), + new TimeseriesResultValue( + ImmutableMap.of( + "rows", + 9L, + "idx", + 10L + ) + ) + ) + ) + ); + } + } + }, + toolChest + ) + ); + List> expectedResults = Arrays.asList( + new Result( + new DateTime("2011-04-01"), + new TimeseriesResultValue( + ImmutableMap.of("rows", 5L, "idx", 6L) + ) + ), + new Result( + new DateTime("2011-04-02"), + new TimeseriesResultValue( + ImmutableMap.of("rows", 8L, "idx", 10L) + ) + ), + new Result( + new DateTime("2011-04-03"), + new TimeseriesResultValue( + ImmutableMap.of("rows", 3L, "idx", 4L) + ) + ), + new Result( + new DateTime("2011-04-04"), + new TimeseriesResultValue( + ImmutableMap.of("rows", 9L, "idx", 10L) + ) + ) + ); + + Iterable> results = Sequences.toList( + mergingrunner.run(query), + Lists.>newArrayList() + ); + + System.out.println(results); + TestHelper.assertExpectedResults(expectedResults, results); + + } } diff --git a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java index 431fdce8318..a8789b021a3 100644 --- a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java +++ b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java @@ -571,6 +571,12 @@ public class ServerManagerTest return new ConcatSequence(seqOfSequences); } + @Override + public Sequence mergeSequencesUnordered(Sequence> seqOfSequences) + { + return new ConcatSequence(seqOfSequences); + } + @Override public ServiceMetricEvent.Builder makeMetricBuilder(QueryType query) {