Merge pull request #467 from metamx/optimize-postAgg-calculation

Optimize post agg calculation
This commit is contained in:
fjy 2014-04-15 15:18:01 -06:00
commit 228be3acb0
23 changed files with 575 additions and 172 deletions

View File

@ -0,0 +1,56 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.aggregation;
import com.google.common.collect.Lists;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
public class AggregatorUtil
{
/**
* returns the list of dependent postAggregators that should be calculated in order to calculate given postAgg
*
* @param postAggregatorList List of postAggregator, there is a restriction that the list should be in an order
* such that all the dependencies of any given aggregator should occur before that aggregator.
* See AggregatorUtilTest.testOutOfOrderPruneDependentPostAgg for example.
* @param postAggName name of the postAgg on which dependency is to be calculated
*/
public static List<PostAggregator> pruneDependentPostAgg(List<PostAggregator> postAggregatorList, String postAggName)
{
LinkedList<PostAggregator> rv = Lists.newLinkedList();
Set<String> deps = new HashSet<>();
deps.add(postAggName);
// Iterate backwards to find the last calculated aggregate and add dependent aggregator as we find dependencies in reverse order
for (PostAggregator agg : Lists.reverse(postAggregatorList)) {
if (deps.contains(agg.getName())) {
rv.addFirst(agg); // add to the beginning of List
deps.remove(agg.getName());
deps.addAll(agg.getDependentFields());
}
}
return rv;
}
}

View File

@ -24,7 +24,6 @@ import io.druid.granularity.AllGranularity;
import io.druid.granularity.QueryGranularity; import io.druid.granularity.QueryGranularity;
import io.druid.query.Result; import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
@ -37,17 +36,14 @@ public class TimeseriesBinaryFn
{ {
private final QueryGranularity gran; private final QueryGranularity gran;
private final List<AggregatorFactory> aggregations; private final List<AggregatorFactory> aggregations;
private final List<PostAggregator> postAggregations;
public TimeseriesBinaryFn( public TimeseriesBinaryFn(
QueryGranularity granularity, QueryGranularity granularity,
List<AggregatorFactory> aggregations, List<AggregatorFactory> aggregations
List<PostAggregator> postAggregations
) )
{ {
this.gran = granularity; this.gran = granularity;
this.aggregations = aggregations; this.aggregations = aggregations;
this.postAggregations = postAggregations;
} }
@Override @Override
@ -71,11 +67,6 @@ public class TimeseriesBinaryFn
retVal.put(metricName, factory.combine(arg1Val.getMetric(metricName), arg2Val.getMetric(metricName))); retVal.put(metricName, factory.combine(arg1Val.getMetric(metricName), arg2Val.getMetric(metricName)));
} }
for (PostAggregator pf : postAggregations) {
final String metricName = pf.getName();
retVal.put(metricName, pf.compute(retVal));
}
return (gran instanceof AllGranularity) ? return (gran instanceof AllGranularity) ?
new Result<TimeseriesResultValue>( new Result<TimeseriesResultValue>(
arg1.getTimestamp(), arg1.getTimestamp(),

View File

@ -74,10 +74,6 @@ public class TimeseriesQueryEngine
bob.addMetric(aggregator); bob.addMetric(aggregator);
} }
for (PostAggregator postAgg : postAggregatorSpecs) {
bob.addMetric(postAgg);
}
Result<TimeseriesResultValue> retVal = bob.build(); Result<TimeseriesResultValue> retVal = bob.build();
// cleanup // cleanup

View File

@ -101,8 +101,7 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
TimeseriesQuery query = (TimeseriesQuery) input; TimeseriesQuery query = (TimeseriesQuery) input;
return new TimeseriesBinaryFn( return new TimeseriesBinaryFn(
query.getGranularity(), query.getGranularity(),
query.getAggregatorSpecs(), query.getAggregatorSpecs()
query.getPostAggregatorSpecs()
); );
} }
}; };
@ -147,7 +146,7 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
values.put(agg.getName(), fn.manipulate(agg, holder.getMetric(agg.getName()))); values.put(agg.getName(), fn.manipulate(agg, holder.getMetric(agg.getName())));
} }
for (PostAggregator postAgg : query.getPostAggregatorSpecs()) { for (PostAggregator postAgg : query.getPostAggregatorSpecs()) {
values.put(postAgg.getName(), holder.getMetric(postAgg.getName())); values.put(postAgg.getName(), postAgg.compute(values));
} }
return new Result<TimeseriesResultValue>( return new Result<TimeseriesResultValue>(
result.getTimestamp(), result.getTimestamp(),
@ -169,7 +168,6 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
return new CacheStrategy<Result<TimeseriesResultValue>, Object, TimeseriesQuery>() return new CacheStrategy<Result<TimeseriesResultValue>, Object, TimeseriesQuery>()
{ {
private final List<AggregatorFactory> aggs = query.getAggregatorSpecs(); private final List<AggregatorFactory> aggs = query.getAggregatorSpecs();
private final List<PostAggregator> postAggs = query.getPostAggregatorSpecs();
@Override @Override
public byte[] computeCacheKey(TimeseriesQuery query) public byte[] computeCacheKey(TimeseriesQuery query)
@ -238,10 +236,6 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
retVal.put(factory.getName(), factory.deserialize(resultIter.next())); retVal.put(factory.getName(), factory.deserialize(resultIter.next()));
} }
for (PostAggregator postAgg : postAggs) {
retVal.put(postAgg.getName(), postAgg.compute(retVal));
}
return new Result<TimeseriesResultValue>( return new Result<TimeseriesResultValue>(
timestamp, timestamp,
new TimeseriesResultValue(retVal) new TimeseriesResultValue(retVal)

View File

@ -56,7 +56,6 @@ public class AggregateTopNMetricFirstAlgorithm implements TopNAlgorithm<int[], T
this.bufferPool = bufferPool; this.bufferPool = bufferPool;
} }
@Override @Override
public TopNParams makeInitParams( public TopNParams makeInitParams(
DimensionSelector dimSelector, Cursor cursor DimensionSelector dimSelector, Cursor cursor
@ -69,7 +68,12 @@ public class AggregateTopNMetricFirstAlgorithm implements TopNAlgorithm<int[], T
public TopNResultBuilder makeResultBuilder(TopNParams params) public TopNResultBuilder makeResultBuilder(TopNParams params)
{ {
return query.getTopNMetricSpec().getResultBuilder( return query.getTopNMetricSpec().getResultBuilder(
params.getCursor().getTime(), query.getDimensionSpec(), query.getThreshold(), comparator params.getCursor().getTime(),
query.getDimensionSpec(),
query.getThreshold(),
comparator,
query.getAggregatorSpecs(),
query.getPostAggregatorSpecs()
); );
} }

View File

@ -60,7 +60,12 @@ public class DimExtractionTopNAlgorithm extends BaseTopNAlgorithm<Aggregator[][]
public TopNResultBuilder makeResultBuilder(TopNParams params) public TopNResultBuilder makeResultBuilder(TopNParams params)
{ {
return query.getTopNMetricSpec().getResultBuilder( return query.getTopNMetricSpec().getResultBuilder(
params.getCursor().getTime(), query.getDimensionSpec(), query.getThreshold(), comparator params.getCursor().getTime(),
query.getDimensionSpec(),
query.getThreshold(),
comparator,
query.getAggregatorSpecs(),
query.getPostAggregatorSpecs()
); );
} }
@ -144,9 +149,7 @@ public class DimExtractionTopNAlgorithm extends BaseTopNAlgorithm<Aggregator[][]
resultBuilder.addEntry( resultBuilder.addEntry(
entry.getKey(), entry.getKey(),
entry.getKey(), entry.getKey(),
vals, vals
query.getAggregatorSpecs(),
query.getPostAggregatorSpecs()
); );
} }
} }

