mirror of https://github.com/apache/druid.git
Merge pull request #721 from metamx/fix-union-query
fix Union Query result merging
This commit is contained in:
commit
c4c54f7bc9
|
@ -46,6 +46,9 @@ public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultT
|
||||||
*/
|
*/
|
||||||
public abstract Sequence<ResultType> mergeSequences(Sequence<Sequence<ResultType>> seqOfSequences);
|
public abstract Sequence<ResultType> mergeSequences(Sequence<Sequence<ResultType>> seqOfSequences);
|
||||||
|
|
||||||
|
public abstract Sequence<ResultType> mergeSequencesUnordered(Sequence<Sequence<ResultType>> seqOfSequences);
|
||||||
|
|
||||||
|
|
||||||
public abstract ServiceMetricEvent.Builder makeMetricBuilder(QueryType query);
|
public abstract ServiceMetricEvent.Builder makeMetricBuilder(QueryType query);
|
||||||
|
|
||||||
public abstract Function<ResultType, ResultType> makePreComputeManipulatorFn(
|
public abstract Function<ResultType, ResultType> makePreComputeManipulatorFn(
|
||||||
|
|
|
@ -45,7 +45,7 @@ public class UnionQueryRunner<T> implements QueryRunner<T>
|
||||||
{
|
{
|
||||||
DataSource dataSource = query.getDataSource();
|
DataSource dataSource = query.getDataSource();
|
||||||
if (dataSource instanceof UnionDataSource) {
|
if (dataSource instanceof UnionDataSource) {
|
||||||
return toolChest.mergeSequences(
|
return toolChest.mergeSequencesUnordered(
|
||||||
Sequences.simple(
|
Sequences.simple(
|
||||||
Lists.transform(
|
Lists.transform(
|
||||||
((UnionDataSource) dataSource).getDataSources(),
|
((UnionDataSource) dataSource).getDataSources(),
|
||||||
|
|
|
@ -193,7 +193,18 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
|
||||||
@Override
|
@Override
|
||||||
public Sequence<Row> mergeSequences(Sequence<Sequence<Row>> seqOfSequences)
|
public Sequence<Row> mergeSequences(Sequence<Sequence<Row>> seqOfSequences)
|
||||||
{
|
{
|
||||||
return new OrderedMergeSequence<>(Ordering.<Row>natural().nullsFirst(), seqOfSequences);
|
return new OrderedMergeSequence<>(getOrdering(), seqOfSequences);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Sequence<Row> mergeSequencesUnordered(Sequence<Sequence<Row>> seqOfSequences)
|
||||||
|
{
|
||||||
|
return new MergeSequence<>(getOrdering(), seqOfSequences);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Ordering<Row> getOrdering()
|
||||||
|
{
|
||||||
|
return Ordering.<Row>natural().nullsFirst();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -372,7 +383,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
|
||||||
@Override
|
@Override
|
||||||
public Sequence<Row> mergeSequences(Sequence<Sequence<Row>> seqOfSequences)
|
public Sequence<Row> mergeSequences(Sequence<Sequence<Row>> seqOfSequences)
|
||||||
{
|
{
|
||||||
return new MergeSequence<>(Ordering.<Row>natural().nullsFirst(), seqOfSequences);
|
return new MergeSequence<>(getOrdering(), seqOfSequences);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
|
@ -146,7 +146,13 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
|
||||||
@Override
|
@Override
|
||||||
public Sequence<SegmentAnalysis> mergeSequences(Sequence<Sequence<SegmentAnalysis>> seqOfSequences)
|
public Sequence<SegmentAnalysis> mergeSequences(Sequence<Sequence<SegmentAnalysis>> seqOfSequences)
|
||||||
{
|
{
|
||||||
return new OrderedMergeSequence<SegmentAnalysis>(getOrdering(), seqOfSequences);
|
return new OrderedMergeSequence<>(getOrdering(), seqOfSequences);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Sequence<SegmentAnalysis> mergeSequencesUnordered(Sequence<Sequence<SegmentAnalysis>> seqOfSequences)
|
||||||
|
{
|
||||||
|
return new MergeSequence<>(getOrdering(), seqOfSequences);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -111,7 +111,13 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
|
||||||
@Override
|
@Override
|
||||||
public Sequence<Result<SearchResultValue>> mergeSequences(Sequence<Sequence<Result<SearchResultValue>>> seqOfSequences)
|
public Sequence<Result<SearchResultValue>> mergeSequences(Sequence<Sequence<Result<SearchResultValue>>> seqOfSequences)
|
||||||
{
|
{
|
||||||
return new OrderedMergeSequence<Result<SearchResultValue>>(getOrdering(), seqOfSequences);
|
return new OrderedMergeSequence<>(getOrdering(), seqOfSequences);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Sequence<Result<SearchResultValue>> mergeSequencesUnordered(Sequence<Sequence<Result<SearchResultValue>>> seqOfSequences)
|
||||||
|
{
|
||||||
|
return new MergeSequence<>(getOrdering(), seqOfSequences);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -113,7 +113,13 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
|
||||||
@Override
|
@Override
|
||||||
public Sequence<Result<SelectResultValue>> mergeSequences(Sequence<Sequence<Result<SelectResultValue>>> seqOfSequences)
|
public Sequence<Result<SelectResultValue>> mergeSequences(Sequence<Sequence<Result<SelectResultValue>>> seqOfSequences)
|
||||||
{
|
{
|
||||||
return new OrderedMergeSequence<Result<SelectResultValue>>(getOrdering(), seqOfSequences);
|
return new OrderedMergeSequence<>(getOrdering(), seqOfSequences);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Sequence<Result<SelectResultValue>> mergeSequencesUnordered(Sequence<Sequence<Result<SelectResultValue>>> seqOfSequences)
|
||||||
|
{
|
||||||
|
return new MergeSequence<>(getOrdering(), seqOfSequences);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -113,6 +113,12 @@ public class TimeBoundaryQueryQueryToolChest
|
||||||
return new OrderedMergeSequence<>(getOrdering(), seqOfSequences);
|
return new OrderedMergeSequence<>(getOrdering(), seqOfSequences);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Sequence<Result<TimeBoundaryResultValue>> mergeSequencesUnordered(Sequence<Sequence<Result<TimeBoundaryResultValue>>> seqOfSequences)
|
||||||
|
{
|
||||||
|
return new MergeSequence<>(getOrdering(), seqOfSequences);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ServiceMetricEvent.Builder makeMetricBuilder(TimeBoundaryQuery query)
|
public ServiceMetricEvent.Builder makeMetricBuilder(TimeBoundaryQuery query)
|
||||||
{
|
{
|
||||||
|
|
|
@ -112,7 +112,13 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
|
||||||
@Override
|
@Override
|
||||||
public Sequence<Result<TimeseriesResultValue>> mergeSequences(Sequence<Sequence<Result<TimeseriesResultValue>>> seqOfSequences)
|
public Sequence<Result<TimeseriesResultValue>> mergeSequences(Sequence<Sequence<Result<TimeseriesResultValue>>> seqOfSequences)
|
||||||
{
|
{
|
||||||
return new OrderedMergeSequence<Result<TimeseriesResultValue>>(getOrdering(), seqOfSequences);
|
return new OrderedMergeSequence<>(getOrdering(), seqOfSequences);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Sequence<Result<TimeseriesResultValue>> mergeSequencesUnordered(Sequence<Sequence<Result<TimeseriesResultValue>>> seqOfSequences)
|
||||||
|
{
|
||||||
|
return new MergeSequence<>(getOrdering(), seqOfSequences);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -72,7 +72,6 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
|
||||||
private static final TypeReference<Object> OBJECT_TYPE_REFERENCE = new TypeReference<Object>()
|
private static final TypeReference<Object> OBJECT_TYPE_REFERENCE = new TypeReference<Object>()
|
||||||
{
|
{
|
||||||
};
|
};
|
||||||
|
|
||||||
private final TopNQueryConfig config;
|
private final TopNQueryConfig config;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
|
@ -128,7 +127,13 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
|
||||||
@Override
|
@Override
|
||||||
public Sequence<Result<TopNResultValue>> mergeSequences(Sequence<Sequence<Result<TopNResultValue>>> seqOfSequences)
|
public Sequence<Result<TopNResultValue>> mergeSequences(Sequence<Sequence<Result<TopNResultValue>>> seqOfSequences)
|
||||||
{
|
{
|
||||||
return new OrderedMergeSequence<Result<TopNResultValue>>(getOrdering(), seqOfSequences);
|
return new OrderedMergeSequence<>(getOrdering(), seqOfSequences);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Sequence<Result<TopNResultValue>> mergeSequencesUnordered(Sequence<Sequence<Result<TopNResultValue>>> seqOfSequences)
|
||||||
|
{
|
||||||
|
return new MergeSequence<>(getOrdering(), seqOfSequences);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -21,12 +21,18 @@ package io.druid.query.timeseries;
|
||||||
|
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
|
import com.metamx.common.guava.Sequence;
|
||||||
import com.metamx.common.guava.Sequences;
|
import com.metamx.common.guava.Sequences;
|
||||||
import io.druid.query.Druids;
|
import io.druid.query.Druids;
|
||||||
|
import io.druid.query.Query;
|
||||||
import io.druid.query.QueryConfig;
|
import io.druid.query.QueryConfig;
|
||||||
import io.druid.query.QueryRunner;
|
import io.druid.query.QueryRunner;
|
||||||
import io.druid.query.QueryRunnerTestHelper;
|
import io.druid.query.QueryRunnerTestHelper;
|
||||||
|
import io.druid.query.QueryToolChest;
|
||||||
import io.druid.query.Result;
|
import io.druid.query.Result;
|
||||||
|
import io.druid.query.TableDataSource;
|
||||||
|
import io.druid.query.UnionDataSource;
|
||||||
|
import io.druid.query.UnionQueryRunner;
|
||||||
import io.druid.query.aggregation.AggregatorFactory;
|
import io.druid.query.aggregation.AggregatorFactory;
|
||||||
import io.druid.query.aggregation.LongSumAggregatorFactory;
|
import io.druid.query.aggregation.LongSumAggregatorFactory;
|
||||||
import io.druid.segment.TestHelper;
|
import io.druid.segment.TestHelper;
|
||||||
|
@ -43,6 +49,15 @@ import java.util.List;
|
||||||
@RunWith(Parameterized.class)
|
@RunWith(Parameterized.class)
|
||||||
public class TimeSeriesUnionQueryRunnerTest
|
public class TimeSeriesUnionQueryRunnerTest
|
||||||
{
|
{
|
||||||
|
private final QueryRunner runner;
|
||||||
|
|
||||||
|
public TimeSeriesUnionQueryRunnerTest(
|
||||||
|
QueryRunner runner
|
||||||
|
)
|
||||||
|
{
|
||||||
|
this.runner = runner;
|
||||||
|
}
|
||||||
|
|
||||||
@Parameterized.Parameters
|
@Parameterized.Parameters
|
||||||
public static Collection<?> constructorFeeder() throws IOException
|
public static Collection<?> constructorFeeder() throws IOException
|
||||||
{
|
{
|
||||||
|
@ -55,15 +70,6 @@ public class TimeSeriesUnionQueryRunnerTest
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
private final QueryRunner runner;
|
|
||||||
|
|
||||||
public TimeSeriesUnionQueryRunnerTest(
|
|
||||||
QueryRunner runner
|
|
||||||
)
|
|
||||||
{
|
|
||||||
this.runner = runner;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testUnionTimeseries()
|
public void testUnionTimeseries()
|
||||||
{
|
{
|
||||||
|
@ -106,6 +112,145 @@ public class TimeSeriesUnionQueryRunnerTest
|
||||||
TestHelper.assertExpectedResults(expectedResults, results);
|
TestHelper.assertExpectedResults(expectedResults, results);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUnionResultMerging()
|
||||||
|
{
|
||||||
|
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
|
||||||
|
.dataSource(
|
||||||
|
new UnionDataSource(
|
||||||
|
Lists.newArrayList(
|
||||||
|
new TableDataSource("ds1"),
|
||||||
|
new TableDataSource("ds2")
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.granularity(QueryRunnerTestHelper.dayGran)
|
||||||
|
.intervals(QueryRunnerTestHelper.firstToThird)
|
||||||
|
.aggregators(
|
||||||
|
Arrays.<AggregatorFactory>asList(
|
||||||
|
QueryRunnerTestHelper.rowsCount,
|
||||||
|
new LongSumAggregatorFactory(
|
||||||
|
"idx",
|
||||||
|
"index"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.build();
|
||||||
|
QueryToolChest toolChest = new TimeseriesQueryQueryToolChest(new QueryConfig());
|
||||||
|
QueryRunner mergingrunner = toolChest.mergeResults(
|
||||||
|
new UnionQueryRunner<Result<TimeseriesResultValue>>(
|
||||||
|
new QueryRunner<Result<TimeseriesResultValue>>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Sequence<Result<TimeseriesResultValue>> run(Query<Result<TimeseriesResultValue>> query)
|
||||||
|
{
|
||||||
|
if (query.getDataSource().equals(new TableDataSource("ds1"))) {
|
||||||
|
return Sequences.simple(
|
||||||
|
Lists.newArrayList(
|
||||||
|
new Result<TimeseriesResultValue>(
|
||||||
|
new DateTime("2011-04-02"),
|
||||||
|
new TimeseriesResultValue(
|
||||||
|
ImmutableMap.<String, Object>of(
|
||||||
|
"rows",
|
||||||
|
1L,
|
||||||
|
"idx",
|
||||||
|
2L
|
||||||
|
)
|
||||||
|
)
|
||||||
|
),
|
||||||
|
new Result<TimeseriesResultValue>(
|
||||||
|
new DateTime("2011-04-03"),
|
||||||
|
new TimeseriesResultValue(
|
||||||
|
ImmutableMap.<String, Object>of(
|
||||||
|
"rows",
|
||||||
|
3L,
|
||||||
|
"idx",
|
||||||
|
4L
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
return Sequences.simple(
|
||||||
|
Lists.newArrayList(
|
||||||
|
new Result<TimeseriesResultValue>(
|
||||||
|
new DateTime("2011-04-01"),
|
||||||
|
new TimeseriesResultValue(
|
||||||
|
ImmutableMap.<String, Object>of(
|
||||||
|
"rows",
|
||||||
|
5L,
|
||||||
|
"idx",
|
||||||
|
6L
|
||||||
|
)
|
||||||
|
)
|
||||||
|
),
|
||||||
|
new Result<TimeseriesResultValue>(
|
||||||
|
new DateTime("2011-04-02"),
|
||||||
|
new TimeseriesResultValue(
|
||||||
|
ImmutableMap.<String, Object>of(
|
||||||
|
"rows",
|
||||||
|
7L,
|
||||||
|
"idx",
|
||||||
|
8L
|
||||||
|
)
|
||||||
|
)
|
||||||
|
),
|
||||||
|
new Result<TimeseriesResultValue>(
|
||||||
|
new DateTime("2011-04-04"),
|
||||||
|
new TimeseriesResultValue(
|
||||||
|
ImmutableMap.<String, Object>of(
|
||||||
|
"rows",
|
||||||
|
9L,
|
||||||
|
"idx",
|
||||||
|
10L
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
toolChest
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
|
||||||
|
new Result<TimeseriesResultValue>(
|
||||||
|
new DateTime("2011-04-01"),
|
||||||
|
new TimeseriesResultValue(
|
||||||
|
ImmutableMap.<String, Object>of("rows", 5L, "idx", 6L)
|
||||||
|
)
|
||||||
|
),
|
||||||
|
new Result<TimeseriesResultValue>(
|
||||||
|
new DateTime("2011-04-02"),
|
||||||
|
new TimeseriesResultValue(
|
||||||
|
ImmutableMap.<String, Object>of("rows", 8L, "idx", 10L)
|
||||||
|
)
|
||||||
|
),
|
||||||
|
new Result<TimeseriesResultValue>(
|
||||||
|
new DateTime("2011-04-03"),
|
||||||
|
new TimeseriesResultValue(
|
||||||
|
ImmutableMap.<String, Object>of("rows", 3L, "idx", 4L)
|
||||||
|
)
|
||||||
|
),
|
||||||
|
new Result<TimeseriesResultValue>(
|
||||||
|
new DateTime("2011-04-04"),
|
||||||
|
new TimeseriesResultValue(
|
||||||
|
ImmutableMap.<String, Object>of("rows", 9L, "idx", 10L)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
Iterable<Result<TimeseriesResultValue>> results = Sequences.toList(
|
||||||
|
mergingrunner.run(query),
|
||||||
|
Lists.<Result<TimeseriesResultValue>>newArrayList()
|
||||||
|
);
|
||||||
|
|
||||||
|
System.out.println(results);
|
||||||
|
TestHelper.assertExpectedResults(expectedResults, results);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -571,6 +571,12 @@ public class ServerManagerTest
|
||||||
return new ConcatSequence<T>(seqOfSequences);
|
return new ConcatSequence<T>(seqOfSequences);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Sequence<T> mergeSequencesUnordered(Sequence<Sequence<T>> seqOfSequences)
|
||||||
|
{
|
||||||
|
return new ConcatSequence<T>(seqOfSequences);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ServiceMetricEvent.Builder makeMetricBuilder(QueryType query)
|
public ServiceMetricEvent.Builder makeMetricBuilder(QueryType query)
|
||||||
{
|
{
|
||||||
|
|
Loading…
Reference in New Issue