address code review comments

This commit is contained in:
fjy 2014-07-16 13:52:06 -07:00
parent de8cb55260
commit b70a6b1061
17 changed files with 36 additions and 131 deletions

View File

@ -35,7 +35,5 @@ public interface CacheStrategy<T, CacheType, QueryType extends Query<T>>
public Function<CacheType, T> pullFromCache();
public int getCacheLimit();
public Sequence<T> mergeSequences(Sequence<Sequence<T>> seqOfSequences);
}

View File

@ -84,30 +84,30 @@ public class GroupByParallelQueryRunner<T> implements QueryRunner<T>
if (Iterables.isEmpty(queryables)) {
log.warn("No queryables found.");
}
ListenableFuture<List<Boolean>> futures = Futures.allAsList(
ListenableFuture<List<Void>> futures = Futures.allAsList(
Lists.newArrayList(
Iterables.transform(
queryables,
new Function<QueryRunner<T>, ListenableFuture<Boolean>>()
new Function<QueryRunner<T>, ListenableFuture<Void>>()
{
@Override
public ListenableFuture<Boolean> apply(final QueryRunner<T> input)
public ListenableFuture<Void> apply(final QueryRunner<T> input)
{
return exec.submit(
new AbstractPrioritizedCallable<Boolean>(priority)
new AbstractPrioritizedCallable<Void>(priority)
{
@Override
public Boolean call() throws Exception
public Void call() throws Exception
{
try {
if (bySegment) {
input.run(queryParam)
.accumulate(bySegmentAccumulatorPair.lhs, bySegmentAccumulatorPair.rhs);
return true;
} else {
input.run(queryParam).accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
}
input.run(queryParam).accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
return true;
return null;
}
catch (QueryInterruptedException e) {
throw Throwables.propagate(e);

View File

@ -29,16 +29,8 @@ public class QueryConfig
@JsonProperty
private Period chunkPeriod = new Period();
@JsonProperty
private int maxResultsToCache = Integer.MAX_VALUE;
public Period getChunkPeriod()
{
return chunkPeriod;
}
public int getMaxResultsToCache()
{
return maxResultsToCache;
}
}

View File

@ -35,9 +35,6 @@ public class GroupByQueryConfig extends QueryConfig
@JsonProperty
private int maxResults = 500000;
@JsonProperty
private int maxResultsToCache = 10000;
public boolean isSingleThreaded()
{
return singleThreaded;
@ -57,10 +54,4 @@ public class GroupByQueryConfig extends QueryConfig
{
return maxResults;
}
@Override
public int getMaxResultsToCache()
{
return maxResultsToCache;
}
}

View File

@ -160,7 +160,22 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
private Sequence<Row> postAggregate(final GroupByQuery query, IncrementalIndex index)
{
return Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs()));
return Sequences.map(
Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs())),
new Function<Row, Row>()
{
@Override
public Row apply(Row input)
{
final MapBasedRow row = (MapBasedRow) input;
return new MapBasedRow(
query.getGranularity()
.toDateTime(row.getTimestampFromEpoch()),
row.getEvent()
);
}
}
);
}
private IncrementalIndex makeIncrementalIndex(GroupByQuery query, Sequence<Row> rows)
@ -179,7 +194,6 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
public Sequence<Row> mergeSequences(Sequence<Sequence<Row>> seqOfSequences)
{
return new OrderedMergeSequence<>(Ordering.<Row>natural().nullsFirst(), seqOfSequences);
//return new ConcatSequence<>(seqOfSequences);
}
@Override
@ -341,12 +355,6 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
};
}
@Override
public int getCacheLimit()
{
return configSupplier.get().getMaxResultsToCache();
}
@Override
public Sequence<Row> mergeSequences(Sequence<Sequence<Row>> seqOfSequences)
{

View File

@ -117,11 +117,11 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupB
final int priority = query.getContextPriority(0);
final boolean bySegment = query.getContextBySegment(false);
final ListenableFuture<Boolean> future = queryExecutor.submit(
new AbstractPrioritizedCallable<Boolean>(priority)
final ListenableFuture<Void> future = queryExecutor.submit(
new AbstractPrioritizedCallable<Void>(priority)
{
@Override
public Boolean call() throws Exception
public Void call() throws Exception
{
if (bySegment) {
input.run(queryParam)
@ -129,11 +129,11 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupB
bySegmentAccumulatorPair.lhs,
bySegmentAccumulatorPair.rhs
);
return true;
} else {
input.run(query).accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
}
input.run(query).accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
return true;
return null;
}
}
);

View File

@ -226,12 +226,6 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
};
}
@Override
public int getCacheLimit()
{
return config.getMaxResultsToCache();
}
@Override
public Sequence<SegmentAnalysis> mergeSequences(Sequence<Sequence<SegmentAnalysis>> seqOfSequences)
{

View File

@ -248,13 +248,6 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
};
}
@Override
public int getCacheLimit()
{
return config.getMaxResultsToCache();
}
@Override
public Sequence<Result<SearchResultValue>> mergeSequences(Sequence<Sequence<Result<SearchResultValue>>> seqOfSequences)
{

View File

@ -1,37 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.select;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.query.QueryConfig;
/**
*/
public class SelectQueryConfig extends QueryConfig
{
@JsonProperty
private int maxResultsToCache = 10000;
@Override
public int getMaxResultsToCache()
{
return maxResultsToCache;
}
}

View File

@ -71,11 +71,11 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
{
};
private final SelectQueryConfig config;
private final QueryConfig config;
private final ObjectMapper jsonMapper;
@Inject
public SelectQueryQueryToolChest(SelectQueryConfig config, ObjectMapper jsonMapper)
public SelectQueryQueryToolChest(QueryConfig config, ObjectMapper jsonMapper)
{
this.config = config;
this.jsonMapper = jsonMapper;
@ -268,12 +268,6 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
};
}
@Override
public int getCacheLimit()
{
return config.getMaxResultsToCache();
}
@Override
public Sequence<Result<SelectResultValue>> mergeSequences(Sequence<Sequence<Result<SelectResultValue>>> seqOfSequences)
{

View File

@ -189,12 +189,6 @@ public class TimeBoundaryQueryQueryToolChest
};
}
@Override
public int getCacheLimit()
{
return Integer.MAX_VALUE;
}
@Override
public Sequence<Result<TimeBoundaryResultValue>> mergeSequences(Sequence<Sequence<Result<TimeBoundaryResultValue>>> seqOfSequences)
{

View File

@ -220,12 +220,6 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
};
}
@Override
public int getCacheLimit()
{
return config.getMaxResultsToCache();
}
@Override
public Sequence<Result<TimeseriesResultValue>> mergeSequences(Sequence<Sequence<Result<TimeseriesResultValue>>> seqOfSequences)
{

View File

@ -368,12 +368,6 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
};
}
@Override
public int getCacheLimit()
{
return config.getMaxResultsToCache();
}
@Override
public Sequence<Result<TopNResultValue>> mergeSequences(Sequence<Sequence<Result<TopNResultValue>>> seqOfSequences)
{

View File

@ -534,7 +534,7 @@ public class IncrementalIndex implements Iterable<Row>
}
}
return new MapBasedRow(gran.toDateTime(timeAndDims.getTimestamp()), theVals);
return new MapBasedRow(timeAndDims.getTimestamp(), theVals);
}
}
);