View File

@ -36,7 +36,6 @@ import java.util.List;
public class InvertedTopNMetricSpec implements TopNMetricSpec public class InvertedTopNMetricSpec implements TopNMetricSpec
{ {
private static final byte CACHE_TYPE_ID = 0x3; private static final byte CACHE_TYPE_ID = 0x3;
private final TopNMetricSpec delegate; private final TopNMetricSpec delegate;
@JsonCreator @JsonCreator
@ -76,10 +75,12 @@ public class InvertedTopNMetricSpec implements TopNMetricSpec
DateTime timestamp, DateTime timestamp,
DimensionSpec dimSpec, DimensionSpec dimSpec,
int threshold, int threshold,
Comparator comparator Comparator comparator,
List<AggregatorFactory> aggFactories,
List<PostAggregator> postAggs
) )
{ {
return delegate.getResultBuilder(timestamp, dimSpec, threshold, comparator); return delegate.getResultBuilder(timestamp, dimSpec, threshold, comparator, aggFactories, postAggs);
} }
@Override @Override
@ -102,15 +103,27 @@ public class InvertedTopNMetricSpec implements TopNMetricSpec
delegate.initTopNAlgorithmSelector(selector); delegate.initTopNAlgorithmSelector(selector);
} }
@Override
public String getMetricName(DimensionSpec dimSpec)
{
return delegate.getMetricName(dimSpec);
}
@Override @Override
public boolean equals(Object o) public boolean equals(Object o)
{ {
if (this == o) return true; if (this == o) {
if (o == null || getClass() != o.getClass()) return false; return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
InvertedTopNMetricSpec that = (InvertedTopNMetricSpec) o; InvertedTopNMetricSpec that = (InvertedTopNMetricSpec) o;
if (delegate != null ? !delegate.equals(that.delegate) : that.delegate != null) return false; if (delegate != null ? !delegate.equals(that.delegate) : that.delegate != null) {
return false;
}
return true; return true;
} }

View File

@ -80,10 +80,12 @@ public class LexicographicTopNMetricSpec implements TopNMetricSpec
DateTime timestamp, DateTime timestamp,
DimensionSpec dimSpec, DimensionSpec dimSpec,
int threshold, int threshold,
Comparator comparator Comparator comparator,
List<AggregatorFactory> aggFactories,
List<PostAggregator> postAggs
) )
{ {
return new TopNLexicographicResultBuilder(timestamp, dimSpec, threshold, previousStop, comparator); return new TopNLexicographicResultBuilder(timestamp, dimSpec, threshold, previousStop, comparator, aggFactories);
} }
@Override @Override
@ -111,6 +113,12 @@ public class LexicographicTopNMetricSpec implements TopNMetricSpec
selector.setAggregateAllMetrics(true); selector.setAggregateAllMetrics(true);
} }
@Override
public String getMetricName(DimensionSpec dimSpec)
{
return dimSpec.getOutputName();
}
@Override @Override
public String toString() public String toString()
{ {

View File

@ -121,10 +121,12 @@ public class NumericTopNMetricSpec implements TopNMetricSpec
DateTime timestamp, DateTime timestamp,
DimensionSpec dimSpec, DimensionSpec dimSpec,
int threshold, int threshold,
Comparator comparator Comparator comparator,
List<AggregatorFactory> aggFactories,
List<PostAggregator> postAggs
) )
{ {
return new TopNNumericResultBuilder(timestamp, dimSpec, metric, threshold, comparator); return new TopNNumericResultBuilder(timestamp, dimSpec, metric, threshold, comparator, aggFactories, postAggs);
} }
@Override @Override
@ -150,6 +152,12 @@ public class NumericTopNMetricSpec implements TopNMetricSpec
selector.setAggregateTopNMetricFirst(true); selector.setAggregateTopNMetricFirst(true);
} }
@Override
public String getMetricName(DimensionSpec dimSpec)
{
return metric;
}
@Override @Override
public String toString() public String toString()
{ {

View File

@ -35,7 +35,8 @@ import java.util.Comparator;
/** /**
*/ */
public class PooledTopNAlgorithm extends BaseTopNAlgorithm<int[], BufferAggregator[], PooledTopNAlgorithm.PooledTopNParams> public class PooledTopNAlgorithm
extends BaseTopNAlgorithm<int[], BufferAggregator[], PooledTopNAlgorithm.PooledTopNParams>
{ {
private final Capabilities capabilities; private final Capabilities capabilities;
private final TopNQuery query; private final TopNQuery query;
@ -117,7 +118,12 @@ public class PooledTopNAlgorithm extends BaseTopNAlgorithm<int[], BufferAggregat
public TopNResultBuilder makeResultBuilder(PooledTopNParams params) public TopNResultBuilder makeResultBuilder(PooledTopNParams params)
{ {
return query.getTopNMetricSpec().getResultBuilder( return query.getTopNMetricSpec().getResultBuilder(
params.getCursor().getTime(), query.getDimensionSpec(), query.getThreshold(), comparator params.getCursor().getTime(),
query.getDimensionSpec(),
query.getThreshold(),
comparator,
query.getAggregatorSpecs(),
query.getPostAggregatorSpecs()
); );
} }
@ -217,9 +223,7 @@ public class PooledTopNAlgorithm extends BaseTopNAlgorithm<int[], BufferAggregat
resultBuilder.addEntry( resultBuilder.addEntry(
dimSelector.lookupName(i), dimSelector.lookupName(i),
i, i,
vals, vals
query.getAggregatorSpecs(),
query.getPostAggregatorSpecs()
); );
} }
} }
@ -228,7 +232,7 @@ public class PooledTopNAlgorithm extends BaseTopNAlgorithm<int[], BufferAggregat
@Override @Override
protected void closeAggregators(BufferAggregator[] bufferAggregators) protected void closeAggregators(BufferAggregator[] bufferAggregators)
{ {
for(BufferAggregator agg : bufferAggregators) { for (BufferAggregator agg : bufferAggregators) {
agg.close(); agg.close();
} }
} }
@ -246,11 +250,6 @@ public class PooledTopNAlgorithm extends BaseTopNAlgorithm<int[], BufferAggregat
public static class PooledTopNParams extends TopNParams public static class PooledTopNParams extends TopNParams
{ {
public static Builder builder()
{
return new Builder();
}
private final ResourceHolder<ByteBuffer> resultsBufHolder; private final ResourceHolder<ByteBuffer> resultsBufHolder;
private final ByteBuffer resultsBuf; private final ByteBuffer resultsBuf;
private final int[] aggregatorSizes; private final int[] aggregatorSizes;
@ -278,6 +277,11 @@ public class PooledTopNAlgorithm extends BaseTopNAlgorithm<int[], BufferAggregat
this.arrayProvider = arrayProvider; this.arrayProvider = arrayProvider;
} }
public static Builder builder()
{
return new Builder();
}
public ResourceHolder<ByteBuffer> getResultsBufHolder() public ResourceHolder<ByteBuffer> getResultsBufHolder()
{ {
return resultsBufHolder; return resultsBufHolder;

View File

@ -24,6 +24,7 @@ import io.druid.granularity.AllGranularity;
import io.druid.granularity.QueryGranularity; import io.druid.granularity.QueryGranularity;
import io.druid.query.Result; import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorUtil;
import io.druid.query.aggregation.PostAggregator; import io.druid.query.aggregation.PostAggregator;
import io.druid.query.dimension.DimensionSpec; import io.druid.query.dimension.DimensionSpec;
import org.joda.time.DateTime; import org.joda.time.DateTime;
@ -40,7 +41,7 @@ public class TopNBinaryFn implements BinaryFn<Result<TopNResultValue>, Result<To
private final TopNResultMerger merger; private final TopNResultMerger merger;
private final DimensionSpec dimSpec; private final DimensionSpec dimSpec;
private final QueryGranularity gran; private final QueryGranularity gran;
private final String dimension; private final DimensionSpec dimensionSpec;
private final TopNMetricSpec topNMetricSpec; private final TopNMetricSpec topNMetricSpec;
private final int threshold; private final int threshold;
private final List<AggregatorFactory> aggregations; private final List<AggregatorFactory> aggregations;
@ -63,9 +64,13 @@ public class TopNBinaryFn implements BinaryFn<Result<TopNResultValue>, Result<To
this.topNMetricSpec = topNMetricSpec; this.topNMetricSpec = topNMetricSpec;
this.threshold = threshold; this.threshold = threshold;
this.aggregations = aggregatorSpecs; this.aggregations = aggregatorSpecs;
this.postAggregations = postAggregatorSpecs; this.dimensionSpec = dimSpec;
this.postAggregations = AggregatorUtil.pruneDependentPostAgg(
postAggregatorSpecs,
this.topNMetricSpec.getMetricName(this.dimensionSpec)
);
this.dimension = dimSpec.getOutputName();
this.comparator = topNMetricSpec.getComparator(aggregatorSpecs, postAggregatorSpecs); this.comparator = topNMetricSpec.getComparator(aggregatorSpecs, postAggregatorSpecs);
} }
@ -79,11 +84,13 @@ public class TopNBinaryFn implements BinaryFn<Result<TopNResultValue>, Result<To
return merger.getResult(arg1, comparator); return merger.getResult(arg1, comparator);
} }
Map<String, DimensionAndMetricValueExtractor> retVals = new LinkedHashMap<String, DimensionAndMetricValueExtractor>();
TopNResultValue arg1Vals = arg1.getValue(); TopNResultValue arg1Vals = arg1.getValue();
TopNResultValue arg2Vals = arg2.getValue(); TopNResultValue arg2Vals = arg2.getValue();
Map<String, DimensionAndMetricValueExtractor> retVals = new LinkedHashMap<String, DimensionAndMetricValueExtractor>();
String dimension = dimensionSpec.getOutputName();
String topNMetricName = topNMetricSpec.getMetricName(dimensionSpec);
for (DimensionAndMetricValueExtractor arg1Val : arg1Vals) { for (DimensionAndMetricValueExtractor arg1Val : arg1Vals) {
retVals.put(arg1Val.getStringDimensionValue(dimension), arg1Val); retVals.put(arg1Val.getStringDimensionValue(dimension), arg1Val);
} }
@ -92,16 +99,16 @@ public class TopNBinaryFn implements BinaryFn<Result<TopNResultValue>, Result<To
DimensionAndMetricValueExtractor arg1Val = retVals.get(dimensionValue); DimensionAndMetricValueExtractor arg1Val = retVals.get(dimensionValue);
if (arg1Val != null) { if (arg1Val != null) {
Map<String, Object> retVal = new LinkedHashMap<String, Object>(); // size of map = aggregator + topNDim + postAgg (If sorting is done on post agg field)
Map<String, Object> retVal = new LinkedHashMap<String, Object>(aggregations.size() + 2);
retVal.put(dimension, dimensionValue); retVal.put(dimension, dimensionValue);
for (AggregatorFactory factory : aggregations) { for (AggregatorFactory factory : aggregations) {
final String metricName = factory.getName(); final String metricName = factory.getName();
retVal.put(metricName, factory.combine(arg1Val.getMetric(metricName), arg2Val.getMetric(metricName))); retVal.put(metricName, factory.combine(arg1Val.getMetric(metricName), arg2Val.getMetric(metricName)));
} }
for (PostAggregator postAgg : postAggregations) {
for (PostAggregator pf : postAggregations) { retVal.put(postAgg.getName(), postAgg.compute(retVal));
retVal.put(pf.getName(), pf.compute(retVal));
} }
retVals.put(dimensionValue, new DimensionAndMetricValueExtractor(retVal)); retVals.put(dimensionValue, new DimensionAndMetricValueExtractor(retVal));
@ -117,7 +124,14 @@ public class TopNBinaryFn implements BinaryFn<Result<TopNResultValue>, Result<To
timestamp = gran.toDateTime(gran.truncate(arg1.getTimestamp().getMillis())); timestamp = gran.toDateTime(gran.truncate(arg1.getTimestamp().getMillis()));
} }
TopNResultBuilder bob = topNMetricSpec.getResultBuilder(timestamp, dimSpec, threshold, comparator); TopNResultBuilder bob = topNMetricSpec.getResultBuilder(
timestamp,
dimSpec,
threshold,
comparator,
aggregations,
postAggregations
);
for (DimensionAndMetricValueExtractor extractor : retVals.values()) { for (DimensionAndMetricValueExtractor extractor : retVals.values()) {
bob.addEntry(extractor); bob.addEntry(extractor);
} }

