Compute postage only when required

This commit is contained in:
nishantmonu51 2014-04-07 23:17:25 +05:30
parent 8138d96f9d
commit db35009acd
10 changed files with 191 additions and 34 deletions

View File

@ -36,7 +36,6 @@ import java.util.List;
public class InvertedTopNMetricSpec implements TopNMetricSpec public class InvertedTopNMetricSpec implements TopNMetricSpec
{ {
private static final byte CACHE_TYPE_ID = 0x3; private static final byte CACHE_TYPE_ID = 0x3;
private final TopNMetricSpec delegate; private final TopNMetricSpec delegate;
@JsonCreator @JsonCreator
@ -102,15 +101,27 @@ public class InvertedTopNMetricSpec implements TopNMetricSpec
delegate.initTopNAlgorithmSelector(selector); delegate.initTopNAlgorithmSelector(selector);
} }
@Override
public String getMetricName(DimensionSpec dimSpec)
{
return delegate.getMetricName(dimSpec);
}
@Override @Override
public boolean equals(Object o) public boolean equals(Object o)
{ {
if (this == o) return true; if (this == o) {
if (o == null || getClass() != o.getClass()) return false; return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
InvertedTopNMetricSpec that = (InvertedTopNMetricSpec) o; InvertedTopNMetricSpec that = (InvertedTopNMetricSpec) o;
if (delegate != null ? !delegate.equals(that.delegate) : that.delegate != null) return false; if (delegate != null ? !delegate.equals(that.delegate) : that.delegate != null) {
return false;
}
return true; return true;
} }

View File

@ -111,6 +111,12 @@ public class LexicographicTopNMetricSpec implements TopNMetricSpec
selector.setAggregateAllMetrics(true); selector.setAggregateAllMetrics(true);
} }
@Override
public String getMetricName(DimensionSpec dimSpec)
{
return dimSpec.getOutputName();
}
@Override @Override
public String toString() public String toString()
{ {

View File

@ -150,6 +150,12 @@ public class NumericTopNMetricSpec implements TopNMetricSpec
selector.setAggregateTopNMetricFirst(true); selector.setAggregateTopNMetricFirst(true);
} }
@Override
public String getMetricName(DimensionSpec dimSpec)
{
return metric;
}
@Override @Override
public String toString() public String toString()
{ {

View File

@ -40,7 +40,7 @@ public class TopNBinaryFn implements BinaryFn<Result<TopNResultValue>, Result<To
private final TopNResultMerger merger; private final TopNResultMerger merger;
private final DimensionSpec dimSpec; private final DimensionSpec dimSpec;
private final QueryGranularity gran; private final QueryGranularity gran;
private final String dimension; private final DimensionSpec dimensionSpec;
private final TopNMetricSpec topNMetricSpec; private final TopNMetricSpec topNMetricSpec;
private final int threshold; private final int threshold;
private final List<AggregatorFactory> aggregations; private final List<AggregatorFactory> aggregations;
@ -65,7 +65,7 @@ public class TopNBinaryFn implements BinaryFn<Result<TopNResultValue>, Result<To
this.aggregations = aggregatorSpecs; this.aggregations = aggregatorSpecs;
this.postAggregations = postAggregatorSpecs; this.postAggregations = postAggregatorSpecs;
this.dimension = dimSpec.getOutputName(); this.dimensionSpec = dimSpec;
this.comparator = topNMetricSpec.getComparator(aggregatorSpecs, postAggregatorSpecs); this.comparator = topNMetricSpec.getComparator(aggregatorSpecs, postAggregatorSpecs);
} }
@ -79,11 +79,13 @@ public class TopNBinaryFn implements BinaryFn<Result<TopNResultValue>, Result<To
return merger.getResult(arg1, comparator); return merger.getResult(arg1, comparator);
} }
Map<String, DimensionAndMetricValueExtractor> retVals = new LinkedHashMap<String, DimensionAndMetricValueExtractor>();
TopNResultValue arg1Vals = arg1.getValue(); TopNResultValue arg1Vals = arg1.getValue();
TopNResultValue arg2Vals = arg2.getValue(); TopNResultValue arg2Vals = arg2.getValue();
Map<String, DimensionAndMetricValueExtractor> retVals = new LinkedHashMap<String, DimensionAndMetricValueExtractor>();
String dimension = dimensionSpec.getOutputName();
String topNMetricName = topNMetricSpec.getMetricName(dimensionSpec);
for (DimensionAndMetricValueExtractor arg1Val : arg1Vals) { for (DimensionAndMetricValueExtractor arg1Val : arg1Vals) {
retVals.put(arg1Val.getStringDimensionValue(dimension), arg1Val); retVals.put(arg1Val.getStringDimensionValue(dimension), arg1Val);
} }
@ -92,16 +94,17 @@ public class TopNBinaryFn implements BinaryFn<Result<TopNResultValue>, Result<To
DimensionAndMetricValueExtractor arg1Val = retVals.get(dimensionValue); DimensionAndMetricValueExtractor arg1Val = retVals.get(dimensionValue);
if (arg1Val != null) { if (arg1Val != null) {
Map<String, Object> retVal = new LinkedHashMap<String, Object>(); Map<String, Object> retVal = new LinkedHashMap<String, Object>(aggregations.size() + 2);
retVal.put(dimension, dimensionValue); retVal.put(dimension, dimensionValue);
for (AggregatorFactory factory : aggregations) { for (AggregatorFactory factory : aggregations) {
final String metricName = factory.getName(); final String metricName = factory.getName();
retVal.put(metricName, factory.combine(arg1Val.getMetric(metricName), arg2Val.getMetric(metricName))); retVal.put(metricName, factory.combine(arg1Val.getMetric(metricName), arg2Val.getMetric(metricName)));
} }
for (PostAggregator postAgg : postAggregations) {
for (PostAggregator pf : postAggregations) { if (postAgg.getName().equals(topNMetricName)) {
retVal.put(pf.getName(), pf.compute(retVal)); retVal.put(postAgg.getName(), postAgg.compute(retVal));
}
} }
retVals.put(dimensionValue, new DimensionAndMetricValueExtractor(retVal)); retVals.put(dimensionValue, new DimensionAndMetricValueExtractor(retVal));

View File

@ -75,9 +75,6 @@ public class TopNLexicographicResultBuilder implements TopNResultBuilder
for (Object metricVal : metricVals) { for (Object metricVal : metricVals) {
metricValues.put(aggsIter.next().getName(), metricVal); metricValues.put(aggsIter.next().getName(), metricVal);
} }
for (PostAggregator postAgg : postAggs) {
metricValues.put(postAgg.getName(), postAgg.compute(metricValues));
}
pQueue.add(new DimValHolder.Builder().withDirName(dimName).withMetricValues(metricValues).build()); pQueue.add(new DimValHolder.Builder().withDirName(dimName).withMetricValues(metricValues).build());
} }

