diff --git a/processing/src/main/java/io/druid/query/QueryToolChest.java b/processing/src/main/java/io/druid/query/QueryToolChest.java index d2722c622be..bf5483f01c5 100644 --- a/processing/src/main/java/io/druid/query/QueryToolChest.java +++ b/processing/src/main/java/io/druid/query/QueryToolChest.java @@ -21,6 +21,7 @@ package io.druid.query; import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Function; +import com.google.common.collect.Ordering; import com.metamx.common.guava.Sequence; import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.query.aggregation.MetricManipulationFn; @@ -79,4 +80,6 @@ public abstract class QueryToolChest getOrdering(); } diff --git a/processing/src/main/java/io/druid/query/UnionQueryRunner.java b/processing/src/main/java/io/druid/query/UnionQueryRunner.java index 6679e6d7c77..727f8d3c4d2 100644 --- a/processing/src/main/java/io/druid/query/UnionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/UnionQueryRunner.java @@ -23,11 +23,10 @@ package io.druid.query; import com.google.common.base.Function; import com.google.common.collect.Lists; +import com.metamx.common.guava.MergeSequence; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; -import java.util.ArrayList; - public class UnionQueryRunner implements QueryRunner { private final QueryRunner baseRunner; @@ -47,7 +46,8 @@ public class UnionQueryRunner implements QueryRunner { DataSource dataSource = query.getDataSource(); if (dataSource instanceof UnionDataSource) { - return toolChest.mergeSequences( + return new MergeSequence( + toolChest.getOrdering(), Sequences.simple( Lists.transform( ((UnionDataSource) dataSource).getDataSources(), diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index 123bd230287..96e70957888 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -193,7 +193,13 @@ public class GroupByQueryQueryToolChest extends QueryToolChest mergeSequences(Sequence> seqOfSequences) { - return new OrderedMergeSequence<>(Ordering.natural().nullsFirst(), seqOfSequences); + return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); + } + + @Override + public Ordering getOrdering() + { + return Ordering.natural().nullsFirst(); } @Override @@ -372,7 +378,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest mergeSequences(Sequence> seqOfSequences) { - return new MergeSequence<>(Ordering.natural().nullsFirst(), seqOfSequences); + return new MergeSequence<>(getOrdering(), seqOfSequences); } }; } diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java index f21665b1a38..d7b41f86d9f 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java @@ -234,7 +234,8 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest getOrdering() + @Override + public Ordering getOrdering() { return new Ordering() { diff --git a/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java index 17d61908c3c..84ace0a798f 100644 --- a/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java @@ -21,12 +21,18 @@ package io.druid.query.timeseries; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import io.druid.query.Druids; +import io.druid.query.Query; import io.druid.query.QueryConfig; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.QueryToolChest; import io.druid.query.Result; +import io.druid.query.TableDataSource; +import io.druid.query.UnionDataSource; +import io.druid.query.UnionQueryRunner; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.segment.TestHelper; @@ -43,6 +49,15 @@ import java.util.List; @RunWith(Parameterized.class) public class TimeSeriesUnionQueryRunnerTest { + private final QueryRunner runner; + + public TimeSeriesUnionQueryRunnerTest( + QueryRunner runner + ) + { + this.runner = runner; + } + @Parameterized.Parameters public static Collection constructorFeeder() throws IOException { @@ -55,15 +70,6 @@ public class TimeSeriesUnionQueryRunnerTest ); } - private final QueryRunner runner; - - public TimeSeriesUnionQueryRunnerTest( - QueryRunner runner - ) - { - this.runner = runner; - } - @Test public void testUnionTimeseries() { @@ -106,6 +112,145 @@ public class TimeSeriesUnionQueryRunnerTest TestHelper.assertExpectedResults(expectedResults, results); } + @Test + public void testUnionResultMerging() + { + TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() + .dataSource( + new UnionDataSource( + Lists.newArrayList( + new TableDataSource("ds1"), + new TableDataSource("ds2") + ) + ) + ) + .granularity(QueryRunnerTestHelper.dayGran) + .intervals(QueryRunnerTestHelper.firstToThird) + .aggregators( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory( + "idx", + "index" + ) + ) + ) + .build(); + QueryToolChest toolChest = new TimeseriesQueryQueryToolChest(new QueryConfig()); + QueryRunner mergingrunner = toolChest.mergeResults( + new UnionQueryRunner>( + new QueryRunner>() + { + @Override + public Sequence> run(Query> query) + { + if (query.getDataSource().equals(new TableDataSource("ds1"))) { + return Sequences.simple( + Lists.newArrayList( + new Result( + new DateTime("2011-04-02"), + new TimeseriesResultValue( + ImmutableMap.of( + "rows", + 1L, + "idx", + 2L + ) + ) + ), + new Result( + new DateTime("2011-04-03"), + new TimeseriesResultValue( + ImmutableMap.of( + "rows", + 3L, + "idx", + 4L + ) + ) + ) + ) + ); + } else { + return Sequences.simple( + Lists.newArrayList( + new Result( + new DateTime("2011-04-01"), + new TimeseriesResultValue( + ImmutableMap.of( + "rows", + 5L, + "idx", + 6L + ) + ) + ), + new Result( + new DateTime("2011-04-02"), + new TimeseriesResultValue( + ImmutableMap.of( + "rows", + 7L, + "idx", + 8L + ) + ) + ), + new Result( + new DateTime("2011-04-04"), + new TimeseriesResultValue( + ImmutableMap.of( + "rows", + 9L, + "idx", + 10L + ) + ) + ) + ) + ); + } + } + }, + toolChest + ) + ); + List> expectedResults = Arrays.asList( + new Result( + new DateTime("2011-04-01"), + new TimeseriesResultValue( + ImmutableMap.of("rows", 5L, "idx", 6L) + ) + ), + new Result( + new DateTime("2011-04-02"), + new TimeseriesResultValue( + ImmutableMap.of("rows", 8L, "idx", 10L) + ) + ), + new Result( + new DateTime("2011-04-03"), + new TimeseriesResultValue( + ImmutableMap.of("rows", 3L, "idx", 4L) + ) + ), + new Result( + new DateTime("2011-04-04"), + new TimeseriesResultValue( + ImmutableMap.of("rows", 9L, "idx", 10L) + ) + ) + ); + + Iterable> results = Sequences.toList( + mergingrunner.run(query), + Lists.>newArrayList() + ); + + System.out.println(results); + TestHelper.assertExpectedResults(expectedResults, results); + + } } diff --git a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java index 431fdce8318..1148a107752 100644 --- a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java +++ b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java @@ -26,6 +26,7 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import com.google.common.collect.Ordering; import com.metamx.common.IAE; import com.metamx.common.MapUtils; import com.metamx.common.Pair; @@ -590,6 +591,12 @@ public class ServerManagerTest { }; } + + @Override + public Ordering getOrdering() + { + return null; + } } private static class SegmentForTesting implements Segment