From 268a42bd795258c62e8d5950fac803d0dddf2f84 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Wed, 3 Sep 2014 16:38:02 +0530 Subject: [PATCH 1/4] fix Union Query result merging --- .../java/io/druid/query/QueryToolChest.java | 3 + .../java/io/druid/query/UnionQueryRunner.java | 6 +- .../groupby/GroupByQueryQueryToolChest.java | 10 +- .../SegmentMetadataQueryQueryToolChest.java | 3 +- .../TimeSeriesUnionQueryRunnerTest.java | 163 +++++++++++++++++- .../coordination/ServerManagerTest.java | 7 + 6 files changed, 177 insertions(+), 15 deletions(-) diff --git a/processing/src/main/java/io/druid/query/QueryToolChest.java b/processing/src/main/java/io/druid/query/QueryToolChest.java index d2722c622be..bf5483f01c5 100644 --- a/processing/src/main/java/io/druid/query/QueryToolChest.java +++ b/processing/src/main/java/io/druid/query/QueryToolChest.java @@ -21,6 +21,7 @@ package io.druid.query; import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Function; +import com.google.common.collect.Ordering; import com.metamx.common.guava.Sequence; import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.query.aggregation.MetricManipulationFn; @@ -79,4 +80,6 @@ public abstract class QueryToolChest getOrdering(); } diff --git a/processing/src/main/java/io/druid/query/UnionQueryRunner.java b/processing/src/main/java/io/druid/query/UnionQueryRunner.java index 6679e6d7c77..727f8d3c4d2 100644 --- a/processing/src/main/java/io/druid/query/UnionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/UnionQueryRunner.java @@ -23,11 +23,10 @@ package io.druid.query; import com.google.common.base.Function; import com.google.common.collect.Lists; +import com.metamx.common.guava.MergeSequence; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; -import java.util.ArrayList; - public class UnionQueryRunner implements QueryRunner { private final QueryRunner baseRunner; @@ -47,7 +46,8 @@ public class UnionQueryRunner implements QueryRunner { DataSource dataSource = query.getDataSource(); if (dataSource instanceof UnionDataSource) { - return toolChest.mergeSequences( + return new MergeSequence( + toolChest.getOrdering(), Sequences.simple( Lists.transform( ((UnionDataSource) dataSource).getDataSources(), diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index 123bd230287..96e70957888 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -193,7 +193,13 @@ public class GroupByQueryQueryToolChest extends QueryToolChest mergeSequences(Sequence> seqOfSequences) { - return new OrderedMergeSequence<>(Ordering.natural().nullsFirst(), seqOfSequences); + return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); + } + + @Override + public Ordering getOrdering() + { + return Ordering.natural().nullsFirst(); } @Override @@ -372,7 +378,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest mergeSequences(Sequence> seqOfSequences) { - return new MergeSequence<>(Ordering.natural().nullsFirst(), seqOfSequences); + return new MergeSequence<>(getOrdering(), seqOfSequences); } }; } diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java index f21665b1a38..d7b41f86d9f 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java @@ -234,7 +234,8 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest getOrdering() + @Override + public Ordering getOrdering() { return new Ordering() { diff --git a/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java index 17d61908c3c..84ace0a798f 100644 --- a/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java @@ -21,12 +21,18 @@ package io.druid.query.timeseries; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import io.druid.query.Druids; +import io.druid.query.Query; import io.druid.query.QueryConfig; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.QueryToolChest; import io.druid.query.Result; +import io.druid.query.TableDataSource; +import io.druid.query.UnionDataSource; +import io.druid.query.UnionQueryRunner; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.segment.TestHelper; @@ -43,6 +49,15 @@ import java.util.List; @RunWith(Parameterized.class) public class TimeSeriesUnionQueryRunnerTest { + private final QueryRunner runner; + + public TimeSeriesUnionQueryRunnerTest( + QueryRunner runner + ) + { + this.runner = runner; + } + @Parameterized.Parameters public static Collection constructorFeeder() throws IOException { @@ -55,15 +70,6 @@ public class TimeSeriesUnionQueryRunnerTest ); } - private final QueryRunner runner; - - public TimeSeriesUnionQueryRunnerTest( - QueryRunner runner - ) - { - this.runner = runner; - } - @Test public void testUnionTimeseries() { @@ -106,6 +112,145 @@ public class TimeSeriesUnionQueryRunnerTest TestHelper.assertExpectedResults(expectedResults, results); } + @Test + public void testUnionResultMerging() + { + TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() + .dataSource( + new UnionDataSource( + Lists.newArrayList( + new TableDataSource("ds1"), + new TableDataSource("ds2") + ) + ) + ) + .granularity(QueryRunnerTestHelper.dayGran) + .intervals(QueryRunnerTestHelper.firstToThird) + .aggregators( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory( + "idx", + "index" + ) + ) + ) + .build(); + QueryToolChest toolChest = new TimeseriesQueryQueryToolChest(new QueryConfig()); + QueryRunner mergingrunner = toolChest.mergeResults( + new UnionQueryRunner>( + new QueryRunner>() + { + @Override + public Sequence> run(Query> query) + { + if (query.getDataSource().equals(new TableDataSource("ds1"))) { + return Sequences.simple( + Lists.newArrayList( + new Result( + new DateTime("2011-04-02"), + new TimeseriesResultValue( + ImmutableMap.of( + "rows", + 1L, + "idx", + 2L + ) + ) + ), + new Result( + new DateTime("2011-04-03"), + new TimeseriesResultValue( + ImmutableMap.of( + "rows", + 3L, + "idx", + 4L + ) + ) + ) + ) + ); + } else { + return Sequences.simple( + Lists.newArrayList( + new Result( + new DateTime("2011-04-01"), + new TimeseriesResultValue( + ImmutableMap.of( + "rows", + 5L, + "idx", + 6L + ) + ) + ), + new Result( + new DateTime("2011-04-02"), + new TimeseriesResultValue( + ImmutableMap.of( + "rows", + 7L, + "idx", + 8L + ) + ) + ), + new Result( + new DateTime("2011-04-04"), + new TimeseriesResultValue( + ImmutableMap.of( + "rows", + 9L, + "idx", + 10L + ) + ) + ) + ) + ); + } + } + }, + toolChest + ) + ); + List> expectedResults = Arrays.asList( + new Result( + new DateTime("2011-04-01"), + new TimeseriesResultValue( + ImmutableMap.of("rows", 5L, "idx", 6L) + ) + ), + new Result( + new DateTime("2011-04-02"), + new TimeseriesResultValue( + ImmutableMap.of("rows", 8L, "idx", 10L) + ) + ), + new Result( + new DateTime("2011-04-03"), + new TimeseriesResultValue( + ImmutableMap.of("rows", 3L, "idx", 4L) + ) + ), + new Result( + new DateTime("2011-04-04"), + new TimeseriesResultValue( + ImmutableMap.of("rows", 9L, "idx", 10L) + ) + ) + ); + + Iterable> results = Sequences.toList( + mergingrunner.run(query), + Lists.>newArrayList() + ); + + System.out.println(results); + TestHelper.assertExpectedResults(expectedResults, results); + + } } diff --git a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java index 431fdce8318..1148a107752 100644 --- a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java +++ b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java @@ -26,6 +26,7 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import com.google.common.collect.Ordering; import com.metamx.common.IAE; import com.metamx.common.MapUtils; import com.metamx.common.Pair; @@ -590,6 +591,12 @@ public class ServerManagerTest { }; } + + @Override + public Ordering getOrdering() + { + return null; + } } private static class SegmentForTesting implements Segment From 2772a78b3b07a1fbcb92bcb770284e3ce8acdfc6 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Thu, 4 Sep 2014 11:31:04 +0530 Subject: [PATCH 2/4] review comments - move merging to tool chest --- .../src/main/java/io/druid/query/QueryToolChest.java | 5 ++--- .../main/java/io/druid/query/UnionQueryRunner.java | 6 +++--- .../query/groupby/GroupByQueryQueryToolChest.java | 11 +++++++---- .../metadata/SegmentMetadataQueryQueryToolChest.java | 11 +++++++---- .../druid/query/search/SearchQueryQueryToolChest.java | 11 +++++++++-- .../druid/query/select/SelectQueryQueryToolChest.java | 11 +++++++++-- .../timeboundary/TimeBoundaryQueryQueryToolChest.java | 11 +++++++++-- .../timeseries/TimeseriesQueryQueryToolChest.java | 11 +++++++++-- .../io/druid/query/topn/TopNQueryQueryToolChest.java | 11 +++++++++-- .../java/io/druid/client/CachingClusteredClient.java | 5 +++-- .../druid/server/coordination/ServerManagerTest.java | 8 +------- 11 files changed, 68 insertions(+), 33 deletions(-) diff --git a/processing/src/main/java/io/druid/query/QueryToolChest.java b/processing/src/main/java/io/druid/query/QueryToolChest.java index bf5483f01c5..250c7baa695 100644 --- a/processing/src/main/java/io/druid/query/QueryToolChest.java +++ b/processing/src/main/java/io/druid/query/QueryToolChest.java @@ -42,10 +42,11 @@ public abstract class QueryToolChest mergeSequences(Sequence> seqOfSequences); + public abstract Sequence mergeSequences(Sequence> seqOfSequences, boolean ordered); public abstract ServiceMetricEvent.Builder makeMetricBuilder(QueryType query); @@ -80,6 +81,4 @@ public abstract class QueryToolChest getOrdering(); } diff --git a/processing/src/main/java/io/druid/query/UnionQueryRunner.java b/processing/src/main/java/io/druid/query/UnionQueryRunner.java index 727f8d3c4d2..1954937094c 100644 --- a/processing/src/main/java/io/druid/query/UnionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/UnionQueryRunner.java @@ -46,8 +46,7 @@ public class UnionQueryRunner implements QueryRunner { DataSource dataSource = query.getDataSource(); if (dataSource instanceof UnionDataSource) { - return new MergeSequence( - toolChest.getOrdering(), + return toolChest.mergeSequences( Sequences.simple( Lists.transform( ((UnionDataSource) dataSource).getDataSources(), @@ -62,7 +61,8 @@ public class UnionQueryRunner implements QueryRunner } } ) - ) + ), + false ); } else { return baseRunner.run(query); diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index 96e70957888..ef7c7e8708c 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -191,13 +191,16 @@ public class GroupByQueryQueryToolChest extends QueryToolChest mergeSequences(Sequence> seqOfSequences) + public Sequence mergeSequences(Sequence> seqOfSequences, boolean ordered) { - return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); + if (ordered) { + return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); + } else { + return new MergeSequence<>(getOrdering(), seqOfSequences); + } } - @Override - public Ordering getOrdering() + private Ordering getOrdering() { return Ordering.natural().nullsFirst(); } diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java index d7b41f86d9f..6ab91ca9596 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java @@ -144,9 +144,13 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest mergeSequences(Sequence> seqOfSequences) + public Sequence mergeSequences(Sequence> seqOfSequences, boolean ordered) { - return new OrderedMergeSequence(getOrdering(), seqOfSequences); + if (ordered) { + return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); + } else { + return new MergeSequence<>(getOrdering(), seqOfSequences); + } } @Override @@ -234,8 +238,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest getOrdering() + private Ordering getOrdering() { return new Ordering() { diff --git a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java index e24239f8896..c7bf9f52c3e 100644 --- a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java @@ -109,9 +109,16 @@ public class SearchQueryQueryToolChest extends QueryToolChest> mergeSequences(Sequence>> seqOfSequences) + public Sequence> mergeSequences( + Sequence>> seqOfSequences, + boolean ordered + ) { - return new OrderedMergeSequence>(getOrdering(), seqOfSequences); + if (ordered) { + return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); + } else { + return new MergeSequence<>(getOrdering(), seqOfSequences); + } } @Override diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java index 95480302336..79297ca9ebe 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java @@ -111,9 +111,16 @@ public class SelectQueryQueryToolChest extends QueryToolChest> mergeSequences(Sequence>> seqOfSequences) + public Sequence> mergeSequences( + Sequence>> seqOfSequences, + boolean ordered + ) { - return new OrderedMergeSequence>(getOrdering(), seqOfSequences); + if (ordered) { + return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); + } else { + return new MergeSequence<>(getOrdering(), seqOfSequences); + } } @Override diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java index 088ec0b6eb8..c3bbd88bcca 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java @@ -109,9 +109,16 @@ public class TimeBoundaryQueryQueryToolChest } @Override - public Sequence> mergeSequences(Sequence>> seqOfSequences) + public Sequence> mergeSequences( + Sequence>> seqOfSequences, + boolean ordered + ) { - return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); + if (ordered) { + return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); + } else { + return new MergeSequence<>(getOrdering(), seqOfSequences); + } } @Override diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index e7c7e0849b5..5c1b15e0b96 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java @@ -110,9 +110,16 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest> mergeSequences(Sequence>> seqOfSequences) + public Sequence> mergeSequences( + Sequence>> seqOfSequences, + boolean ordered + ) { - return new OrderedMergeSequence>(getOrdering(), seqOfSequences); + if (ordered) { + return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); + } else { + return new MergeSequence<>(getOrdering(), seqOfSequences); + } } @Override diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java index ab1faebf67e..7e60209f987 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java @@ -126,9 +126,16 @@ public class TopNQueryQueryToolChest extends QueryToolChest> mergeSequences(Sequence>> seqOfSequences) + public Sequence> mergeSequences( + Sequence>> seqOfSequences, + boolean ordered + ) { - return new OrderedMergeSequence>(getOrdering(), seqOfSequences); + if (ordered) { + return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); + } else { + return new MergeSequence<>(getOrdering(), seqOfSequences); + } } @Override diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index d0135eb7f4a..11f2e09681c 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -265,7 +265,7 @@ public class CachingClusteredClient implements QueryRunner Iterables.transform(listOfSequences, Pair.>rhsFn()) ); if (strategy == null) { - return toolChest.mergeSequences(seq); + return toolChest.mergeSequences(seq, true); } else { return strategy.mergeSequences(seq); } @@ -383,7 +383,8 @@ public class CachingClusteredClient implements QueryRunner ); } } - ) + ), + true ); } diff --git a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java index 1148a107752..10580482e98 100644 --- a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java +++ b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java @@ -567,7 +567,7 @@ public class ServerManagerTest } @Override - public Sequence mergeSequences(Sequence> seqOfSequences) + public Sequence mergeSequences(Sequence> seqOfSequences, boolean ordered) { return new ConcatSequence(seqOfSequences); } @@ -591,12 +591,6 @@ public class ServerManagerTest { }; } - - @Override - public Ordering getOrdering() - { - return null; - } } private static class SegmentForTesting implements Segment From 90632ecf3af74af063e8a0172b67fb02b39c5dab Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Fri, 5 Sep 2014 00:03:36 +0530 Subject: [PATCH 3/4] review comment - have separate methods --- .../java/io/druid/query/QueryToolChest.java | 6 ++++-- .../java/io/druid/query/UnionQueryRunner.java | 5 ++--- .../groupby/GroupByQueryQueryToolChest.java | 16 +++++++++------- .../SegmentMetadataQueryQueryToolChest.java | 14 ++++++++------ .../search/SearchQueryQueryToolChest.java | 17 ++++++++--------- .../select/SelectQueryQueryToolChest.java | 17 ++++++++--------- .../TimeBoundaryQueryQueryToolChest.java | 17 ++++++++--------- .../TimeseriesQueryQueryToolChest.java | 17 ++++++++--------- .../query/topn/TopNQueryQueryToolChest.java | 18 ++++++++---------- .../druid/client/CachingClusteredClient.java | 5 ++--- .../server/coordination/ServerManagerTest.java | 8 +++++++- 11 files changed, 72 insertions(+), 68 deletions(-) diff --git a/processing/src/main/java/io/druid/query/QueryToolChest.java b/processing/src/main/java/io/druid/query/QueryToolChest.java index 250c7baa695..26880bd8553 100644 --- a/processing/src/main/java/io/druid/query/QueryToolChest.java +++ b/processing/src/main/java/io/druid/query/QueryToolChest.java @@ -42,11 +42,13 @@ public abstract class QueryToolChest mergeSequences(Sequence> seqOfSequences, boolean ordered); + public abstract Sequence mergeSequences(Sequence> seqOfSequences); + + public abstract Sequence mergeSequencesUnordered(Sequence> seqOfSequences); + public abstract ServiceMetricEvent.Builder makeMetricBuilder(QueryType query); diff --git a/processing/src/main/java/io/druid/query/UnionQueryRunner.java b/processing/src/main/java/io/druid/query/UnionQueryRunner.java index 1954937094c..727940388a1 100644 --- a/processing/src/main/java/io/druid/query/UnionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/UnionQueryRunner.java @@ -46,7 +46,7 @@ public class UnionQueryRunner implements QueryRunner { DataSource dataSource = query.getDataSource(); if (dataSource instanceof UnionDataSource) { - return toolChest.mergeSequences( + return toolChest.mergeSequencesUnordered( Sequences.simple( Lists.transform( ((UnionDataSource) dataSource).getDataSources(), @@ -61,8 +61,7 @@ public class UnionQueryRunner implements QueryRunner } } ) - ), - false + ) ); } else { return baseRunner.run(query); diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index ef7c7e8708c..e2ddc18da5c 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -191,16 +191,18 @@ public class GroupByQueryQueryToolChest extends QueryToolChest mergeSequences(Sequence> seqOfSequences, boolean ordered) + public Sequence mergeSequences(Sequence> seqOfSequences) { - if (ordered) { - return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); - } else { - return new MergeSequence<>(getOrdering(), seqOfSequences); - } + return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); } - private Ordering getOrdering() + @Override + public Sequence mergeSequencesUnordered(Sequence> seqOfSequences) + { + return new MergeSequence<>(getOrdering(), seqOfSequences); + } + + private Ordering getOrdering() { return Ordering.natural().nullsFirst(); } diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java index 6ab91ca9596..21f02145988 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java @@ -144,13 +144,15 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest mergeSequences(Sequence> seqOfSequences, boolean ordered) + public Sequence mergeSequences(Sequence> seqOfSequences) { - if (ordered) { - return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); - } else { - return new MergeSequence<>(getOrdering(), seqOfSequences); - } + return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); + } + + @Override + public Sequence mergeSequencesUnordered(Sequence> seqOfSequences) + { + return new MergeSequence<>(getOrdering(), seqOfSequences); } @Override diff --git a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java index c7bf9f52c3e..558d7ee3f13 100644 --- a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java @@ -109,16 +109,15 @@ public class SearchQueryQueryToolChest extends QueryToolChest> mergeSequences( - Sequence>> seqOfSequences, - boolean ordered - ) + public Sequence> mergeSequences(Sequence>> seqOfSequences) { - if (ordered) { - return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); - } else { - return new MergeSequence<>(getOrdering(), seqOfSequences); - } + return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); + } + + @Override + public Sequence> mergeSequencesUnordered(Sequence>> seqOfSequences) + { + return new MergeSequence<>(getOrdering(), seqOfSequences); } @Override diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java index 79297ca9ebe..345aeff1d5b 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java @@ -111,16 +111,15 @@ public class SelectQueryQueryToolChest extends QueryToolChest> mergeSequences( - Sequence>> seqOfSequences, - boolean ordered - ) + public Sequence> mergeSequences(Sequence>> seqOfSequences) { - if (ordered) { - return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); - } else { - return new MergeSequence<>(getOrdering(), seqOfSequences); - } + return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); + } + + @Override + public Sequence> mergeSequencesUnordered(Sequence>> seqOfSequences) + { + return new MergeSequence<>(getOrdering(), seqOfSequences); } @Override diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java index c3bbd88bcca..e755986193b 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java @@ -109,16 +109,15 @@ public class TimeBoundaryQueryQueryToolChest } @Override - public Sequence> mergeSequences( - Sequence>> seqOfSequences, - boolean ordered - ) + public Sequence> mergeSequences(Sequence>> seqOfSequences) { - if (ordered) { - return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); - } else { - return new MergeSequence<>(getOrdering(), seqOfSequences); - } + return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); + } + + @Override + public Sequence> mergeSequencesUnordered(Sequence>> seqOfSequences) + { + return new MergeSequence<>(getOrdering(), seqOfSequences); } @Override diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index 5c1b15e0b96..3ed7e84149d 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java @@ -110,16 +110,15 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest> mergeSequences( - Sequence>> seqOfSequences, - boolean ordered - ) + public Sequence> mergeSequences(Sequence>> seqOfSequences) { - if (ordered) { - return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); - } else { - return new MergeSequence<>(getOrdering(), seqOfSequences); - } + return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); + } + + @Override + public Sequence> mergeSequencesUnordered(Sequence>> seqOfSequences) + { + return new MergeSequence<>(getOrdering(), seqOfSequences); } @Override diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java index 7e60209f987..cc00ea3292e 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java @@ -72,7 +72,6 @@ public class TopNQueryQueryToolChest extends QueryToolChest OBJECT_TYPE_REFERENCE = new TypeReference() { }; - private final TopNQueryConfig config; @Inject @@ -126,16 +125,15 @@ public class TopNQueryQueryToolChest extends QueryToolChest> mergeSequences( - Sequence>> seqOfSequences, - boolean ordered - ) + public Sequence> mergeSequences(Sequence>> seqOfSequences) { - if (ordered) { - return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); - } else { - return new MergeSequence<>(getOrdering(), seqOfSequences); - } + return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); + } + + @Override + public Sequence> mergeSequencesUnordered(Sequence>> seqOfSequences) + { + return new MergeSequence<>(getOrdering(), seqOfSequences); } @Override diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index 11f2e09681c..d0135eb7f4a 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -265,7 +265,7 @@ public class CachingClusteredClient implements QueryRunner Iterables.transform(listOfSequences, Pair.>rhsFn()) ); if (strategy == null) { - return toolChest.mergeSequences(seq, true); + return toolChest.mergeSequences(seq); } else { return strategy.mergeSequences(seq); } @@ -383,8 +383,7 @@ public class CachingClusteredClient implements QueryRunner ); } } - ), - true + ) ); } diff --git a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java index 10580482e98..4cc76770517 100644 --- a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java +++ b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java @@ -567,7 +567,13 @@ public class ServerManagerTest } @Override - public Sequence mergeSequences(Sequence> seqOfSequences, boolean ordered) + public Sequence mergeSequences(Sequence> seqOfSequences) + { + return new ConcatSequence(seqOfSequences); + } + + @Override + public Sequence mergeSequencesUnordered(Sequence> seqOfSequences) { return new ConcatSequence(seqOfSequences); } From 0dec5826ed32dda48262450122ab3063bfee4c7c Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Fri, 5 Sep 2014 00:07:42 +0530 Subject: [PATCH 4/4] remove unused imports --- processing/src/main/java/io/druid/query/QueryToolChest.java | 1 - processing/src/main/java/io/druid/query/UnionQueryRunner.java | 1 - .../java/io/druid/server/coordination/ServerManagerTest.java | 1 - 3 files changed, 3 deletions(-) diff --git a/processing/src/main/java/io/druid/query/QueryToolChest.java b/processing/src/main/java/io/druid/query/QueryToolChest.java index 26880bd8553..34a33a02336 100644 --- a/processing/src/main/java/io/druid/query/QueryToolChest.java +++ b/processing/src/main/java/io/druid/query/QueryToolChest.java @@ -21,7 +21,6 @@ package io.druid.query; import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Function; -import com.google.common.collect.Ordering; import com.metamx.common.guava.Sequence; import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.query.aggregation.MetricManipulationFn; diff --git a/processing/src/main/java/io/druid/query/UnionQueryRunner.java b/processing/src/main/java/io/druid/query/UnionQueryRunner.java index 727940388a1..f14bb180f62 100644 --- a/processing/src/main/java/io/druid/query/UnionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/UnionQueryRunner.java @@ -23,7 +23,6 @@ package io.druid.query; import com.google.common.base.Function; import com.google.common.collect.Lists; -import com.metamx.common.guava.MergeSequence; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; diff --git a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java index 4cc76770517..a8789b021a3 100644 --- a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java +++ b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java @@ -26,7 +26,6 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; -import com.google.common.collect.Ordering; import com.metamx.common.IAE; import com.metamx.common.MapUtils; import com.metamx.common.Pair;