View File

@ -55,4 +55,6 @@ public interface TopNMetricSpec
public <T> TopNMetricSpecBuilder<T> configureOptimizer(TopNMetricSpecBuilder<T> builder); public <T> TopNMetricSpecBuilder<T> configureOptimizer(TopNMetricSpecBuilder<T> builder);
public void initTopNAlgorithmSelector(TopNAlgorithmSelector selector); public void initTopNAlgorithmSelector(TopNAlgorithmSelector selector);
public String getMetricName(DimensionSpec dimSpec);
} }

View File

@ -40,7 +40,6 @@ public class TopNNumericResultBuilder implements TopNResultBuilder
private final DateTime timestamp; private final DateTime timestamp;
private final DimensionSpec dimSpec; private final DimensionSpec dimSpec;
private final String metricName; private final String metricName;
private MinMaxPriorityQueue<DimValHolder> pQueue = null; private MinMaxPriorityQueue<DimValHolder> pQueue = null;
public TopNNumericResultBuilder( public TopNNumericResultBuilder(
@ -75,8 +74,12 @@ public class TopNNumericResultBuilder implements TopNResultBuilder
for (Object metricVal : metricVals) { for (Object metricVal : metricVals) {
metricValues.put(aggFactoryIter.next().getName(), metricVal); metricValues.put(aggFactoryIter.next().getName(), metricVal);
} }
for (PostAggregator postAgg : postAggs) { for (PostAggregator postAgg : postAggs) {
metricValues.put(postAgg.getName(), postAgg.compute(metricValues)); if (postAgg.getName().equals(metricName)) {
metricValues.put(postAgg.getName(), postAgg.compute(metricValues));
break;
}
} }
Object topNMetricVal = metricValues.get(metricName); Object topNMetricVal = metricValues.get(metricName);

View File

@ -161,7 +161,12 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
values.put(agg.getName(), fn.manipulate(agg, input.getMetric(agg.getName()))); values.put(agg.getName(), fn.manipulate(agg, input.getMetric(agg.getName())));
} }
for (PostAggregator postAgg : query.getPostAggregatorSpecs()) { for (PostAggregator postAgg : query.getPostAggregatorSpecs()) {
values.put(postAgg.getName(), input.getMetric(postAgg.getName())); Object calculatedPostAgg = input.getMetric(postAgg.getName());
if (calculatedPostAgg != null) {
values.put(postAgg.getName(), input.getMetric(postAgg.getName()));
} else {
values.put(postAgg.getName(), postAgg.compute(values));
}
} }
values.put(dimension, input.getDimensionValue(dimension)); values.put(dimension, input.getDimensionValue(dimension));
@ -281,10 +286,6 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
vals.put(factory.getName(), factory.deserialize(resultIter.next())); vals.put(factory.getName(), factory.deserialize(resultIter.next()));
} }
for (PostAggregator postAgg : postAggs) {
vals.put(postAgg.getName(), postAgg.compute(vals));
}
retVal.add(vals); retVal.add(vals);
} }

View File

