diff --git a/common/src/test/java/io/druid/collections/OrderedMergeSequenceTest.java b/common/src/test/java/io/druid/collections/OrderedMergeSequenceTest.java index 257043ca0d8..073dcfd4375 100644 --- a/common/src/test/java/io/druid/collections/OrderedMergeSequenceTest.java +++ b/common/src/test/java/io/druid/collections/OrderedMergeSequenceTest.java @@ -23,6 +23,7 @@ import com.google.common.base.Function; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; import com.metamx.common.guava.BaseSequence; +import com.metamx.common.guava.MergeSequence; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.SequenceTestHelper; import com.metamx.common.guava.Sequences; @@ -230,17 +231,65 @@ public class OrderedMergeSequenceTest ordering, Sequences.simple( Lists.transform( // OMG WTF, the java type system really annoys me at times... - seqs, - new Function, Sequence>() - { - @Override - public Sequence apply(@Nullable TestSequence input) - { - return input; - } - } + seqs, + new Function, Sequence>() + { + @Override + public Sequence apply(@Nullable TestSequence input) + { + return input; + } + } ) ) ); } + + private MergeSequence makeUnorderedMergedSequence( + Ordering ordering, + List> seqs + ) + { + return new MergeSequence( + ordering, + Sequences.simple( + Lists.transform( // OMG WTF, the java type system really annoys me at times... + seqs, + new Function, Sequence>() + { + @Override + public Sequence apply(@Nullable TestSequence input) + { + return input; + } + } + ) + ) + ); + } + + @Test + public void testHierarchicalMerge() throws Exception + { + final Sequence seq1 = makeUnorderedMergedSequence( + Ordering.natural(), Lists.newArrayList( + TestSequence.create(1) + ) + ); + + + final Sequence seq2 = makeUnorderedMergedSequence( + Ordering.natural(), Lists.newArrayList( + TestSequence.create(1) + ) + ); + final OrderedMergeSequence finalMerged = new OrderedMergeSequence( + Ordering.natural(), + Sequences.simple( + Lists.>newArrayList(seq1, seq2) + ) + ); + + SequenceTestHelper.testAll(finalMerged, Arrays.asList(1, 1)); + } } diff --git a/processing/src/main/java/io/druid/query/UnionQueryRunner.java b/processing/src/main/java/io/druid/query/UnionQueryRunner.java index 0e3088c4847..6679e6d7c77 100644 --- a/processing/src/main/java/io/druid/query/UnionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/UnionQueryRunner.java @@ -22,17 +22,24 @@ package io.druid.query; import com.google.common.base.Function; -import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; +import java.util.ArrayList; + public class UnionQueryRunner implements QueryRunner { private final QueryRunner baseRunner; + private final QueryToolChest> toolChest; - public UnionQueryRunner(QueryRunner baseRunner) + public UnionQueryRunner( + QueryRunner baseRunner, + QueryToolChest> toolChest + ) { this.baseRunner = baseRunner; + this.toolChest = toolChest; } @Override @@ -40,19 +47,21 @@ public class UnionQueryRunner implements QueryRunner { DataSource dataSource = query.getDataSource(); if (dataSource instanceof UnionDataSource) { - return Sequences.concat( - Iterables.transform( - ((UnionDataSource) dataSource).getDataSources(), - new Function>() - { - @Override - public Sequence apply(DataSource singleSource) - { - return baseRunner.run( - query.withDataSource(singleSource) - ); - } - } + return toolChest.mergeSequences( + Sequences.simple( + Lists.transform( + ((UnionDataSource) dataSource).getDataSources(), + new Function>() + { + @Override + public Sequence apply(DataSource singleSource) + { + return baseRunner.run( + query.withDataSource(singleSource) + ); + } + } + ) ) ); } else { diff --git a/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java index d500c9f70e2..f0e8aa01153 100644 --- a/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java +++ b/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java @@ -87,7 +87,8 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker } }, toolChest.preMergeQueryDecoration(baseClient) - ).withWaitMeasuredFromNow() + ).withWaitMeasuredFromNow(), + toolChest ) ) ),