diff --git a/processing/src/main/java/io/druid/query/QueryToolChest.java b/processing/src/main/java/io/druid/query/QueryToolChest.java index bf5483f01c5..250c7baa695 100644 --- a/processing/src/main/java/io/druid/query/QueryToolChest.java +++ b/processing/src/main/java/io/druid/query/QueryToolChest.java @@ -42,10 +42,11 @@ public abstract class QueryToolChest mergeSequences(Sequence> seqOfSequences); + public abstract Sequence mergeSequences(Sequence> seqOfSequences, boolean ordered); public abstract ServiceMetricEvent.Builder makeMetricBuilder(QueryType query); @@ -80,6 +81,4 @@ public abstract class QueryToolChest getOrdering(); } diff --git a/processing/src/main/java/io/druid/query/UnionQueryRunner.java b/processing/src/main/java/io/druid/query/UnionQueryRunner.java index 727f8d3c4d2..1954937094c 100644 --- a/processing/src/main/java/io/druid/query/UnionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/UnionQueryRunner.java @@ -46,8 +46,7 @@ public class UnionQueryRunner implements QueryRunner { DataSource dataSource = query.getDataSource(); if (dataSource instanceof UnionDataSource) { - return new MergeSequence( - toolChest.getOrdering(), + return toolChest.mergeSequences( Sequences.simple( Lists.transform( ((UnionDataSource) dataSource).getDataSources(), @@ -62,7 +61,8 @@ public class UnionQueryRunner implements QueryRunner } } ) - ) + ), + false ); } else { return baseRunner.run(query); diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index 96e70957888..ef7c7e8708c 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -191,13 +191,16 @@ public class GroupByQueryQueryToolChest extends QueryToolChest mergeSequences(Sequence> seqOfSequences) + public Sequence mergeSequences(Sequence> seqOfSequences, boolean ordered) { - return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); + if (ordered) { + return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); + } else { + return new MergeSequence<>(getOrdering(), seqOfSequences); + } } - @Override - public Ordering getOrdering() + private Ordering getOrdering() { return Ordering.natural().nullsFirst(); } diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java index d7b41f86d9f..6ab91ca9596 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java @@ -144,9 +144,13 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest mergeSequences(Sequence> seqOfSequences) + public Sequence mergeSequences(Sequence> seqOfSequences, boolean ordered) { - return new OrderedMergeSequence(getOrdering(), seqOfSequences); + if (ordered) { + return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); + } else { + return new MergeSequence<>(getOrdering(), seqOfSequences); + } } @Override @@ -234,8 +238,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest getOrdering() + private Ordering getOrdering() { return new Ordering() { diff --git a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java index e24239f8896..c7bf9f52c3e 100644 --- a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java @@ -109,9 +109,16 @@ public class SearchQueryQueryToolChest extends QueryToolChest> mergeSequences(Sequence>> seqOfSequences) + public Sequence> mergeSequences( + Sequence>> seqOfSequences, + boolean ordered + ) { - return new OrderedMergeSequence>(getOrdering(), seqOfSequences); + if (ordered) { + return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); + } else { + return new MergeSequence<>(getOrdering(), seqOfSequences); + } } @Override diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java index 95480302336..79297ca9ebe 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java @@ -111,9 +111,16 @@ public class SelectQueryQueryToolChest extends QueryToolChest> mergeSequences(Sequence>> seqOfSequences) + public Sequence> mergeSequences( + Sequence>> seqOfSequences, + boolean ordered + ) { - return new OrderedMergeSequence>(getOrdering(), seqOfSequences); + if (ordered) { + return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); + } else { + return new MergeSequence<>(getOrdering(), seqOfSequences); + } } @Override diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java index 088ec0b6eb8..c3bbd88bcca 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java @@ -109,9 +109,16 @@ public class TimeBoundaryQueryQueryToolChest } @Override - public Sequence> mergeSequences(Sequence>> seqOfSequences) + public Sequence> mergeSequences( + Sequence>> seqOfSequences, + boolean ordered + ) { - return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); + if (ordered) { + return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); + } else { + return new MergeSequence<>(getOrdering(), seqOfSequences); + } } @Override diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index e7c7e0849b5..5c1b15e0b96 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java @@ -110,9 +110,16 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest> mergeSequences(Sequence>> seqOfSequences) + public Sequence> mergeSequences( + Sequence>> seqOfSequences, + boolean ordered + ) { - return new OrderedMergeSequence>(getOrdering(), seqOfSequences); + if (ordered) { + return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); + } else { + return new MergeSequence<>(getOrdering(), seqOfSequences); + } } @Override diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java index ab1faebf67e..7e60209f987 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java @@ -126,9 +126,16 @@ public class TopNQueryQueryToolChest extends QueryToolChest> mergeSequences(Sequence>> seqOfSequences) + public Sequence> mergeSequences( + Sequence>> seqOfSequences, + boolean ordered + ) { - return new OrderedMergeSequence>(getOrdering(), seqOfSequences); + if (ordered) { + return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); + } else { + return new MergeSequence<>(getOrdering(), seqOfSequences); + } } @Override diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index d0135eb7f4a..11f2e09681c 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -265,7 +265,7 @@ public class CachingClusteredClient implements QueryRunner Iterables.transform(listOfSequences, Pair.>rhsFn()) ); if (strategy == null) { - return toolChest.mergeSequences(seq); + return toolChest.mergeSequences(seq, true); } else { return strategy.mergeSequences(seq); } @@ -383,7 +383,8 @@ public class CachingClusteredClient implements QueryRunner ); } } - ) + ), + true ); } diff --git a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java index 1148a107752..10580482e98 100644 --- a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java +++ b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java @@ -567,7 +567,7 @@ public class ServerManagerTest } @Override - public Sequence mergeSequences(Sequence> seqOfSequences) + public Sequence mergeSequences(Sequence> seqOfSequences, boolean ordered) { return new ConcatSequence(seqOfSequences); } @@ -591,12 +591,6 @@ public class ServerManagerTest { }; } - - @Override - public Ordering getOrdering() - { - return null; - } } private static class SegmentForTesting implements Segment