View File

@ -40,7 +40,7 @@ public class TopNLexicographicResultBuilder implements TopNResultBuilder
private final DateTime timestamp; private final DateTime timestamp;
private final DimensionSpec dimSpec; private final DimensionSpec dimSpec;
private final String previousStop; private final String previousStop;
private final List<AggregatorFactory> aggFactories;
private MinMaxPriorityQueue<DimValHolder> pQueue = null; private MinMaxPriorityQueue<DimValHolder> pQueue = null;
public TopNLexicographicResultBuilder( public TopNLexicographicResultBuilder(
@ -48,12 +48,14 @@ public class TopNLexicographicResultBuilder implements TopNResultBuilder
DimensionSpec dimSpec, DimensionSpec dimSpec,
int threshold, int threshold,
String previousStop, String previousStop,
final Comparator comparator final Comparator comparator,
List<AggregatorFactory> aggFactories
) )
{ {
this.timestamp = timestamp; this.timestamp = timestamp;
this.dimSpec = dimSpec; this.dimSpec = dimSpec;
this.previousStop = previousStop; this.previousStop = previousStop;
this.aggFactories = aggFactories;
instantiatePQueue(threshold, comparator); instantiatePQueue(threshold, comparator);
} }
@ -62,9 +64,7 @@ public class TopNLexicographicResultBuilder implements TopNResultBuilder
public TopNResultBuilder addEntry( public TopNResultBuilder addEntry(
String dimName, String dimName,
Object dimValIndex, Object dimValIndex,
Object[] metricVals, Object[] metricVals
List<AggregatorFactory> aggFactories,
List<PostAggregator> postAggs
) )
{ {
Map<String, Object> metricValues = Maps.newLinkedHashMap(); Map<String, Object> metricValues = Maps.newLinkedHashMap();
@ -75,9 +75,6 @@ public class TopNLexicographicResultBuilder implements TopNResultBuilder
for (Object metricVal : metricVals) { for (Object metricVal : metricVals) {
metricValues.put(aggsIter.next().getName(), metricVal); metricValues.put(aggsIter.next().getName(), metricVal);
} }
for (PostAggregator postAgg : postAggs) {
metricValues.put(postAgg.getName(), postAgg.compute(metricValues));
}
pQueue.add(new DimValHolder.Builder().withDirName(dimName).withMetricValues(metricValues).build()); pQueue.add(new DimValHolder.Builder().withDirName(dimName).withMetricValues(metricValues).build());
} }