@ -0,0 +1,135 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.topn;
import com.google.caliper.Param;
import com.google.caliper.Runner;
import com.google.caliper.SimpleBenchmark;
import com.google.common.collect.Lists;
import io.druid.granularity.QueryGranularity;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.ArithmeticPostAggregator;
import io.druid.query.aggregation.post.ConstantPostAggregator;
import io.druid.query.aggregation.post.FieldAccessPostAggregator;
import io.druid.query.dimension.DefaultDimensionSpec;
import org.joda.time.DateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class TopNBinaryFnBenchmark extends SimpleBenchmark
{
@Param({"1", "5", "10", "15"})
int aggCount;
@Param({"1", "5", "10", "15"})
int postAggCount;
@Param({"1000", "10000"})
int threshold;
Result<TopNResultValue> result1;
Result<TopNResultValue> result2;
TopNBinaryFn fn;
public static void main(String[] args) throws Exception
{
Runner.main(TopNBinaryFnBenchmark.class, args);
}
@Override
protected void setUp() throws Exception
{
final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L, null);
final FieldAccessPostAggregator rowsPostAgg = new FieldAccessPostAggregator("rows", "rows");
final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index");
final List<AggregatorFactory> aggregatorFactories = new ArrayList<>();
aggregatorFactories.add(new CountAggregatorFactory("rows"));
aggregatorFactories.add(new LongSumAggregatorFactory("index", "index"));
for (int i = 1; i < aggCount; i++) {
aggregatorFactories.add(new CountAggregatorFactory("rows" + i));
}
final List<PostAggregator> postAggregators = new ArrayList<>();
for (int i = 0; i < postAggCount; i++) {
postAggregators.add(
new ArithmeticPostAggregator(
"addrowsindexconstant" + i,
"+",
Lists.newArrayList(constant, rowsPostAgg, indexPostAgg)
)
);
}
final DateTime currTime = new DateTime();
List<Map<String, Object>> list = new ArrayList<>();
for (int i = 0; i < threshold; i++) {
Map<String, Object> res = new HashMap<>();
res.put("testdim", "" + i);
res.put("rows", 1L);
for (int j = 0; j < aggCount; j++) {
res.put("rows" + j, 1L);
}
res.put("index", 1L);
list.add(res);
}
result1 = new Result<>(
currTime,
new TopNResultValue(list)
);
List<Map<String, Object>> list2 = new ArrayList<>();
for (int i = 0; i < threshold; i++) {
Map<String, Object> res = new HashMap<>();
res.put("testdim", "" + i);
res.put("rows", 2L);
for (int j = 0; j < aggCount; j++) {
res.put("rows" + j, 2L);
}
res.put("index", 2L);
list2.add(res);
}
result2 = new Result<>(
currTime,
new TopNResultValue(list2)
);
fn = new TopNBinaryFn(
TopNResultMerger.identity,
QueryGranularity.ALL,
new DefaultDimensionSpec("testdim", null),
new NumericTopNMetricSpec("index"),
100,
aggregatorFactories,
postAggregators
);
}
public void timeMerge(int nReps)
{
for (int i = 0; i < nReps; i++) {
fn.apply(result1, result2);
}
}
}

View File

@ -129,15 +129,13 @@ public class TopNBinaryFnTest
ImmutableMap.<String, Object>of( ImmutableMap.<String, Object>of(
"testdim", "1", "testdim", "1",
"rows", 3L, "rows", 3L,
"index", 5L, "index", 5L
"addrowsindexconstant", 9.0
), ),
ImmutableMap.<String, Object>of( ImmutableMap.<String, Object>of(
"testdim", "2", "testdim", "2",
"rows", 4L, "rows", 4L,
"index", 4L, "index", 4L
"addrowsindexconstant", 9.0
) )
) )
) )
@ -214,14 +212,12 @@ public class TopNBinaryFnTest
ImmutableMap.<String, Object>of( ImmutableMap.<String, Object>of(
"testdim", "1", "testdim", "1",
"rows", 3L, "rows", 3L,
"index", 5L, "index", 5L
"addrowsindexconstant", 9.0
), ),
ImmutableMap.<String, Object>of( ImmutableMap.<String, Object>of(
"testdim", "2", "testdim", "2",
"rows", 4L, "rows", 4L,
"index", 4L, "index", 4L
"addrowsindexconstant", 9.0
) )
) )
) )
@ -427,15 +423,12 @@ public class TopNBinaryFnTest
ImmutableMap.<String, Object>of( ImmutableMap.<String, Object>of(
"testdim", "1", "testdim", "1",
"rows", 3L, "rows", 3L,
"index", 5L, "index", 5L
"addrowsindexconstant", 9.0
), ),
ImmutableMap.<String, Object>of( ImmutableMap.<String, Object>of(
"testdim", "2", "testdim", "2",
"rows", 4L, "rows", 4L,
"index", 4L, "index", 4L )
"addrowsindexconstant", 9.0
)
) )
) )
); );