diff --git a/processing/src/main/java/io/druid/query/topn/InvertedTopNMetricSpec.java b/processing/src/main/java/io/druid/query/topn/InvertedTopNMetricSpec.java index e5a7fdfedde..ab5caa60a81 100644 --- a/processing/src/main/java/io/druid/query/topn/InvertedTopNMetricSpec.java +++ b/processing/src/main/java/io/druid/query/topn/InvertedTopNMetricSpec.java @@ -36,7 +36,6 @@ import java.util.List; public class InvertedTopNMetricSpec implements TopNMetricSpec { private static final byte CACHE_TYPE_ID = 0x3; - private final TopNMetricSpec delegate; @JsonCreator @@ -102,15 +101,27 @@ public class InvertedTopNMetricSpec implements TopNMetricSpec delegate.initTopNAlgorithmSelector(selector); } + @Override + public String getMetricName(DimensionSpec dimSpec) + { + return delegate.getMetricName(dimSpec); + } + @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } InvertedTopNMetricSpec that = (InvertedTopNMetricSpec) o; - if (delegate != null ? !delegate.equals(that.delegate) : that.delegate != null) return false; + if (delegate != null ? !delegate.equals(that.delegate) : that.delegate != null) { + return false; + } return true; } diff --git a/processing/src/main/java/io/druid/query/topn/LexicographicTopNMetricSpec.java b/processing/src/main/java/io/druid/query/topn/LexicographicTopNMetricSpec.java index 0f2c8344b14..5c1b9ab41e8 100644 --- a/processing/src/main/java/io/druid/query/topn/LexicographicTopNMetricSpec.java +++ b/processing/src/main/java/io/druid/query/topn/LexicographicTopNMetricSpec.java @@ -111,6 +111,12 @@ public class LexicographicTopNMetricSpec implements TopNMetricSpec selector.setAggregateAllMetrics(true); } + @Override + public String getMetricName(DimensionSpec dimSpec) + { + return dimSpec.getOutputName(); + } + @Override public String toString() { diff --git a/processing/src/main/java/io/druid/query/topn/NumericTopNMetricSpec.java b/processing/src/main/java/io/druid/query/topn/NumericTopNMetricSpec.java index 76f1a9341ac..fe5bce241a6 100644 --- a/processing/src/main/java/io/druid/query/topn/NumericTopNMetricSpec.java +++ b/processing/src/main/java/io/druid/query/topn/NumericTopNMetricSpec.java @@ -150,6 +150,12 @@ public class NumericTopNMetricSpec implements TopNMetricSpec selector.setAggregateTopNMetricFirst(true); } + @Override + public String getMetricName(DimensionSpec dimSpec) + { + return metric; + } + @Override public String toString() { diff --git a/processing/src/main/java/io/druid/query/topn/TopNBinaryFn.java b/processing/src/main/java/io/druid/query/topn/TopNBinaryFn.java index 437c28f640f..f848f3c9e06 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNBinaryFn.java +++ b/processing/src/main/java/io/druid/query/topn/TopNBinaryFn.java @@ -40,7 +40,7 @@ public class TopNBinaryFn implements BinaryFn, Result aggregations; @@ -65,7 +65,7 @@ public class TopNBinaryFn implements BinaryFn, Result, Result retVals = new LinkedHashMap(); TopNResultValue arg1Vals = arg1.getValue(); TopNResultValue arg2Vals = arg2.getValue(); + Map retVals = new LinkedHashMap(); + String dimension = dimensionSpec.getOutputName(); + String topNMetricName = topNMetricSpec.getMetricName(dimensionSpec); for (DimensionAndMetricValueExtractor arg1Val : arg1Vals) { retVals.put(arg1Val.getStringDimensionValue(dimension), arg1Val); } @@ -92,16 +94,17 @@ public class TopNBinaryFn implements BinaryFn, Result retVal = new LinkedHashMap(); + Map retVal = new LinkedHashMap(aggregations.size() + 2); retVal.put(dimension, dimensionValue); for (AggregatorFactory factory : aggregations) { final String metricName = factory.getName(); retVal.put(metricName, factory.combine(arg1Val.getMetric(metricName), arg2Val.getMetric(metricName))); } - - for (PostAggregator pf : postAggregations) { - retVal.put(pf.getName(), pf.compute(retVal)); + for (PostAggregator postAgg : postAggregations) { + if (postAgg.getName().equals(topNMetricName)) { + retVal.put(postAgg.getName(), postAgg.compute(retVal)); + } } retVals.put(dimensionValue, new DimensionAndMetricValueExtractor(retVal)); diff --git a/processing/src/main/java/io/druid/query/topn/TopNLexicographicResultBuilder.java b/processing/src/main/java/io/druid/query/topn/TopNLexicographicResultBuilder.java index 37360dfb1cd..b12f4c85b51 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNLexicographicResultBuilder.java +++ b/processing/src/main/java/io/druid/query/topn/TopNLexicographicResultBuilder.java @@ -75,9 +75,6 @@ public class TopNLexicographicResultBuilder implements TopNResultBuilder for (Object metricVal : metricVals) { metricValues.put(aggsIter.next().getName(), metricVal); } - for (PostAggregator postAgg : postAggs) { - metricValues.put(postAgg.getName(), postAgg.compute(metricValues)); - } pQueue.add(new DimValHolder.Builder().withDirName(dimName).withMetricValues(metricValues).build()); } diff --git a/processing/src/main/java/io/druid/query/topn/TopNMetricSpec.java b/processing/src/main/java/io/druid/query/topn/TopNMetricSpec.java index c2baf13e3eb..44103f93708 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNMetricSpec.java +++ b/processing/src/main/java/io/druid/query/topn/TopNMetricSpec.java @@ -55,4 +55,6 @@ public interface TopNMetricSpec public TopNMetricSpecBuilder configureOptimizer(TopNMetricSpecBuilder builder); public void initTopNAlgorithmSelector(TopNAlgorithmSelector selector); + + public String getMetricName(DimensionSpec dimSpec); } diff --git a/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java b/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java index 9f6479baee4..a806c7966ec 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java +++ b/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java @@ -40,7 +40,6 @@ public class TopNNumericResultBuilder implements TopNResultBuilder private final DateTime timestamp; private final DimensionSpec dimSpec; private final String metricName; - private MinMaxPriorityQueue pQueue = null; public TopNNumericResultBuilder( @@ -75,8 +74,12 @@ public class TopNNumericResultBuilder implements TopNResultBuilder for (Object metricVal : metricVals) { metricValues.put(aggFactoryIter.next().getName(), metricVal); } + for (PostAggregator postAgg : postAggs) { - metricValues.put(postAgg.getName(), postAgg.compute(metricValues)); + if (postAgg.getName().equals(metricName)) { + metricValues.put(postAgg.getName(), postAgg.compute(metricValues)); + break; + } } Object topNMetricVal = metricValues.get(metricName); diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java index a7d77fde396..0c78f6305fe 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java @@ -161,7 +161,12 @@ public class TopNQueryQueryToolChest extends QueryToolChest result1; + Result result2; + TopNBinaryFn fn; + + public static void main(String[] args) throws Exception + { + Runner.main(TopNBinaryFnBenchmark.class, args); + } + + @Override + protected void setUp() throws Exception + { + + final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L, null); + final FieldAccessPostAggregator rowsPostAgg = new FieldAccessPostAggregator("rows", "rows"); + final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index"); + + + final List aggregatorFactories = new ArrayList<>(); + aggregatorFactories.add(new CountAggregatorFactory("rows")); + aggregatorFactories.add(new LongSumAggregatorFactory("index", "index")); + for (int i = 1; i < aggCount; i++) { + aggregatorFactories.add(new CountAggregatorFactory("rows" + i)); + } + final List postAggregators = new ArrayList<>(); + for (int i = 0; i < postAggCount; i++) { + postAggregators.add( + new ArithmeticPostAggregator( + "addrowsindexconstant" + i, + "+", + Lists.newArrayList(constant, rowsPostAgg, indexPostAgg) + ) + ); + } + final DateTime currTime = new DateTime(); + List> list = new ArrayList<>(); + for (int i = 0; i < threshold; i++) { + Map res = new HashMap<>(); + res.put("testdim", "" + i); + res.put("rows", 1L); + for (int j = 0; j < aggCount; j++) { + res.put("rows" + j, 1L); + } + res.put("index", 1L); + list.add(res); + } + result1 = new Result<>( + currTime, + new TopNResultValue(list) + ); + + List> list2 = new ArrayList<>(); + for (int i = 0; i < threshold; i++) { + Map res = new HashMap<>(); + res.put("testdim", "" + i); + res.put("rows", 2L); + for (int j = 0; j < aggCount; j++) { + res.put("rows" + j, 2L); + } + res.put("index", 2L); + list2.add(res); + } + result2 = new Result<>( + currTime, + new TopNResultValue(list2) + ); + fn = new TopNBinaryFn( + TopNResultMerger.identity, + QueryGranularity.ALL, + new DefaultDimensionSpec("testdim", null), + new NumericTopNMetricSpec("index"), + 100, + aggregatorFactories, + postAggregators + ); + } + + public void timeMerge(int nReps) + { + for (int i = 0; i < nReps; i++) { + fn.apply(result1, result2); + } + } + +} diff --git a/processing/src/test/java/io/druid/query/topn/TopNBinaryFnTest.java b/processing/src/test/java/io/druid/query/topn/TopNBinaryFnTest.java index 0eba9778c4e..cb3089d6397 100644 --- a/processing/src/test/java/io/druid/query/topn/TopNBinaryFnTest.java +++ b/processing/src/test/java/io/druid/query/topn/TopNBinaryFnTest.java @@ -129,15 +129,13 @@ public class TopNBinaryFnTest ImmutableMap.of( "testdim", "1", "rows", 3L, - "index", 5L, - "addrowsindexconstant", 9.0 + "index", 5L ), ImmutableMap.of( "testdim", "2", "rows", 4L, - "index", 4L, - "addrowsindexconstant", 9.0 + "index", 4L ) ) ) @@ -214,14 +212,12 @@ public class TopNBinaryFnTest ImmutableMap.of( "testdim", "1", "rows", 3L, - "index", 5L, - "addrowsindexconstant", 9.0 + "index", 5L ), ImmutableMap.of( "testdim", "2", "rows", 4L, - "index", 4L, - "addrowsindexconstant", 9.0 + "index", 4L ) ) ) @@ -427,15 +423,12 @@ public class TopNBinaryFnTest ImmutableMap.of( "testdim", "1", "rows", 3L, - "index", 5L, - "addrowsindexconstant", 9.0 + "index", 5L ), ImmutableMap.of( "testdim", "2", "rows", 4L, - "index", 4L, - "addrowsindexconstant", 9.0 - ) + "index", 4L ) ) ) );