View File

@ -47,7 +47,9 @@ public interface TopNMetricSpec
DateTime timestamp, DateTime timestamp,
DimensionSpec dimSpec, DimensionSpec dimSpec,
int threshold, int threshold,
Comparator comparator Comparator comparator,
List<AggregatorFactory> aggFactories,
List<PostAggregator> postAggs
); );
public byte[] getCacheKey(); public byte[] getCacheKey();
@ -55,4 +57,6 @@ public interface TopNMetricSpec
public <T> TopNMetricSpecBuilder<T> configureOptimizer(TopNMetricSpecBuilder<T> builder); public <T> TopNMetricSpecBuilder<T> configureOptimizer(TopNMetricSpecBuilder<T> builder);
public void initTopNAlgorithmSelector(TopNAlgorithmSelector selector); public void initTopNAlgorithmSelector(TopNAlgorithmSelector selector);
public String getMetricName(DimensionSpec dimSpec);
} }

View File

@ -23,6 +23,7 @@ import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue; import com.google.common.collect.MinMaxPriorityQueue;
import io.druid.query.Result; import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorUtil;
import io.druid.query.aggregation.PostAggregator; import io.druid.query.aggregation.PostAggregator;
import io.druid.query.dimension.DimensionSpec; import io.druid.query.dimension.DimensionSpec;
import org.joda.time.DateTime; import org.joda.time.DateTime;
@ -40,7 +41,8 @@ public class TopNNumericResultBuilder implements TopNResultBuilder
private final DateTime timestamp; private final DateTime timestamp;
private final DimensionSpec dimSpec; private final DimensionSpec dimSpec;
private final String metricName; private final String metricName;
private final List<AggregatorFactory> aggFactories;
private final List<PostAggregator> postAggs;
private MinMaxPriorityQueue<DimValHolder> pQueue = null; private MinMaxPriorityQueue<DimValHolder> pQueue = null;
public TopNNumericResultBuilder( public TopNNumericResultBuilder(
@ -48,12 +50,16 @@ public class TopNNumericResultBuilder implements TopNResultBuilder
DimensionSpec dimSpec, DimensionSpec dimSpec,
String metricName, String metricName,
int threshold, int threshold,
final Comparator comparator final Comparator comparator,
List<AggregatorFactory> aggFactories,
List<PostAggregator> postAggs
) )
{ {
this.timestamp = timestamp; this.timestamp = timestamp;
this.dimSpec = dimSpec; this.dimSpec = dimSpec;
this.metricName = metricName; this.metricName = metricName;
this.aggFactories = aggFactories;
this.postAggs = AggregatorUtil.pruneDependentPostAgg(postAggs, this.metricName);
instantiatePQueue(threshold, comparator); instantiatePQueue(threshold, comparator);
} }
@ -62,9 +68,7 @@ public class TopNNumericResultBuilder implements TopNResultBuilder
public TopNResultBuilder addEntry( public TopNResultBuilder addEntry(
String dimName, String dimName,
Object dimValIndex, Object dimValIndex,
Object[] metricVals, Object[] metricVals
List<AggregatorFactory> aggFactories,
List<PostAggregator> postAggs
) )
{ {
Map<String, Object> metricValues = Maps.newLinkedHashMap(); Map<String, Object> metricValues = Maps.newLinkedHashMap();
@ -75,6 +79,7 @@ public class TopNNumericResultBuilder implements TopNResultBuilder
for (Object metricVal : metricVals) { for (Object metricVal : metricVals) {
metricValues.put(aggFactoryIter.next().getName(), metricVal); metricValues.put(aggFactoryIter.next().getName(), metricVal);
} }
for (PostAggregator postAgg : postAggs) { for (PostAggregator postAgg : postAggs) {
metricValues.put(postAgg.getName(), postAgg.compute(metricValues)); metricValues.put(postAgg.getName(), postAgg.compute(metricValues));
} }

View File

@ -64,11 +64,13 @@ import java.util.Map;
public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultValue>, TopNQuery> public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultValue>, TopNQuery>
{ {
private static final byte TOPN_QUERY = 0x1; private static final byte TOPN_QUERY = 0x1;
private static final Joiner COMMA_JOIN = Joiner.on(","); private static final Joiner COMMA_JOIN = Joiner.on(",");
private static final TypeReference<Result<TopNResultValue>> TYPE_REFERENCE = new TypeReference<Result<TopNResultValue>>(){}; private static final TypeReference<Result<TopNResultValue>> TYPE_REFERENCE = new TypeReference<Result<TopNResultValue>>()
{
private static final TypeReference<Object> OBJECT_TYPE_REFERENCE = new TypeReference<Object>(){}; };
private static final TypeReference<Object> OBJECT_TYPE_REFERENCE = new TypeReference<Object>()
{
};
private final TopNQueryConfig config; private final TopNQueryConfig config;
@Inject @Inject
@ -161,7 +163,12 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
values.put(agg.getName(), fn.manipulate(agg, input.getMetric(agg.getName()))); values.put(agg.getName(), fn.manipulate(agg, input.getMetric(agg.getName())));
} }
for (PostAggregator postAgg : query.getPostAggregatorSpecs()) { for (PostAggregator postAgg : query.getPostAggregatorSpecs()) {
values.put(postAgg.getName(), input.getMetric(postAgg.getName())); Object calculatedPostAgg = input.getMetric(postAgg.getName());
if (calculatedPostAgg != null) {
values.put(postAgg.getName(), calculatedPostAgg);
} else {
values.put(postAgg.getName(), postAgg.compute(values));
}
} }
values.put(dimension, input.getDimensionValue(dimension)); values.put(dimension, input.getDimensionValue(dimension));
@ -281,10 +288,6 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
vals.put(factory.getName(), factory.deserialize(resultIter.next())); vals.put(factory.getName(), factory.deserialize(resultIter.next()));
} }
for (PostAggregator postAgg : postAggs) {
vals.put(postAgg.getName(), postAgg.compute(vals));
}
retVal.add(vals); retVal.add(vals);
} }
@ -313,6 +316,11 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
return new ThresholdAdjustingQueryRunner(runner, config.getMinTopNThreshold()); return new ThresholdAdjustingQueryRunner(runner, config.getMinTopNThreshold());
} }
public Ordering<Result<TopNResultValue>> getOrdering()
{
return Ordering.natural();
}
private static class ThresholdAdjustingQueryRunner implements QueryRunner<Result<TopNResultValue>> private static class ThresholdAdjustingQueryRunner implements QueryRunner<Result<TopNResultValue>>
{ {
private final QueryRunner<Result<TopNResultValue>> runner; private final QueryRunner<Result<TopNResultValue>> runner;
@ -397,9 +405,4 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
); );
} }
} }
public Ordering<Result<TopNResultValue>> getOrdering()
{
return Ordering.natural();
}
} }

View File

