diff --git a/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java b/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java index dee05888470..0eb925f3327 100644 --- a/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java +++ b/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java @@ -50,12 +50,12 @@ public class FinalizeResultsQueryRunner implements QueryRunner { final boolean isBySegment = query.getContextBySegment(false); final boolean shouldFinalize = query.getContextFinalize(true); + Function finalizerFn; if (shouldFinalize) { - Function finalizerFn; if (isBySegment) { finalizerFn = new Function() { - final Function baseFinalizer = toolChest.makeMetricManipulatorFn( + final Function baseFinalizer = toolChest.makePostComputeManipulatorFn( query, new MetricManipulationFn() { @@ -85,7 +85,7 @@ public class FinalizeResultsQueryRunner implements QueryRunner } }; } else { - finalizerFn = toolChest.makeMetricManipulatorFn( + finalizerFn = toolChest.makePostComputeManipulatorFn( query, new MetricManipulationFn() { @@ -97,12 +97,25 @@ public class FinalizeResultsQueryRunner implements QueryRunner } ); } - - return Sequences.map( - baseRunner.run(query.withOverriddenContext(ImmutableMap.of("finalize", false))), - finalizerFn + } else { + // finalize is false here. + finalizerFn = toolChest.makePostComputeManipulatorFn( + query, + new MetricManipulationFn() + { + @Override + public Object manipulate(AggregatorFactory factory, Object object) + { + return object; + } + } ); } - return baseRunner.run(query); + + return Sequences.map( + baseRunner.run(query.withOverriddenContext(ImmutableMap.of("finalize", false))), + finalizerFn + ); + } } diff --git a/processing/src/main/java/io/druid/query/QueryToolChest.java b/processing/src/main/java/io/druid/query/QueryToolChest.java index 304d3e1eb14..8299ecaad0a 100644 --- a/processing/src/main/java/io/druid/query/QueryToolChest.java +++ b/processing/src/main/java/io/druid/query/QueryToolChest.java @@ -44,8 +44,16 @@ public abstract class QueryToolChest mergeSequences(Sequence> seqOfSequences); + public abstract ServiceMetricEvent.Builder makeMetricBuilder(QueryType query); - public abstract Function makeMetricManipulatorFn(QueryType query, MetricManipulationFn fn); + + public abstract Function makePreComputeManipulatorFn(QueryType query, MetricManipulationFn fn); + + public Function makePostComputeManipulatorFn(QueryType query, MetricManipulationFn fn) + { + return makePreComputeManipulatorFn(query, fn); + } + public abstract TypeReference getResultTypeReference(); public CacheStrategy getCacheStrategy(QueryType query) { diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index 1b77f2299ba..ab70b34db05 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -173,7 +173,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest makeMetricManipulatorFn(final GroupByQuery query, final MetricManipulationFn fn) + public Function makePreComputeManipulatorFn(final GroupByQuery query, final MetricManipulationFn fn) { return new Function() { diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java index 0bc9f22b1dd..98eb896476d 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java @@ -155,7 +155,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest makeMetricManipulatorFn( + public Function makePreComputeManipulatorFn( SegmentMetadataQuery query, MetricManipulationFn fn ) { diff --git a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java index f559829d593..a8429ad2726 100644 --- a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java @@ -129,7 +129,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest, Result> makeMetricManipulatorFn( + public Function, Result> makePreComputeManipulatorFn( SearchQuery query, MetricManipulationFn fn ) { diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java index 03bccfdeed5..f3bbe028ed8 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java @@ -131,7 +131,7 @@ public class SelectQueryQueryToolChest extends QueryToolChest, Result> makeMetricManipulatorFn( + public Function, Result> makePreComputeManipulatorFn( final SelectQuery query, final MetricManipulationFn fn ) { diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java index 95715bb7511..7186b1a6e38 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java @@ -123,7 +123,7 @@ public class TimeBoundaryQueryQueryToolChest } @Override - public Function, Result> makeMetricManipulatorFn( + public Function, Result> makePreComputeManipulatorFn( TimeBoundaryQuery query, MetricManipulationFn fn ) { diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index 72db385e8dc..51a0c0ebbe0 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java @@ -131,9 +131,16 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest, Result> makeMetricManipulatorFn( + public Function, Result> makePreComputeManipulatorFn( final TimeseriesQuery query, final MetricManipulationFn fn ) + { + return makeComputeManipulatorFn(query, fn, false); + } + + private Function, Result> makeComputeManipulatorFn( + final TimeseriesQuery query, final MetricManipulationFn fn, final boolean calculatePostAggs + ) { return new Function, Result>() { @@ -142,12 +149,15 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest values = Maps.newHashMap(); final TimeseriesResultValue holder = result.getValue(); + if (calculatePostAggs) { + for (PostAggregator postAgg : query.getPostAggregatorSpecs()) { + values.put(postAgg.getName(), postAgg.compute(holder.getBaseObject())); + } + } for (AggregatorFactory agg : query.getAggregatorSpecs()) { values.put(agg.getName(), fn.manipulate(agg, holder.getMetric(agg.getName()))); } - for (PostAggregator postAgg : query.getPostAggregatorSpecs()) { - values.put(postAgg.getName(), postAgg.compute(values)); - } + return new Result( result.getTimestamp(), new TimeseriesResultValue(values) @@ -262,4 +272,14 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest, Result> makePostComputeManipulatorFn( + TimeseriesQuery query, MetricManipulationFn fn + ) + { + return makeComputeManipulatorFn(query, fn, true); + } + + } diff --git a/processing/src/main/java/io/druid/query/topn/TopNBinaryFn.java b/processing/src/main/java/io/druid/query/topn/TopNBinaryFn.java index c6ab513c726..18d7fa0d833 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNBinaryFn.java +++ b/processing/src/main/java/io/druid/query/topn/TopNBinaryFn.java @@ -90,7 +90,6 @@ public class TopNBinaryFn implements BinaryFn, Result retVals = new LinkedHashMap(); String dimension = dimensionSpec.getOutputName(); - String topNMetricName = topNMetricSpec.getMetricName(dimensionSpec); for (DimensionAndMetricValueExtractor arg1Val : arg1Vals) { retVals.put(arg1Val.getStringDimensionValue(dimension), arg1Val); } diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java index 14fd5f5c41f..376574733c8 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java @@ -46,6 +46,7 @@ import io.druid.query.Result; import io.druid.query.ResultGranularTimestampComparator; import io.druid.query.ResultMergeQueryRunner; import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.AggregatorUtil; import io.druid.query.aggregation.MetricManipulationFn; import io.druid.query.aggregation.PostAggregator; import io.druid.query.filter.DimFilter; @@ -139,7 +140,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest, Result> makeMetricManipulatorFn( + public Function, Result> makePreComputeManipulatorFn( final TopNQuery query, final MetricManipulationFn fn ) { @@ -162,7 +163,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest, Result> makePostComputeManipulatorFn( + final TopNQuery query, final MetricManipulationFn fn + ) + { + return new Function, Result>() + { + private String dimension = query.getDimensionSpec().getOutputName(); + + @Override + public Result apply(@Nullable Result result) + { + List> serializedValues = Lists.newArrayList( + Iterables.transform( + result.getValue(), + new Function>() + { + @Override + public Map apply(@Nullable DimensionAndMetricValueExtractor input) + { + final Map values = Maps.newHashMap(); + // compute all post aggs + for (PostAggregator postAgg : query.getPostAggregatorSpecs()) { + Object calculatedPostAgg = input.getMetric(postAgg.getName()); + if (calculatedPostAgg != null) { + values.put(postAgg.getName(), calculatedPostAgg); + } else { + values.put(postAgg.getName(), postAgg.compute(input.getBaseObject())); + } + } + for (AggregatorFactory agg : query.getAggregatorSpecs()) { + values.put(agg.getName(), fn.manipulate(agg, input.getMetric(agg.getName()))); + } + + values.put(dimension, input.getDimensionValue(dimension)); + + return values; + } + } + ) + ); + + return new Result( + result.getTimestamp(), + new TopNResultValue(serializedValues) + ); + } + }; + } + @Override public TypeReference> getResultTypeReference() { @@ -405,4 +456,12 @@ public class TopNQueryQueryToolChest extends QueryToolChest prunePostAggregators(TopNQuery query) + { + return AggregatorUtil.pruneDependentPostAgg( + query.getPostAggregatorSpecs(), + query.getTopNMetricSpec().getMetricName(query.getDimensionSpec()) + ); + } } diff --git a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java index fe0e5af8c66..44799c8ea2f 100644 --- a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java +++ b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java @@ -25,6 +25,7 @@ import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.DoubleSumAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator; import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; import io.druid.query.aggregation.post.ArithmeticPostAggregator; import io.druid.query.aggregation.post.ConstantPostAggregator; @@ -86,6 +87,12 @@ public class QueryRunnerTestHelper ) ); + public static final String hyperUniqueFinalizingPostAggMetric = "hyperUniqueFinalizingPostAggMetric"; + public static ArithmeticPostAggregator hyperUniqueFinalizingPostAgg = new ArithmeticPostAggregator( + hyperUniqueFinalizingPostAggMetric, + "+", + Lists.newArrayList(new HyperUniqueFinalizingPostAggregator(uniqueMetric), new ConstantPostAggregator(null, 1, 1)) + ); public static final List commonAggregators = Arrays.asList( rowsCount, diff --git a/processing/src/test/java/io/druid/query/aggregation/AggregatorUtilTest.java b/processing/src/test/java/io/druid/query/aggregation/AggregatorUtilTest.java index 90e1f449a96..08c51f9438b 100644 --- a/processing/src/test/java/io/druid/query/aggregation/AggregatorUtilTest.java +++ b/processing/src/test/java/io/druid/query/aggregation/AggregatorUtilTest.java @@ -145,4 +145,73 @@ public class AggregatorUtilTest } + @Test + public void testNullPostAggregatorNames() + { + AggregatorFactory agg1 = new DoubleSumAggregatorFactory("agg1", "value"); + AggregatorFactory agg2 = new DoubleSumAggregatorFactory("agg2", "count"); + PostAggregator postAgg1 = new ArithmeticPostAggregator( + null, "*", Lists.newArrayList( + new FieldAccessPostAggregator( + null, + "agg1" + ), new FieldAccessPostAggregator(null, "agg2") + ) + ); + + PostAggregator postAgg2 = new ArithmeticPostAggregator( + "postAgg", + "/", + Lists.newArrayList( + new FieldAccessPostAggregator( + null, + "agg1" + ), new FieldAccessPostAggregator(null, "agg2") + ) + ); + + Assert.assertEquals( + new Pair(Lists.newArrayList(agg1, agg2), Lists.newArrayList(postAgg2)), AggregatorUtil.condensedAggregators( + Lists.newArrayList(agg1, agg2), + Lists.newArrayList(postAgg1, postAgg2), + "postAgg" + ) + ); + + } + + @Test + public void testCasing() + { + AggregatorFactory agg1 = new DoubleSumAggregatorFactory("Agg1", "value"); + AggregatorFactory agg2 = new DoubleSumAggregatorFactory("Agg2", "count"); + PostAggregator postAgg1 = new ArithmeticPostAggregator( + null, "*", Lists.newArrayList( + new FieldAccessPostAggregator( + null, + "Agg1" + ), new FieldAccessPostAggregator(null, "Agg2") + ) + ); + + PostAggregator postAgg2 = new ArithmeticPostAggregator( + "postAgg", + "/", + Lists.newArrayList( + new FieldAccessPostAggregator( + null, + "Agg1" + ), new FieldAccessPostAggregator(null, "Agg2") + ) + ); + + Assert.assertEquals( + new Pair(Lists.newArrayList(agg1, agg2), Lists.newArrayList(postAgg2)), AggregatorUtil.condensedAggregators( + Lists.newArrayList(agg1, agg2), + Lists.newArrayList(postAgg1, postAgg2), + "postAgg" + ) + ); + } + } diff --git a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java index e5a540f1e3e..540b5f2b5c5 100644 --- a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java @@ -1200,7 +1200,8 @@ public class TopNQueryRunnerTest .postAggregators( Arrays.asList( QueryRunnerTestHelper.addRowsIndexConstant, - QueryRunnerTestHelper.dependentPostAgg + QueryRunnerTestHelper.dependentPostAgg, + QueryRunnerTestHelper.hyperUniqueFinalizingPostAgg ) ) .build(); @@ -1219,6 +1220,10 @@ public class TopNQueryRunnerTest .put("uniques", QueryRunnerTestHelper.UNIQUES_2) .put("maxIndex", 1743.9217529296875D) .put("minIndex", 792.3260498046875D) + .put( + QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric, + QueryRunnerTestHelper.UNIQUES_2 + 1.0 + ) .build(), ImmutableMap.builder() .put(providerDimension, "upfront") @@ -1229,6 +1234,10 @@ public class TopNQueryRunnerTest .put("uniques", QueryRunnerTestHelper.UNIQUES_2) .put("maxIndex", 1870.06103515625D) .put("minIndex", 545.9906005859375D) + .put( + QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric, + QueryRunnerTestHelper.UNIQUES_2 + 1.0 + ) .build(), ImmutableMap.builder() .put(providerDimension, "spot") @@ -1237,6 +1246,10 @@ public class TopNQueryRunnerTest .put("addRowsIndexConstant", 96444.57232284546D) .put(QueryRunnerTestHelper.dependentPostAggMetric, 97282.57232284546D) .put("uniques", QueryRunnerTestHelper.UNIQUES_9) + .put( + QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric, + QueryRunnerTestHelper.UNIQUES_9 + 1.0 + ) .put("maxIndex", 277.2735290527344D) .put("minIndex", 59.02102279663086D) .build() diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index c1efc018fa5..da1857bea47 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -357,7 +357,7 @@ public class CachingClusteredClient implements QueryRunner } } ), - toolChest.makeMetricManipulatorFn( + toolChest.makePreComputeManipulatorFn( rewrittenQuery, new MetricManipulationFn() { diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java index 76c842029b8..0e21f5da793 100644 --- a/server/src/main/java/io/druid/client/DirectDruidClient.java +++ b/server/src/main/java/io/druid/client/DirectDruidClient.java @@ -218,7 +218,7 @@ public class DirectDruidClient implements QueryRunner if (!isBySegment) { retVal = Sequences.map( retVal, - toolChest.makeMetricManipulatorFn( + toolChest.makePreComputeManipulatorFn( query, new MetricManipulationFn() { @Override diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java index a2e26f3a532..964e87b8b52 100644 --- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java @@ -116,7 +116,8 @@ import java.util.concurrent.Executor; @RunWith(Parameterized.class) public class CachingClusteredClientTest { - public static final ImmutableMap CONTEXT = ImmutableMap.of(); + public static final ImmutableMap CONTEXT = ImmutableMap.of("finalize", false); + public static final MultipleIntervalSegmentSpec SEG_SPEC = new MultipleIntervalSegmentSpec(ImmutableList.of()); public static final String DATA_SOURCE = "test"; protected static final DefaultObjectMapper jsonMapper = new DefaultObjectMapper(new SmileFactory()); @@ -321,10 +322,11 @@ public class CachingClusteredClientTest .filters(DIM_FILTER) .granularity(GRANULARITY) .aggregators(AGGS) - .postAggregators(POST_AGGS); - + .postAggregators(POST_AGGS) + .context(CONTEXT); + QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest(new QueryConfig())); testQueryCaching( - client, + runner, 1, true, builder.context( @@ -343,7 +345,7 @@ public class CachingClusteredClientTest cache.close("0_0"); testQueryCaching( - client, + runner, 1, false, builder.context( diff --git a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java index 223c720c7d8..431fdce8318 100644 --- a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java +++ b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java @@ -578,7 +578,7 @@ public class ServerManagerTest } @Override - public Function makeMetricManipulatorFn(QueryType query, MetricManipulationFn fn) + public Function makePreComputeManipulatorFn(QueryType query, MetricManipulationFn fn) { return Functions.identity(); }