fix Union Query result merging

This commit is contained in:
nishantmonu51 2014-09-03 16:38:02 +05:30
parent a4930375f3
commit 268a42bd79
6 changed files with 177 additions and 15 deletions

View File

@ -21,6 +21,7 @@ package io.druid.query;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.google.common.collect.Ordering;
import com.metamx.common.guava.Sequence;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.query.aggregation.MetricManipulationFn;
@ -79,4 +80,6 @@ public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultT
{
return segments;
}
public abstract Ordering<ResultType> getOrdering();
}

View File

@ -23,11 +23,10 @@ package io.druid.query;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.metamx.common.guava.MergeSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import java.util.ArrayList;
public class UnionQueryRunner<T> implements QueryRunner<T>
{
private final QueryRunner<T> baseRunner;
@ -47,7 +46,8 @@ public class UnionQueryRunner<T> implements QueryRunner<T>
{
DataSource dataSource = query.getDataSource();
if (dataSource instanceof UnionDataSource) {
return toolChest.mergeSequences(
return new MergeSequence<T>(
toolChest.getOrdering(),
Sequences.simple(
Lists.transform(
((UnionDataSource) dataSource).getDataSources(),

View File

@ -193,7 +193,13 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
@Override
public Sequence<Row> mergeSequences(Sequence<Sequence<Row>> seqOfSequences)
{
return new OrderedMergeSequence<>(Ordering.<Row>natural().nullsFirst(), seqOfSequences);
return new OrderedMergeSequence<>(getOrdering(), seqOfSequences);
}
@Override
public Ordering<Row> getOrdering()
{
return Ordering.<Row>natural().nullsFirst();
}
@Override
@ -372,7 +378,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
@Override
public Sequence<Row> mergeSequences(Sequence<Sequence<Row>> seqOfSequences)
{
return new MergeSequence<>(Ordering.<Row>natural().nullsFirst(), seqOfSequences);
return new MergeSequence<>(getOrdering(), seqOfSequences);
}
};
}

View File

@ -234,7 +234,8 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
};
}
private Ordering<SegmentAnalysis> getOrdering()
@Override
public Ordering<SegmentAnalysis> getOrdering()
{
return new Ordering<SegmentAnalysis>()
{

View File

@ -21,12 +21,18 @@ package io.druid.query.timeseries;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.query.Druids;
import io.druid.query.Query;
import io.druid.query.QueryConfig;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.QueryToolChest;
import io.druid.query.Result;
import io.druid.query.TableDataSource;
import io.druid.query.UnionDataSource;
import io.druid.query.UnionQueryRunner;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.segment.TestHelper;
@ -43,6 +49,15 @@ import java.util.List;
@RunWith(Parameterized.class)
public class TimeSeriesUnionQueryRunnerTest
{
private final QueryRunner runner;
public TimeSeriesUnionQueryRunnerTest(
QueryRunner runner
)
{
this.runner = runner;
}
@Parameterized.Parameters
public static Collection<?> constructorFeeder() throws IOException
{
@ -55,15 +70,6 @@ public class TimeSeriesUnionQueryRunnerTest
);
}
private final QueryRunner runner;
public TimeSeriesUnionQueryRunnerTest(
QueryRunner runner
)
{
this.runner = runner;
}
@Test
public void testUnionTimeseries()
{
@ -106,6 +112,145 @@ public class TimeSeriesUnionQueryRunnerTest
TestHelper.assertExpectedResults(expectedResults, results);
}
@Test
public void testUnionResultMerging()
{
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource(
new UnionDataSource(
Lists.newArrayList(
new TableDataSource("ds1"),
new TableDataSource("ds2")
)
)
)
.granularity(QueryRunnerTestHelper.dayGran)
.intervals(QueryRunnerTestHelper.firstToThird)
.aggregators(
Arrays.<AggregatorFactory>asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory(
"idx",
"index"
)
)
)
.build();
QueryToolChest toolChest = new TimeseriesQueryQueryToolChest(new QueryConfig());
QueryRunner mergingrunner = toolChest.mergeResults(
new UnionQueryRunner<Result<TimeseriesResultValue>>(
new QueryRunner<Result<TimeseriesResultValue>>()
{
@Override
public Sequence<Result<TimeseriesResultValue>> run(Query<Result<TimeseriesResultValue>> query)
{
if (query.getDataSource().equals(new TableDataSource("ds1"))) {
return Sequences.simple(
Lists.newArrayList(
new Result<TimeseriesResultValue>(
new DateTime("2011-04-02"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of(
"rows",
1L,
"idx",
2L
)
)
),
new Result<TimeseriesResultValue>(
new DateTime("2011-04-03"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of(
"rows",
3L,
"idx",
4L
)
)
)
)
);
} else {
return Sequences.simple(
Lists.newArrayList(
new Result<TimeseriesResultValue>(
new DateTime("2011-04-01"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of(
"rows",
5L,
"idx",
6L
)
)
),
new Result<TimeseriesResultValue>(
new DateTime("2011-04-02"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of(
"rows",
7L,
"idx",
8L
)
)
),
new Result<TimeseriesResultValue>(
new DateTime("2011-04-04"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of(
"rows",
9L,
"idx",
10L
)
)
)
)
);
}
}
},
toolChest
)
);
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
new Result<TimeseriesResultValue>(
new DateTime("2011-04-01"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of("rows", 5L, "idx", 6L)
)
),
new Result<TimeseriesResultValue>(
new DateTime("2011-04-02"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of("rows", 8L, "idx", 10L)
)
),
new Result<TimeseriesResultValue>(
new DateTime("2011-04-03"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of("rows", 3L, "idx", 4L)
)
),
new Result<TimeseriesResultValue>(
new DateTime("2011-04-04"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of("rows", 9L, "idx", 10L)
)
)
);
Iterable<Result<TimeseriesResultValue>> results = Sequences.toList(
mergingrunner.run(query),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
System.out.println(results);
TestHelper.assertExpectedResults(expectedResults, results);
}
}

View File

@ -26,6 +26,7 @@ import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.metamx.common.IAE;
import com.metamx.common.MapUtils;
import com.metamx.common.Pair;
@ -590,6 +591,12 @@ public class ServerManagerTest
{
};
}
@Override
public Ordering<T> getOrdering()
{
return null;
}
}
private static class SegmentForTesting implements Segment