@ -33,9 +33,7 @@ public interface TopNResultBuilder
public TopNResultBuilder addEntry( public TopNResultBuilder addEntry(
String dimName, String dimName,
Object dimValIndex, Object dimValIndex,
Object[] metricVals, Object[] metricVals
List<AggregatorFactory> aggFactories,
List<PostAggregator> postAggs
); );
public TopNResultBuilder addEntry( public TopNResultBuilder addEntry(

View File

@ -60,6 +60,7 @@ public class QueryRunnerTestHelper
public static final String indexMetric = "index"; public static final String indexMetric = "index";
public static final String uniqueMetric = "uniques"; public static final String uniqueMetric = "uniques";
public static final String addRowsIndexConstantMetric = "addRowsIndexConstant"; public static final String addRowsIndexConstantMetric = "addRowsIndexConstant";
public static String dependentPostAggMetric = "dependentPostAgg";
public static final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows"); public static final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows");
public static final LongSumAggregatorFactory indexLongSum = new LongSumAggregatorFactory("index", "index"); public static final LongSumAggregatorFactory indexLongSum = new LongSumAggregatorFactory("index", "index");
public static final DoubleSumAggregatorFactory indexDoubleSum = new DoubleSumAggregatorFactory("index", "index"); public static final DoubleSumAggregatorFactory indexDoubleSum = new DoubleSumAggregatorFactory("index", "index");
@ -72,8 +73,19 @@ public class QueryRunnerTestHelper
public static final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index"); public static final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index");
public static final ArithmeticPostAggregator addRowsIndexConstant = public static final ArithmeticPostAggregator addRowsIndexConstant =
new ArithmeticPostAggregator( new ArithmeticPostAggregator(
"addRowsIndexConstant", "+", Lists.newArrayList(constant, rowsPostAgg, indexPostAgg) addRowsIndexConstantMetric, "+", Lists.newArrayList(constant, rowsPostAgg, indexPostAgg)
); );
// dependent on AddRowsIndexContact postAgg
public static final ArithmeticPostAggregator dependentPostAgg = new ArithmeticPostAggregator(
dependentPostAggMetric,
"+",
Lists.newArrayList(
constant,
new FieldAccessPostAggregator(addRowsIndexConstantMetric, addRowsIndexConstantMetric)
)
);
public static final List<AggregatorFactory> commonAggregators = Arrays.asList( public static final List<AggregatorFactory> commonAggregators = Arrays.asList(
rowsCount, rowsCount,
indexDoubleSum, indexDoubleSum,

View File

@ -0,0 +1,104 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.aggregation;
import com.google.common.collect.Lists;
import io.druid.query.aggregation.post.ArithmeticPostAggregator;
import io.druid.query.aggregation.post.ConstantPostAggregator;
import io.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
public class AggregatorUtilTest
{
@Test
public void testPruneDependentPostAgg()
{
PostAggregator agg1 = new ArithmeticPostAggregator(
"abc", "+", Lists.<PostAggregator>newArrayList(
new ConstantPostAggregator("1", 1L, 1L), new ConstantPostAggregator("2", 2L, 2L)
)
);
PostAggregator dependency1 = new ArithmeticPostAggregator(
"dep1", "+", Lists.<PostAggregator>newArrayList(
new ConstantPostAggregator("1", 1L, 1L), new ConstantPostAggregator("4", 4L, 4L)
)
);
PostAggregator agg2 = new FieldAccessPostAggregator("def", "def");
PostAggregator dependency2 = new FieldAccessPostAggregator("dep2", "dep2");
PostAggregator aggregator = new ArithmeticPostAggregator(
"finalAgg",
"+",
Lists.<PostAggregator>newArrayList(
new FieldAccessPostAggregator("dep1", "dep1"),
new FieldAccessPostAggregator("dep2", "dep2")
)
);
List<PostAggregator> prunedAgg = AggregatorUtil.pruneDependentPostAgg(
Lists.newArrayList(
agg1,
dependency1,
agg2,
dependency2,
aggregator
), aggregator.getName()
);
Assert.assertEquals(Lists.newArrayList(dependency1, dependency2, aggregator), prunedAgg);
}
@Test
public void testOutOfOrderPruneDependentPostAgg()
{
PostAggregator agg1 = new ArithmeticPostAggregator(
"abc", "+", Lists.<PostAggregator>newArrayList(
new ConstantPostAggregator("1", 1L, 1L), new ConstantPostAggregator("2", 2L, 2L)
)
);
PostAggregator dependency1 = new ArithmeticPostAggregator(
"dep1", "+", Lists.<PostAggregator>newArrayList(
new ConstantPostAggregator("1", 1L, 1L), new ConstantPostAggregator("4", 4L, 4L)
)
);
PostAggregator agg2 = new FieldAccessPostAggregator("def", "def");
PostAggregator dependency2 = new FieldAccessPostAggregator("dep2", "dep2");
PostAggregator aggregator = new ArithmeticPostAggregator(
"finalAgg",
"+",
Lists.<PostAggregator>newArrayList(
new FieldAccessPostAggregator("dep1", "dep1"),
new FieldAccessPostAggregator("dep2", "dep2")
)
);
List<PostAggregator> prunedAgg = AggregatorUtil.pruneDependentPostAgg(
Lists.newArrayList(
agg1,
dependency1,
aggregator, // dependency is added later than the aggregator
agg2,
dependency2
), aggregator.getName()
);
Assert.assertEquals(Lists.newArrayList(dependency1, aggregator), prunedAgg);
}
}

View File

@ -20,16 +20,11 @@
package io.druid.query.timeseries; package io.druid.query.timeseries;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.druid.granularity.QueryGranularity; import io.druid.granularity.QueryGranularity;
import io.druid.query.Result; import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.ArithmeticPostAggregator;
import io.druid.query.aggregation.post.ConstantPostAggregator;
import io.druid.query.aggregation.post.FieldAccessPostAggregator;
import junit.framework.Assert; import junit.framework.Assert;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.junit.Test; import org.junit.Test;
@ -43,21 +38,10 @@ public class TimeseriesBinaryFnTest
{ {
final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows"); final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows");
final LongSumAggregatorFactory indexLongSum = new LongSumAggregatorFactory("index", "index"); final LongSumAggregatorFactory indexLongSum = new LongSumAggregatorFactory("index", "index");
final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L, null);
final FieldAccessPostAggregator rowsPostAgg = new FieldAccessPostAggregator("rows", "rows");
final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index");
final ArithmeticPostAggregator addRowsIndexConstant = new ArithmeticPostAggregator(
"addRowsIndexConstant",
"+",
Lists.newArrayList(constant, rowsPostAgg, indexPostAgg)
);
final List<AggregatorFactory> aggregatorFactories = Arrays.asList( final List<AggregatorFactory> aggregatorFactories = Arrays.asList(
rowsCount, rowsCount,
indexLongSum indexLongSum
); );
final List<PostAggregator> postAggregators = Arrays.<PostAggregator>asList(
addRowsIndexConstant
);
final DateTime currTime = new DateTime(); final DateTime currTime = new DateTime();
@Test @Test
@ -87,16 +71,14 @@ public class TimeseriesBinaryFnTest
new TimeseriesResultValue( new TimeseriesResultValue(
ImmutableMap.<String, Object>of( ImmutableMap.<String, Object>of(
"rows", 3L, "rows", 3L,
"index", 5L, "index", 5L
"addRowsIndexConstant", 9.0
) )
) )
); );
Result<TimeseriesResultValue> actual = new TimeseriesBinaryFn( Result<TimeseriesResultValue> actual = new TimeseriesBinaryFn(
QueryGranularity.ALL, QueryGranularity.ALL,
aggregatorFactories, aggregatorFactories
postAggregators
).apply( ).apply(
result1, result1,
result2 result2
@ -131,16 +113,14 @@ public class TimeseriesBinaryFnTest
new TimeseriesResultValue( new TimeseriesResultValue(
ImmutableMap.<String, Object>of( ImmutableMap.<String, Object>of(
"rows", 3L, "rows", 3L,
"index", 5L, "index", 5L
"addRowsIndexConstant", 9.0
) )
) )
); );
Result<TimeseriesResultValue> actual = new TimeseriesBinaryFn( Result<TimeseriesResultValue> actual = new TimeseriesBinaryFn(
QueryGranularity.DAY, QueryGranularity.DAY,
aggregatorFactories, aggregatorFactories
postAggregators
).apply( ).apply(
result1, result1,
result2 result2
@ -166,8 +146,7 @@ public class TimeseriesBinaryFnTest
Result<TimeseriesResultValue> actual = new TimeseriesBinaryFn( Result<TimeseriesResultValue> actual = new TimeseriesBinaryFn(
QueryGranularity.ALL, QueryGranularity.ALL,
aggregatorFactories, aggregatorFactories
postAggregators
).apply( ).apply(
result1, result1,
result2 result2
@ -202,16 +181,14 @@ public class TimeseriesBinaryFnTest
new TimeseriesResultValue( new TimeseriesResultValue(
ImmutableMap.<String, Object>of( ImmutableMap.<String, Object>of(
"rows", 3L, "rows", 3L,
"index", 5L, "index", 5L
"addRowsIndexConstant", 9.0
) )
) )
); );
Result<TimeseriesResultValue> actual = new TimeseriesBinaryFn( Result<TimeseriesResultValue> actual = new TimeseriesBinaryFn(
QueryGranularity.ALL, QueryGranularity.ALL,
aggregatorFactories, aggregatorFactories
postAggregators
).apply( ).apply(
result1, result1,
result2 result2

View File

@ -0,0 +1,135 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.topn;
import com.google.caliper.Param;
import com.google.caliper.Runner;
import com.google.caliper.SimpleBenchmark;
import com.google.common.collect.Lists;
import io.druid.granularity.QueryGranularity;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.ArithmeticPostAggregator;
import io.druid.query.aggregation.post.ConstantPostAggregator;
import io.druid.query.aggregation.post.FieldAccessPostAggregator;
import io.druid.query.dimension.DefaultDimensionSpec;
import org.joda.time.DateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class TopNBinaryFnBenchmark extends SimpleBenchmark
{
@Param({"1", "5", "10", "15"})
int aggCount;
@Param({"1", "5", "10", "15"})
int postAggCount;
@Param({"1000", "10000"})
int threshold;
Result<TopNResultValue> result1;
Result<TopNResultValue> result2;
TopNBinaryFn fn;
public static void main(String[] args) throws Exception
{
Runner.main(TopNBinaryFnBenchmark.class, args);
}
@Override
protected void setUp() throws Exception
{
final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L, null);
final FieldAccessPostAggregator rowsPostAgg = new FieldAccessPostAggregator("rows", "rows");
final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index");
final List<AggregatorFactory> aggregatorFactories = new ArrayList<>();
aggregatorFactories.add(new CountAggregatorFactory("rows"));
aggregatorFactories.add(new LongSumAggregatorFactory("index", "index"));
for (int i = 1; i < aggCount; i++) {
aggregatorFactories.add(new CountAggregatorFactory("rows" + i));
}
final List<PostAggregator> postAggregators = new ArrayList<>();
for (int i = 0; i < postAggCount; i++) {
postAggregators.add(
new ArithmeticPostAggregator(
"addrowsindexconstant" + i,
"+",
Lists.newArrayList(constant, rowsPostAgg, indexPostAgg)
)
);
}
final DateTime currTime = new DateTime();
List<Map<String, Object>> list = new ArrayList<>();
for (int i = 0; i < threshold; i++) {
Map<String, Object> res = new HashMap<>();
res.put("testdim", "" + i);
res.put("rows", 1L);
for (int j = 0; j < aggCount; j++) {
res.put("rows" + j, 1L);
}
res.put("index", 1L);
list.add(res);
}
result1 = new Result<>(
currTime,
new TopNResultValue(list)
);
List<Map<String, Object>> list2 = new ArrayList<>();
for (int i = 0; i < threshold; i++) {
Map<String, Object> res = new HashMap<>();
res.put("testdim", "" + i);
res.put("rows", 2L);
for (int j = 0; j < aggCount; j++) {
res.put("rows" + j, 2L);
}
res.put("index", 2L);
list2.add(res);
}
result2 = new Result<>(
currTime,
new TopNResultValue(list2)
);
fn = new TopNBinaryFn(
TopNResultMerger.identity,
QueryGranularity.ALL,
new DefaultDimensionSpec("testdim", null),
new NumericTopNMetricSpec("index"),
100,
aggregatorFactories,
postAggregators
);
}
public void timeMerge(int nReps)
{
for (int i = 0; i < nReps; i++) {
fn.apply(result1, result2);
}
}
}

View File

@ -129,15 +129,13 @@ public class TopNBinaryFnTest
ImmutableMap.<String, Object>of( ImmutableMap.<String, Object>of(
"testdim", "1", "testdim", "1",
"rows", 3L, "rows", 3L,
"index", 5L, "index", 5L
"addrowsindexconstant", 9.0
), ),
ImmutableMap.<String, Object>of( ImmutableMap.<String, Object>of(
"testdim", "2", "testdim", "2",
"rows", 4L, "rows", 4L,
"index", 4L, "index", 4L
"addrowsindexconstant", 9.0
) )
) )
) )
@ -214,14 +212,12 @@ public class TopNBinaryFnTest
ImmutableMap.<String, Object>of( ImmutableMap.<String, Object>of(
"testdim", "1", "testdim", "1",
"rows", 3L, "rows", 3L,
"index", 5L, "index", 5L
"addrowsindexconstant", 9.0
), ),
ImmutableMap.<String, Object>of( ImmutableMap.<String, Object>of(
"testdim", "2", "testdim", "2",
"rows", 4L, "rows", 4L,
"index", 4L, "index", 4L
"addrowsindexconstant", 9.0
) )
) )
) )
@ -427,15 +423,12 @@ public class TopNBinaryFnTest
ImmutableMap.<String, Object>of( ImmutableMap.<String, Object>of(
"testdim", "1", "testdim", "1",
"rows", 3L, "rows", 3L,
"index", 5L, "index", 5L
"addrowsindexconstant", 9.0
), ),
ImmutableMap.<String, Object>of( ImmutableMap.<String, Object>of(
"testdim", "2", "testdim", "2",
"rows", 4L, "rows", 4L,
"index", 4L, "index", 4L )
"addrowsindexconstant", 9.0
)
) )
) )
); );

