review comments - move merging to tool chest

This commit is contained in:
nishantmonu51 2014-09-04 11:31:04 +05:30
parent 268a42bd79
commit 2772a78b3b
11 changed files with 68 additions and 33 deletions

View File

@ -42,10 +42,11 @@ public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultT
* This method doesn't belong here, but it's here for now just to make it work.
*
* @param seqOfSequences
* @param ordered whether the seqOfSequences is ordered or not.
*
* @return
*/
public abstract Sequence<ResultType> mergeSequences(Sequence<Sequence<ResultType>> seqOfSequences);
public abstract Sequence<ResultType> mergeSequences(Sequence<Sequence<ResultType>> seqOfSequences, boolean ordered);
public abstract ServiceMetricEvent.Builder makeMetricBuilder(QueryType query);
@ -80,6 +81,4 @@ public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultT
{
return segments;
}
public abstract Ordering<ResultType> getOrdering();
}

View File

@ -46,8 +46,7 @@ public class UnionQueryRunner<T> implements QueryRunner<T>
{
DataSource dataSource = query.getDataSource();
if (dataSource instanceof UnionDataSource) {
return new MergeSequence<T>(
toolChest.getOrdering(),
return toolChest.mergeSequences(
Sequences.simple(
Lists.transform(
((UnionDataSource) dataSource).getDataSources(),
@ -62,7 +61,8 @@ public class UnionQueryRunner<T> implements QueryRunner<T>
}
}
)
)
),
false
);
} else {
return baseRunner.run(query);

View File

@ -191,13 +191,16 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
@Override
public Sequence<Row> mergeSequences(Sequence<Sequence<Row>> seqOfSequences)
public Sequence<Row> mergeSequences(Sequence<Sequence<Row>> seqOfSequences, boolean ordered)
{
return new OrderedMergeSequence<>(getOrdering(), seqOfSequences);
if (ordered) {
return new OrderedMergeSequence<>(getOrdering(), seqOfSequences);
} else {
return new MergeSequence<>(getOrdering(), seqOfSequences);
}
}
@Override
public Ordering<Row> getOrdering()
private Ordering<Row> getOrdering()
{
return Ordering.<Row>natural().nullsFirst();
}

View File

@ -144,9 +144,13 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
}
@Override
public Sequence<SegmentAnalysis> mergeSequences(Sequence<Sequence<SegmentAnalysis>> seqOfSequences)
public Sequence<SegmentAnalysis> mergeSequences(Sequence<Sequence<SegmentAnalysis>> seqOfSequences, boolean ordered)
{
return new OrderedMergeSequence<SegmentAnalysis>(getOrdering(), seqOfSequences);
if (ordered) {
return new OrderedMergeSequence<>(getOrdering(), seqOfSequences);
} else {
return new MergeSequence<>(getOrdering(), seqOfSequences);
}
}
@Override
@ -234,8 +238,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
};
}
@Override
public Ordering<SegmentAnalysis> getOrdering()
private Ordering<SegmentAnalysis> getOrdering()
{
return new Ordering<SegmentAnalysis>()
{

View File

@ -109,9 +109,16 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
}
@Override
public Sequence<Result<SearchResultValue>> mergeSequences(Sequence<Sequence<Result<SearchResultValue>>> seqOfSequences)
public Sequence<Result<SearchResultValue>> mergeSequences(
Sequence<Sequence<Result<SearchResultValue>>> seqOfSequences,
boolean ordered
)
{
return new OrderedMergeSequence<Result<SearchResultValue>>(getOrdering(), seqOfSequences);
if (ordered) {
return new OrderedMergeSequence<>(getOrdering(), seqOfSequences);
} else {
return new MergeSequence<>(getOrdering(), seqOfSequences);
}
}
@Override

View File

@ -111,9 +111,16 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
}
@Override
public Sequence<Result<SelectResultValue>> mergeSequences(Sequence<Sequence<Result<SelectResultValue>>> seqOfSequences)
public Sequence<Result<SelectResultValue>> mergeSequences(
Sequence<Sequence<Result<SelectResultValue>>> seqOfSequences,
boolean ordered
)
{
return new OrderedMergeSequence<Result<SelectResultValue>>(getOrdering(), seqOfSequences);
if (ordered) {
return new OrderedMergeSequence<>(getOrdering(), seqOfSequences);
} else {
return new MergeSequence<>(getOrdering(), seqOfSequences);
}
}
@Override

View File

@ -109,9 +109,16 @@ public class TimeBoundaryQueryQueryToolChest
}
@Override
public Sequence<Result<TimeBoundaryResultValue>> mergeSequences(Sequence<Sequence<Result<TimeBoundaryResultValue>>> seqOfSequences)
public Sequence<Result<TimeBoundaryResultValue>> mergeSequences(
Sequence<Sequence<Result<TimeBoundaryResultValue>>> seqOfSequences,
boolean ordered
)
{
return new OrderedMergeSequence<>(getOrdering(), seqOfSequences);
if (ordered) {
return new OrderedMergeSequence<>(getOrdering(), seqOfSequences);
} else {
return new MergeSequence<>(getOrdering(), seqOfSequences);
}
}
@Override

View File

@ -110,9 +110,16 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
}
@Override
public Sequence<Result<TimeseriesResultValue>> mergeSequences(Sequence<Sequence<Result<TimeseriesResultValue>>> seqOfSequences)
public Sequence<Result<TimeseriesResultValue>> mergeSequences(
Sequence<Sequence<Result<TimeseriesResultValue>>> seqOfSequences,
boolean ordered
)
{
return new OrderedMergeSequence<Result<TimeseriesResultValue>>(getOrdering(), seqOfSequences);
if (ordered) {
return new OrderedMergeSequence<>(getOrdering(), seqOfSequences);
} else {
return new MergeSequence<>(getOrdering(), seqOfSequences);
}
}
@Override

View File

@ -126,9 +126,16 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
}
@Override
public Sequence<Result<TopNResultValue>> mergeSequences(Sequence<Sequence<Result<TopNResultValue>>> seqOfSequences)
public Sequence<Result<TopNResultValue>> mergeSequences(
Sequence<Sequence<Result<TopNResultValue>>> seqOfSequences,
boolean ordered
)
{
return new OrderedMergeSequence<Result<TopNResultValue>>(getOrdering(), seqOfSequences);
if (ordered) {
return new OrderedMergeSequence<>(getOrdering(), seqOfSequences);
} else {
return new MergeSequence<>(getOrdering(), seqOfSequences);
}
}
@Override

View File

@ -265,7 +265,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
Iterables.transform(listOfSequences, Pair.<DateTime, Sequence<T>>rhsFn())
);
if (strategy == null) {
return toolChest.mergeSequences(seq);
return toolChest.mergeSequences(seq, true);
} else {
return strategy.mergeSequences(seq);
}
@ -383,7 +383,8 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
);
}
}
)
),
true
);
}

View File

@ -567,7 +567,7 @@ public class ServerManagerTest
}
@Override
public Sequence<T> mergeSequences(Sequence<Sequence<T>> seqOfSequences)
public Sequence<T> mergeSequences(Sequence<Sequence<T>> seqOfSequences, boolean ordered)
{
return new ConcatSequence<T>(seqOfSequences);
}
@ -591,12 +591,6 @@ public class ServerManagerTest
{
};
}
@Override
public Ordering<T> getOrdering()
{
return null;
}
}
private static class SegmentForTesting implements Segment