union query fix and a broken test

This commit is contained in:
nishantmonu51 2014-05-15 23:10:40 +05:30
parent c57a18d6b6
commit ddc36c89cc
3 changed files with 84 additions and 25 deletions

View File

@ -23,6 +23,7 @@ import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.metamx.common.guava.BaseSequence;
import com.metamx.common.guava.MergeSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.SequenceTestHelper;
import com.metamx.common.guava.Sequences;
@ -243,4 +244,52 @@ public class OrderedMergeSequenceTest
)
);
}
private <T> MergeSequence<T> makeUnorderedMergedSequence(
Ordering<T> ordering,
List<TestSequence<T>> seqs
)
{
return new MergeSequence<T>(
ordering,
Sequences.simple(
Lists.transform( // OMG WTF, the java type system really annoys me at times...
seqs,
new Function<TestSequence<T>, Sequence<T>>()
{
@Override
public Sequence<T> apply(@Nullable TestSequence<T> input)
{
return input;
}
}
)
)
);
}
@Test
public void testHierarchicalMerge() throws Exception
{
final Sequence<Integer> seq1 = makeUnorderedMergedSequence(
Ordering.<Integer>natural(), Lists.newArrayList(
TestSequence.create(1)
)
);
final Sequence<Integer> seq2 = makeUnorderedMergedSequence(
Ordering.<Integer>natural(), Lists.newArrayList(
TestSequence.create(1)
)
);
final OrderedMergeSequence<Integer> finalMerged = new OrderedMergeSequence<Integer>(
Ordering.<Integer>natural(),
Sequences.simple(
Lists.<Sequence<Integer>>newArrayList(seq1, seq2)
)
);
SequenceTestHelper.testAll(finalMerged, Arrays.asList(1, 1));
}
}

View File

@ -22,17 +22,24 @@
package io.druid.query;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import java.util.ArrayList;
public class UnionQueryRunner<T> implements QueryRunner<T>
{
private final QueryRunner<T> baseRunner;
private final QueryToolChest<T, Query<T>> toolChest;
public UnionQueryRunner(QueryRunner<T> baseRunner)
public UnionQueryRunner(
QueryRunner<T> baseRunner,
QueryToolChest<T, Query<T>> toolChest
)
{
this.baseRunner = baseRunner;
this.toolChest = toolChest;
}
@Override
@ -40,8 +47,9 @@ public class UnionQueryRunner<T> implements QueryRunner<T>
{
DataSource dataSource = query.getDataSource();
if (dataSource instanceof UnionDataSource) {
return Sequences.concat(
Iterables.transform(
return toolChest.mergeSequences(
Sequences.simple(
Lists.transform(
((UnionDataSource) dataSource).getDataSources(),
new Function<DataSource, Sequence<T>>()
{
@ -54,6 +62,7 @@ public class UnionQueryRunner<T> implements QueryRunner<T>
}
}
)
)
);
} else {
return baseRunner.run(query);

View File

@ -87,7 +87,8 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
}
},
toolChest.preMergeQueryDecoration(baseClient)
).withWaitMeasuredFromNow()
).withWaitMeasuredFromNow(),
toolChest
)
)
),