View File

@ -1176,4 +1176,75 @@ public class TopNQueryRunnerTest
TestHelper.assertExpectedResults(expectedResults, runner.run(query)); TestHelper.assertExpectedResults(expectedResults, runner.run(query));
} }
@Test
public void testTopNDependentPostAgg() {
TopNQuery query = new TopNQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.granularity(QueryRunnerTestHelper.allGran)
.dimension(providerDimension)
.metric(QueryRunnerTestHelper.dependentPostAggMetric)
.threshold(4)
.intervals(QueryRunnerTestHelper.fullOnInterval)
.aggregators(
Lists.<AggregatorFactory>newArrayList(
Iterables.concat(
QueryRunnerTestHelper.commonAggregators,
Lists.newArrayList(
new MaxAggregatorFactory("maxIndex", "index"),
new MinAggregatorFactory("minIndex", "index")
)
)
)
)
.postAggregators(
Arrays.<PostAggregator>asList(
QueryRunnerTestHelper.addRowsIndexConstant,
QueryRunnerTestHelper.dependentPostAgg
)
)
.build();
List<Result<TopNResultValue>> expectedResults = Arrays.asList(
new Result<TopNResultValue>(
new DateTime("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put(providerDimension, "total_market")
.put("rows", 186L)
.put("index", 215679.82879638672D)
.put("addRowsIndexConstant", 215866.82879638672D)
.put(QueryRunnerTestHelper.dependentPostAggMetric, 215867.82879638672D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_2)
.put("maxIndex", 1743.9217529296875D)
.put("minIndex", 792.3260498046875D)
.build(),
ImmutableMap.<String, Object>builder()
.put(providerDimension, "upfront")
.put("rows", 186L)
.put("index", 192046.1060180664D)
.put("addRowsIndexConstant", 192233.1060180664D)
.put(QueryRunnerTestHelper.dependentPostAggMetric, 192234.1060180664D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_2)
.put("maxIndex", 1870.06103515625D)
.put("minIndex", 545.9906005859375D)
.build(),
ImmutableMap.<String, Object>builder()
.put(providerDimension, "spot")
.put("rows", 837L)
.put("index", 95606.57232284546D)
.put("addRowsIndexConstant", 96444.57232284546D)
.put(QueryRunnerTestHelper.dependentPostAggMetric, 96445.57232284546D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_9)
.put("maxIndex", 277.2735290527344D)
.put("minIndex", 59.02102279663086D)
.build()
)
)
)
);
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
}
} }