View File

@ -59,7 +59,7 @@ public class SelectQueryRunnerTest
{
return QueryRunnerTestHelper.makeQueryRunners(
new SelectQueryRunnerFactory(
new SelectQueryQueryToolChest(new SelectQueryConfig(), new DefaultObjectMapper()),
new SelectQueryQueryToolChest(new QueryConfig(), new DefaultObjectMapper()),
new SelectQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)

View File

@ -35,7 +35,6 @@ import io.druid.query.search.SearchQueryQueryToolChest;
import io.druid.query.search.search.SearchQuery;
import io.druid.query.search.search.SearchQueryConfig;
import io.druid.query.select.SelectQuery;
import io.druid.query.select.SelectQueryConfig;
import io.druid.query.select.SelectQueryQueryToolChest;
import io.druid.query.timeboundary.TimeBoundaryQuery;
import io.druid.query.timeboundary.TimeBoundaryQueryQueryToolChest;
@ -75,7 +74,6 @@ public class QueryToolChestModule implements Module
JsonConfigProvider.bind(binder, "druid.query", QueryConfig.class);
JsonConfigProvider.bind(binder, "druid.query.groupBy", GroupByQueryConfig.class);
JsonConfigProvider.bind(binder, "druid.query.search", SearchQueryConfig.class);
JsonConfigProvider.bind(binder, "druid.query.select", SelectQueryConfig.class);
JsonConfigProvider.bind(binder, "druid.query.topN", TopNQueryConfig.class);
}
}

View File

@ -32,7 +32,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.metamx.common.ISE;
import com.metamx.common.guava.ConcatSequence;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.guava.MergeIterable;
import com.metamx.common.guava.Sequence;
@ -84,7 +83,6 @@ import io.druid.query.search.search.SearchQueryConfig;
import io.druid.query.select.EventHolder;
import io.druid.query.select.PagingSpec;
import io.druid.query.select.SelectQuery;
import io.druid.query.select.SelectQueryConfig;
import io.druid.query.select.SelectQueryQueryToolChest;
import io.druid.query.select.SelectResultValue;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
@ -779,7 +777,7 @@ public class CachingClusteredClientTest
QueryRunner runner = new FinalizeResultsQueryRunner(
client,
new SelectQueryQueryToolChest(
new SelectQueryConfig(),
new QueryConfig(),
jsonMapper
)
);
@ -1620,13 +1618,7 @@ public class CachingClusteredClientTest
int index = 0;
while (index < objects.length) {
DateTime timestamp = (DateTime) objects[index++];
//List values = Lists.newArrayList();
//while (index < objects.length && !(objects[index] instanceof DateTime)) {
retVal.add(new MapBasedRow(timestamp, (Map) objects[index++]));
//}
//retVal.add(new Result<>(timestamp, values));
}
return retVal;
}
@ -1663,7 +1655,7 @@ public class CachingClusteredClientTest
.put(SearchQuery.class, new SearchQueryQueryToolChest(new SearchQueryConfig()))
.put(
SelectQuery.class,
new SelectQueryQueryToolChest(new SelectQueryConfig(), jsonMapper)
new SelectQueryQueryToolChest(new QueryConfig(), jsonMapper)
)
.put(
GroupByQuery.class,