View File

@ -47,6 +47,7 @@ import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.BySegmentResultValueClass; import io.druid.query.BySegmentResultValueClass;
import io.druid.query.DataSource; import io.druid.query.DataSource;
import io.druid.query.Druids; import io.druid.query.Druids;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.MapQueryToolChestWarehouse; import io.druid.query.MapQueryToolChestWarehouse;
import io.druid.query.Query; import io.druid.query.Query;
import io.druid.query.QueryConfig; import io.druid.query.QueryConfig;
@ -115,17 +116,21 @@ import java.util.concurrent.Executor;
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
public class CachingClusteredClientTest public class CachingClusteredClientTest
{ {
public static final ImmutableMap<String, Object> CONTEXT = ImmutableMap.of();
public static final MultipleIntervalSegmentSpec SEG_SPEC = new MultipleIntervalSegmentSpec(ImmutableList.<Interval>of());
public static final String DATA_SOURCE = "test";
protected static final DefaultObjectMapper jsonMapper = new DefaultObjectMapper(new SmileFactory());
static {
jsonMapper.getFactory().setCodec(jsonMapper);
}
/** /**
* We want a deterministic test, but we'd also like a bit of randomness for the distribution of segments * We want a deterministic test, but we'd also like a bit of randomness for the distribution of segments
* across servers. Thus, we loop multiple times and each time use a deterministically created Random instance. * across servers. Thus, we loop multiple times and each time use a deterministically created Random instance.
* Increase this value to increase exposure to random situations at the expense of test run time. * Increase this value to increase exposure to random situations at the expense of test run time.
*/ */
private static final int RANDOMNESS = 10; private static final int RANDOMNESS = 10;
public static final ImmutableMap<String, Object> CONTEXT = ImmutableMap.of();
public static final MultipleIntervalSegmentSpec SEG_SPEC = new MultipleIntervalSegmentSpec(ImmutableList.<Interval>of());
public static final String DATA_SOURCE = "test";
private static final List<AggregatorFactory> AGGS = Arrays.asList( private static final List<AggregatorFactory> AGGS = Arrays.asList(
new CountAggregatorFactory("rows"), new CountAggregatorFactory("rows"),
new LongSumAggregatorFactory("imps", "imps"), new LongSumAggregatorFactory("imps", "imps"),
@ -152,6 +157,17 @@ public class CachingClusteredClientTest
private static final DateTimeZone TIMEZONE = DateTimeZone.forID("America/Los_Angeles"); private static final DateTimeZone TIMEZONE = DateTimeZone.forID("America/Los_Angeles");
private static final QueryGranularity PT1H_TZ_GRANULARITY = new PeriodGranularity(new Period("PT1H"), null, TIMEZONE); private static final QueryGranularity PT1H_TZ_GRANULARITY = new PeriodGranularity(new Period("PT1H"), null, TIMEZONE);
private static final String TOP_DIM = "a_dim"; private static final String TOP_DIM = "a_dim";
private final Random random;
protected VersionedIntervalTimeline<String, ServerSelector> timeline;
protected TimelineServerView serverView;
protected Cache cache;
public CachingClusteredClient client;
DruidServer[] servers;
public CachingClusteredClientTest(int randomSeed)
{
this.random = new Random(randomSeed);
}
@Parameterized.Parameters @Parameterized.Parameters
public static Collection<?> constructorFeeder() throws IOException public static Collection<?> constructorFeeder() throws IOException
@ -169,28 +185,6 @@ public class CachingClusteredClientTest
); );
} }
protected static final DefaultObjectMapper jsonMapper = new DefaultObjectMapper(new SmileFactory());
static {
jsonMapper.getFactory().setCodec(jsonMapper);
}
private final Random random;
protected VersionedIntervalTimeline<String, ServerSelector> timeline;
protected TimelineServerView serverView;
protected Cache cache;
CachingClusteredClient client;
DruidServer[] servers;
public CachingClusteredClientTest(int randomSeed)
{
this.random = new Random(randomSeed);
}
@Before @Before
public void setUp() throws Exception public void setUp() throws Exception
{ {
@ -222,7 +216,10 @@ public class CachingClusteredClientTest
.postAggregators(POST_AGGS) .postAggregators(POST_AGGS)
.context(CONTEXT); .context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest(new QueryConfig()));
testQueryCaching( testQueryCaching(
runner,
builder.build(), builder.build(),
new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000), new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000),
new Interval("2011-01-02/2011-01-03"), makeTimeResults(new DateTime("2011-01-02"), 30, 6000), new Interval("2011-01-02/2011-01-03"), makeTimeResults(new DateTime("2011-01-02"), 30, 6000),
@ -263,7 +260,7 @@ public class CachingClusteredClientTest
new DateTime("2011-01-09"), 18, 521, new DateTime("2011-01-09"), 18, 521,
new DateTime("2011-01-09T01"), 181, 52 new DateTime("2011-01-09T01"), 181, 52
), ),
client.run( runner.run(
builder.intervals("2011-01-01/2011-01-10") builder.intervals("2011-01-01/2011-01-10")
.aggregators(RENAMED_AGGS) .aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS) .postAggregators(RENAMED_POST_AGGS)
@ -285,7 +282,10 @@ public class CachingClusteredClientTest
.postAggregators(POST_AGGS) .postAggregators(POST_AGGS)
.context(CONTEXT); .context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest(new QueryConfig()));
testQueryCaching( testQueryCaching(
runner,
builder.build(), builder.build(),
new Interval("2011-11-04/2011-11-08"), new Interval("2011-11-04/2011-11-08"),
makeTimeResults( makeTimeResults(
@ -303,7 +303,7 @@ public class CachingClusteredClientTest
new DateTime("2011-11-06", TIMEZONE), 23, 85312, new DateTime("2011-11-06", TIMEZONE), 23, 85312,
new DateTime("2011-11-07", TIMEZONE), 85, 102 new DateTime("2011-11-07", TIMEZONE), 85, 102
), ),
client.run( runner.run(
builder.intervals("2011-11-04/2011-11-08") builder.intervals("2011-11-04/2011-11-08")
.aggregators(RENAMED_AGGS) .aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS) .postAggregators(RENAMED_POST_AGGS)
@ -324,6 +324,7 @@ public class CachingClusteredClientTest
.postAggregators(POST_AGGS); .postAggregators(POST_AGGS);
testQueryCaching( testQueryCaching(
client,
1, 1,
true, true,
builder.context( builder.context(
@ -342,6 +343,7 @@ public class CachingClusteredClientTest
cache.close("0_0"); cache.close("0_0");
testQueryCaching( testQueryCaching(
client,
1, 1,
false, false,
builder.context( builder.context(
@ -358,6 +360,7 @@ public class CachingClusteredClientTest
Assert.assertEquals(0, cache.getStats().getNumMisses()); Assert.assertEquals(0, cache.getStats().getNumMisses());
testQueryCaching( testQueryCaching(
client,
1, 1,
false, false,
builder.context( builder.context(
@ -390,7 +393,10 @@ public class CachingClusteredClientTest
.postAggregators(POST_AGGS) .postAggregators(POST_AGGS)
.context(CONTEXT); .context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TopNQueryQueryToolChest(new TopNQueryConfig()));
testQueryCaching( testQueryCaching(
runner,
builder.build(), builder.build(),
new Interval("2011-01-01/2011-01-02"), new Interval("2011-01-01/2011-01-02"),
makeTopNResults(new DateTime("2011-01-01"), "a", 50, 5000, "b", 50, 4999, "c", 50, 4998), makeTopNResults(new DateTime("2011-01-01"), "a", 50, 5000, "b", 50, 4999, "c", 50, 4998),
@ -432,7 +438,7 @@ public class CachingClusteredClientTest
new DateTime("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983, new DateTime("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983,
new DateTime("2011-01-09T01"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983 new DateTime("2011-01-09T01"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983
), ),
client.run( runner.run(
builder.intervals("2011-01-01/2011-01-10") builder.intervals("2011-01-01/2011-01-10")
.metric("imps") .metric("imps")
.aggregators(RENAMED_AGGS) .aggregators(RENAMED_AGGS)
@ -458,7 +464,10 @@ public class CachingClusteredClientTest
.postAggregators(POST_AGGS) .postAggregators(POST_AGGS)
.context(CONTEXT); .context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TopNQueryQueryToolChest(new TopNQueryConfig()));
testQueryCaching( testQueryCaching(
runner,
builder.build(), builder.build(),
new Interval("2011-11-04/2011-11-08"), new Interval("2011-11-04/2011-11-08"),
makeTopNResults( makeTopNResults(
@ -477,7 +486,7 @@ public class CachingClusteredClientTest
new DateTime("2011-11-06", TIMEZONE), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, new DateTime("2011-11-06", TIMEZONE), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
new DateTime("2011-11-07", TIMEZONE), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986 new DateTime("2011-11-07", TIMEZONE), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986
), ),
client.run( runner.run(
builder.intervals("2011-11-04/2011-11-08") builder.intervals("2011-11-04/2011-11-08")
.metric("imps") .metric("imps")
.aggregators(RENAMED_AGGS) .aggregators(RENAMED_AGGS)
@ -503,7 +512,9 @@ public class CachingClusteredClientTest
.postAggregators(POST_AGGS) .postAggregators(POST_AGGS)
.context(CONTEXT); .context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TopNQueryQueryToolChest(new TopNQueryConfig()));
testQueryCaching( testQueryCaching(
runner,
builder.build(), builder.build(),
new Interval("2011-01-01/2011-01-02"), new Interval("2011-01-01/2011-01-02"),
makeTopNResults(), makeTopNResults(),
@ -530,6 +541,7 @@ public class CachingClusteredClientTest
) )
); );
TestHelper.assertExpectedResults( TestHelper.assertExpectedResults(
makeRenamedTopNResults( makeRenamedTopNResults(
new DateTime("2011-01-05"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992, new DateTime("2011-01-05"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992,
@ -543,7 +555,7 @@ public class CachingClusteredClientTest
new DateTime("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983, new DateTime("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983,
new DateTime("2011-01-09T01"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983 new DateTime("2011-01-09T01"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983
), ),
client.run( runner.run(
builder.intervals("2011-01-01/2011-01-10") builder.intervals("2011-01-01/2011-01-10")
.metric("imps") .metric("imps")
.aggregators(RENAMED_AGGS) .aggregators(RENAMED_AGGS)
@ -557,6 +569,7 @@ public class CachingClusteredClientTest
public void testSearchCaching() throws Exception public void testSearchCaching() throws Exception
{ {
testQueryCaching( testQueryCaching(
client,
new SearchQuery( new SearchQuery(
new TableDataSource(DATA_SOURCE), new TableDataSource(DATA_SOURCE),
DIM_FILTER, DIM_FILTER,
@ -594,13 +607,14 @@ public class CachingClusteredClientTest
); );
} }
public void testQueryCaching(final Query query, Object... args) public void testQueryCaching(QueryRunner runner, final Query query, Object... args)
{ {
testQueryCaching(3, true, query, args); testQueryCaching(runner, 3, true, query, args);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void testQueryCaching( public void testQueryCaching(
final QueryRunner runner,
final int numTimesToQuery, final int numTimesToQuery,
boolean expectBySegment, boolean expectBySegment,
final Query query, Object... args // does this assume query intervals must be ordered? final Query query, Object... args // does this assume query intervals must be ordered?
@ -754,7 +768,7 @@ public class CachingClusteredClientTest
} }
) )
), ),
client.run( runner.run(
query.withQuerySegmentSpec( query.withQuerySegmentSpec(
new MultipleIntervalSegmentSpec( new MultipleIntervalSegmentSpec(
Arrays.asList( Arrays.asList(
@ -774,7 +788,7 @@ public class CachingClusteredClientTest
for (Capture queryCapture : queryCaptures) { for (Capture queryCapture : queryCaptures) {
Query capturedQuery = (Query) queryCapture.getValue(); Query capturedQuery = (Query) queryCapture.getValue();
if (expectBySegment) { if (expectBySegment) {
Assert.assertEquals(true, capturedQuery.<Boolean>getContextValue("bySegment")); Assert.assertEquals(true, capturedQuery.getContextValue("bySegment"));
} else { } else {
Assert.assertTrue( Assert.assertTrue(
capturedQuery.getContextValue("bySegment") == null || capturedQuery.getContextValue("bySegment") == null ||
@ -1253,6 +1267,8 @@ public class CachingClusteredClientTest
private class MyDataSegment extends DataSegment private class MyDataSegment extends DataSegment
{ {
private final DataSegment baseSegment = segment;
private MyDataSegment() private MyDataSegment()
{ {
super( super(
@ -1268,8 +1284,6 @@ public class CachingClusteredClientTest
); );
} }
private final DataSegment baseSegment = segment;
@Override @Override
@JsonProperty @JsonProperty
public String getDataSource() public String getDataSource()
@ -1370,7 +1384,6 @@ public class CachingClusteredClientTest
{ {
private final DruidServer server; private final DruidServer server;
private final QueryRunner queryRunner; private final QueryRunner queryRunner;
private final List<ServerExpectation> expectations = Lists.newArrayList(); private final List<ServerExpectation> expectations = Lists.newArrayList();
public ServerExpectations( public ServerExpectations(