From 68e8cda2dacf4949861d4e902f535570cb3c69c9 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Sat, 19 Apr 2014 04:24:07 +0530 Subject: [PATCH 1/4] fix post aggregator --- .../query/FinalizeResultsQueryRunner.java | 29 +++++--- .../java/io/druid/query/QueryToolChest.java | 10 ++- .../groupby/GroupByQueryQueryToolChest.java | 2 +- .../SegmentMetadataQueryQueryToolChest.java | 2 +- .../search/SearchQueryQueryToolChest.java | 2 +- .../select/SelectQueryQueryToolChest.java | 2 +- .../TimeBoundaryQueryQueryToolChest.java | 2 +- .../TimeseriesQueryQueryToolChest.java | 28 ++++++-- .../io/druid/query/topn/TopNBinaryFn.java | 1 - .../query/topn/TopNQueryQueryToolChest.java | 63 ++++++++++++++++- .../io/druid/query/QueryRunnerTestHelper.java | 7 ++ .../query/aggregation/AggregatorUtilTest.java | 69 +++++++++++++++++++ .../druid/query/topn/TopNQueryRunnerTest.java | 15 +++- .../druid/client/CachingClusteredClient.java | 2 +- .../io/druid/client/DirectDruidClient.java | 2 +- .../client/CachingClusteredClientTest.java | 12 ++-- .../coordination/ServerManagerTest.java | 2 +- 17 files changed, 220 insertions(+), 30 deletions(-) diff --git a/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java b/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java index dee05888470..0eb925f3327 100644 --- a/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java +++ b/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java @@ -50,12 +50,12 @@ public class FinalizeResultsQueryRunner implements QueryRunner { final boolean isBySegment = query.getContextBySegment(false); final boolean shouldFinalize = query.getContextFinalize(true); + Function finalizerFn; if (shouldFinalize) { - Function finalizerFn; if (isBySegment) { finalizerFn = new Function() { - final Function baseFinalizer = toolChest.makeMetricManipulatorFn( + final Function baseFinalizer = toolChest.makePostComputeManipulatorFn( query, new MetricManipulationFn() { @@ -85,7 +85,7 @@ public class FinalizeResultsQueryRunner implements QueryRunner } }; } else { - finalizerFn = toolChest.makeMetricManipulatorFn( + finalizerFn = toolChest.makePostComputeManipulatorFn( query, new MetricManipulationFn() { @@ -97,12 +97,25 @@ public class FinalizeResultsQueryRunner implements QueryRunner } ); } - - return Sequences.map( - baseRunner.run(query.withOverriddenContext(ImmutableMap.of("finalize", false))), - finalizerFn + } else { + // finalize is false here. + finalizerFn = toolChest.makePostComputeManipulatorFn( + query, + new MetricManipulationFn() + { + @Override + public Object manipulate(AggregatorFactory factory, Object object) + { + return object; + } + } ); } - return baseRunner.run(query); + + return Sequences.map( + baseRunner.run(query.withOverriddenContext(ImmutableMap.of("finalize", false))), + finalizerFn + ); + } } diff --git a/processing/src/main/java/io/druid/query/QueryToolChest.java b/processing/src/main/java/io/druid/query/QueryToolChest.java index 304d3e1eb14..8299ecaad0a 100644 --- a/processing/src/main/java/io/druid/query/QueryToolChest.java +++ b/processing/src/main/java/io/druid/query/QueryToolChest.java @@ -44,8 +44,16 @@ public abstract class QueryToolChest mergeSequences(Sequence> seqOfSequences); + public abstract ServiceMetricEvent.Builder makeMetricBuilder(QueryType query); - public abstract Function makeMetricManipulatorFn(QueryType query, MetricManipulationFn fn); + + public abstract Function makePreComputeManipulatorFn(QueryType query, MetricManipulationFn fn); + + public Function makePostComputeManipulatorFn(QueryType query, MetricManipulationFn fn) + { + return makePreComputeManipulatorFn(query, fn); + } + public abstract TypeReference getResultTypeReference(); public CacheStrategy getCacheStrategy(QueryType query) { diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index 1b77f2299ba..ab70b34db05 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -173,7 +173,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest makeMetricManipulatorFn(final GroupByQuery query, final MetricManipulationFn fn) + public Function makePreComputeManipulatorFn(final GroupByQuery query, final MetricManipulationFn fn) { return new Function() { diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java index 0bc9f22b1dd..98eb896476d 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java @@ -155,7 +155,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest makeMetricManipulatorFn( + public Function makePreComputeManipulatorFn( SegmentMetadataQuery query, MetricManipulationFn fn ) { diff --git a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java index f559829d593..a8429ad2726 100644 --- a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java @@ -129,7 +129,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest, Result> makeMetricManipulatorFn( + public Function, Result> makePreComputeManipulatorFn( SearchQuery query, MetricManipulationFn fn ) { diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java index 03bccfdeed5..f3bbe028ed8 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java @@ -131,7 +131,7 @@ public class SelectQueryQueryToolChest extends QueryToolChest, Result> makeMetricManipulatorFn( + public Function, Result> makePreComputeManipulatorFn( final SelectQuery query, final MetricManipulationFn fn ) { diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java index 95715bb7511..7186b1a6e38 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java @@ -123,7 +123,7 @@ public class TimeBoundaryQueryQueryToolChest } @Override - public Function, Result> makeMetricManipulatorFn( + public Function, Result> makePreComputeManipulatorFn( TimeBoundaryQuery query, MetricManipulationFn fn ) { diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index 72db385e8dc..51a0c0ebbe0 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java @@ -131,9 +131,16 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest, Result> makeMetricManipulatorFn( + public Function, Result> makePreComputeManipulatorFn( final TimeseriesQuery query, final MetricManipulationFn fn ) + { + return makeComputeManipulatorFn(query, fn, false); + } + + private Function, Result> makeComputeManipulatorFn( + final TimeseriesQuery query, final MetricManipulationFn fn, final boolean calculatePostAggs + ) { return new Function, Result>() { @@ -142,12 +149,15 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest values = Maps.newHashMap(); final TimeseriesResultValue holder = result.getValue(); + if (calculatePostAggs) { + for (PostAggregator postAgg : query.getPostAggregatorSpecs()) { + values.put(postAgg.getName(), postAgg.compute(holder.getBaseObject())); + } + } for (AggregatorFactory agg : query.getAggregatorSpecs()) { values.put(agg.getName(), fn.manipulate(agg, holder.getMetric(agg.getName()))); } - for (PostAggregator postAgg : query.getPostAggregatorSpecs()) { - values.put(postAgg.getName(), postAgg.compute(values)); - } + return new Result( result.getTimestamp(), new TimeseriesResultValue(values) @@ -262,4 +272,14 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest, Result> makePostComputeManipulatorFn( + TimeseriesQuery query, MetricManipulationFn fn + ) + { + return makeComputeManipulatorFn(query, fn, true); + } + + } diff --git a/processing/src/main/java/io/druid/query/topn/TopNBinaryFn.java b/processing/src/main/java/io/druid/query/topn/TopNBinaryFn.java index c6ab513c726..18d7fa0d833 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNBinaryFn.java +++ b/processing/src/main/java/io/druid/query/topn/TopNBinaryFn.java @@ -90,7 +90,6 @@ public class TopNBinaryFn implements BinaryFn, Result retVals = new LinkedHashMap(); String dimension = dimensionSpec.getOutputName(); - String topNMetricName = topNMetricSpec.getMetricName(dimensionSpec); for (DimensionAndMetricValueExtractor arg1Val : arg1Vals) { retVals.put(arg1Val.getStringDimensionValue(dimension), arg1Val); } diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java index 14fd5f5c41f..376574733c8 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java @@ -46,6 +46,7 @@ import io.druid.query.Result; import io.druid.query.ResultGranularTimestampComparator; import io.druid.query.ResultMergeQueryRunner; import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.AggregatorUtil; import io.druid.query.aggregation.MetricManipulationFn; import io.druid.query.aggregation.PostAggregator; import io.druid.query.filter.DimFilter; @@ -139,7 +140,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest, Result> makeMetricManipulatorFn( + public Function, Result> makePreComputeManipulatorFn( final TopNQuery query, final MetricManipulationFn fn ) { @@ -162,7 +163,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest, Result> makePostComputeManipulatorFn( + final TopNQuery query, final MetricManipulationFn fn + ) + { + return new Function, Result>() + { + private String dimension = query.getDimensionSpec().getOutputName(); + + @Override + public Result apply(@Nullable Result result) + { + List> serializedValues = Lists.newArrayList( + Iterables.transform( + result.getValue(), + new Function>() + { + @Override + public Map apply(@Nullable DimensionAndMetricValueExtractor input) + { + final Map values = Maps.newHashMap(); + // compute all post aggs + for (PostAggregator postAgg : query.getPostAggregatorSpecs()) { + Object calculatedPostAgg = input.getMetric(postAgg.getName()); + if (calculatedPostAgg != null) { + values.put(postAgg.getName(), calculatedPostAgg); + } else { + values.put(postAgg.getName(), postAgg.compute(input.getBaseObject())); + } + } + for (AggregatorFactory agg : query.getAggregatorSpecs()) { + values.put(agg.getName(), fn.manipulate(agg, input.getMetric(agg.getName()))); + } + + values.put(dimension, input.getDimensionValue(dimension)); + + return values; + } + } + ) + ); + + return new Result( + result.getTimestamp(), + new TopNResultValue(serializedValues) + ); + } + }; + } + @Override public TypeReference> getResultTypeReference() { @@ -405,4 +456,12 @@ public class TopNQueryQueryToolChest extends QueryToolChest prunePostAggregators(TopNQuery query) + { + return AggregatorUtil.pruneDependentPostAgg( + query.getPostAggregatorSpecs(), + query.getTopNMetricSpec().getMetricName(query.getDimensionSpec()) + ); + } } diff --git a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java index fe0e5af8c66..44799c8ea2f 100644 --- a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java +++ b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java @@ -25,6 +25,7 @@ import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.DoubleSumAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator; import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; import io.druid.query.aggregation.post.ArithmeticPostAggregator; import io.druid.query.aggregation.post.ConstantPostAggregator; @@ -86,6 +87,12 @@ public class QueryRunnerTestHelper ) ); + public static final String hyperUniqueFinalizingPostAggMetric = "hyperUniqueFinalizingPostAggMetric"; + public static ArithmeticPostAggregator hyperUniqueFinalizingPostAgg = new ArithmeticPostAggregator( + hyperUniqueFinalizingPostAggMetric, + "+", + Lists.newArrayList(new HyperUniqueFinalizingPostAggregator(uniqueMetric), new ConstantPostAggregator(null, 1, 1)) + ); public static final List commonAggregators = Arrays.asList( rowsCount, diff --git a/processing/src/test/java/io/druid/query/aggregation/AggregatorUtilTest.java b/processing/src/test/java/io/druid/query/aggregation/AggregatorUtilTest.java index 90e1f449a96..08c51f9438b 100644 --- a/processing/src/test/java/io/druid/query/aggregation/AggregatorUtilTest.java +++ b/processing/src/test/java/io/druid/query/aggregation/AggregatorUtilTest.java @@ -145,4 +145,73 @@ public class AggregatorUtilTest } + @Test + public void testNullPostAggregatorNames() + { + AggregatorFactory agg1 = new DoubleSumAggregatorFactory("agg1", "value"); + AggregatorFactory agg2 = new DoubleSumAggregatorFactory("agg2", "count"); + PostAggregator postAgg1 = new ArithmeticPostAggregator( + null, "*", Lists.newArrayList( + new FieldAccessPostAggregator( + null, + "agg1" + ), new FieldAccessPostAggregator(null, "agg2") + ) + ); + + PostAggregator postAgg2 = new ArithmeticPostAggregator( + "postAgg", + "/", + Lists.newArrayList( + new FieldAccessPostAggregator( + null, + "agg1" + ), new FieldAccessPostAggregator(null, "agg2") + ) + ); + + Assert.assertEquals( + new Pair(Lists.newArrayList(agg1, agg2), Lists.newArrayList(postAgg2)), AggregatorUtil.condensedAggregators( + Lists.newArrayList(agg1, agg2), + Lists.newArrayList(postAgg1, postAgg2), + "postAgg" + ) + ); + + } + + @Test + public void testCasing() + { + AggregatorFactory agg1 = new DoubleSumAggregatorFactory("Agg1", "value"); + AggregatorFactory agg2 = new DoubleSumAggregatorFactory("Agg2", "count"); + PostAggregator postAgg1 = new ArithmeticPostAggregator( + null, "*", Lists.newArrayList( + new FieldAccessPostAggregator( + null, + "Agg1" + ), new FieldAccessPostAggregator(null, "Agg2") + ) + ); + + PostAggregator postAgg2 = new ArithmeticPostAggregator( + "postAgg", + "/", + Lists.newArrayList( + new FieldAccessPostAggregator( + null, + "Agg1" + ), new FieldAccessPostAggregator(null, "Agg2") + ) + ); + + Assert.assertEquals( + new Pair(Lists.newArrayList(agg1, agg2), Lists.newArrayList(postAgg2)), AggregatorUtil.condensedAggregators( + Lists.newArrayList(agg1, agg2), + Lists.newArrayList(postAgg1, postAgg2), + "postAgg" + ) + ); + } + } diff --git a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java index e5a540f1e3e..540b5f2b5c5 100644 --- a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java @@ -1200,7 +1200,8 @@ public class TopNQueryRunnerTest .postAggregators( Arrays.asList( QueryRunnerTestHelper.addRowsIndexConstant, - QueryRunnerTestHelper.dependentPostAgg + QueryRunnerTestHelper.dependentPostAgg, + QueryRunnerTestHelper.hyperUniqueFinalizingPostAgg ) ) .build(); @@ -1219,6 +1220,10 @@ public class TopNQueryRunnerTest .put("uniques", QueryRunnerTestHelper.UNIQUES_2) .put("maxIndex", 1743.9217529296875D) .put("minIndex", 792.3260498046875D) + .put( + QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric, + QueryRunnerTestHelper.UNIQUES_2 + 1.0 + ) .build(), ImmutableMap.builder() .put(providerDimension, "upfront") @@ -1229,6 +1234,10 @@ public class TopNQueryRunnerTest .put("uniques", QueryRunnerTestHelper.UNIQUES_2) .put("maxIndex", 1870.06103515625D) .put("minIndex", 545.9906005859375D) + .put( + QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric, + QueryRunnerTestHelper.UNIQUES_2 + 1.0 + ) .build(), ImmutableMap.builder() .put(providerDimension, "spot") @@ -1237,6 +1246,10 @@ public class TopNQueryRunnerTest .put("addRowsIndexConstant", 96444.57232284546D) .put(QueryRunnerTestHelper.dependentPostAggMetric, 97282.57232284546D) .put("uniques", QueryRunnerTestHelper.UNIQUES_9) + .put( + QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric, + QueryRunnerTestHelper.UNIQUES_9 + 1.0 + ) .put("maxIndex", 277.2735290527344D) .put("minIndex", 59.02102279663086D) .build() diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index c1efc018fa5..da1857bea47 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -357,7 +357,7 @@ public class CachingClusteredClient implements QueryRunner } } ), - toolChest.makeMetricManipulatorFn( + toolChest.makePreComputeManipulatorFn( rewrittenQuery, new MetricManipulationFn() { diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java index 76c842029b8..0e21f5da793 100644 --- a/server/src/main/java/io/druid/client/DirectDruidClient.java +++ b/server/src/main/java/io/druid/client/DirectDruidClient.java @@ -218,7 +218,7 @@ public class DirectDruidClient implements QueryRunner if (!isBySegment) { retVal = Sequences.map( retVal, - toolChest.makeMetricManipulatorFn( + toolChest.makePreComputeManipulatorFn( query, new MetricManipulationFn() { @Override diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java index a2e26f3a532..964e87b8b52 100644 --- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java @@ -116,7 +116,8 @@ import java.util.concurrent.Executor; @RunWith(Parameterized.class) public class CachingClusteredClientTest { - public static final ImmutableMap CONTEXT = ImmutableMap.of(); + public static final ImmutableMap CONTEXT = ImmutableMap.of("finalize", false); + public static final MultipleIntervalSegmentSpec SEG_SPEC = new MultipleIntervalSegmentSpec(ImmutableList.of()); public static final String DATA_SOURCE = "test"; protected static final DefaultObjectMapper jsonMapper = new DefaultObjectMapper(new SmileFactory()); @@ -321,10 +322,11 @@ public class CachingClusteredClientTest .filters(DIM_FILTER) .granularity(GRANULARITY) .aggregators(AGGS) - .postAggregators(POST_AGGS); - + .postAggregators(POST_AGGS) + .context(CONTEXT); + QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest(new QueryConfig())); testQueryCaching( - client, + runner, 1, true, builder.context( @@ -343,7 +345,7 @@ public class CachingClusteredClientTest cache.close("0_0"); testQueryCaching( - client, + runner, 1, false, builder.context( diff --git a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java index 223c720c7d8..431fdce8318 100644 --- a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java +++ b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java @@ -578,7 +578,7 @@ public class ServerManagerTest } @Override - public Function makeMetricManipulatorFn(QueryType query, MetricManipulationFn fn) + public Function makePreComputeManipulatorFn(QueryType query, MetricManipulationFn fn) { return Functions.identity(); } From 597e55ab50b27d4bd97689378d121cba87d56791 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Fri, 18 Apr 2014 16:26:21 -0700 Subject: [PATCH 2/4] don't override context when not finalizing --- .../io/druid/query/FinalizeResultsQueryRunner.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java b/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java index 0eb925f3327..57f76b20966 100644 --- a/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java +++ b/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java @@ -50,8 +50,13 @@ public class FinalizeResultsQueryRunner implements QueryRunner { final boolean isBySegment = query.getContextBySegment(false); final boolean shouldFinalize = query.getContextFinalize(true); - Function finalizerFn; + + final Query queryToRun; + final Function finalizerFn; + if (shouldFinalize) { + queryToRun = query.withOverriddenContext(ImmutableMap.of("finalize", false)); + if (isBySegment) { finalizerFn = new Function() { @@ -99,6 +104,7 @@ public class FinalizeResultsQueryRunner implements QueryRunner } } else { // finalize is false here. + queryToRun = query; finalizerFn = toolChest.makePostComputeManipulatorFn( query, new MetricManipulationFn() @@ -113,7 +119,7 @@ public class FinalizeResultsQueryRunner implements QueryRunner } return Sequences.map( - baseRunner.run(query.withOverriddenContext(ImmutableMap.of("finalize", false))), + baseRunner.run(queryToRun), finalizerFn ); From a0c8d9d413b48aa581fe0bf9f17e5a7f9e667b9a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Fri, 18 Apr 2014 16:45:06 -0700 Subject: [PATCH 3/4] restore previous behavior of not optimizing Timeseries post-aggregators --- .../TimeseriesQueryQueryToolChest.java | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index 51a0c0ebbe0..661f83d3a4f 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java @@ -135,11 +135,11 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest, Result> makeComputeManipulatorFn( - final TimeseriesQuery query, final MetricManipulationFn fn, final boolean calculatePostAggs + private Function, Result> makeManipulatorFn( + final TimeseriesQuery query, final MetricManipulationFn fn ) { return new Function, Result>() @@ -149,14 +149,12 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest values = Maps.newHashMap(); final TimeseriesResultValue holder = result.getValue(); - if (calculatePostAggs) { - for (PostAggregator postAgg : query.getPostAggregatorSpecs()) { - values.put(postAgg.getName(), postAgg.compute(holder.getBaseObject())); - } - } for (AggregatorFactory agg : query.getAggregatorSpecs()) { values.put(agg.getName(), fn.manipulate(agg, holder.getMetric(agg.getName()))); } + for (PostAggregator postAgg : query.getPostAggregatorSpecs()) { + values.put(postAgg.getName(), postAgg.compute(values)); + } return new Result( result.getTimestamp(), @@ -278,7 +276,7 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest Date: Fri, 18 Apr 2014 17:41:47 -0700 Subject: [PATCH 4/4] clean up code --- .../query/FinalizeResultsQueryRunner.java | 32 ++------- .../FinalizeMetricManipulationFn.java | 31 ++++++++ .../IdentityMetricManipulationFn.java | 31 ++++++++ .../TimeseriesQueryQueryToolChest.java | 70 +++++++++---------- .../query/topn/TopNQueryQueryToolChest.java | 15 ++-- .../io/druid/client/DirectDruidClient.java | 22 +++--- 6 files changed, 120 insertions(+), 81 deletions(-) create mode 100644 processing/src/main/java/io/druid/query/aggregation/FinalizeMetricManipulationFn.java create mode 100644 processing/src/main/java/io/druid/query/aggregation/IdentityMetricManipulationFn.java diff --git a/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java b/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java index 57f76b20966..83edcf0a991 100644 --- a/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java +++ b/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java @@ -25,6 +25,8 @@ import com.google.common.collect.Lists; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.FinalizeMetricManipulationFn; +import io.druid.query.aggregation.IdentityMetricManipulationFn; import io.druid.query.aggregation.MetricManipulationFn; import javax.annotation.Nullable; @@ -62,14 +64,7 @@ public class FinalizeResultsQueryRunner implements QueryRunner { final Function baseFinalizer = toolChest.makePostComputeManipulatorFn( query, - new MetricManipulationFn() - { - @Override - public Object manipulate(AggregatorFactory factory, Object object) - { - return factory.finalizeComputation(factory.deserialize(object)); - } - } + new FinalizeMetricManipulationFn() ); @Override @@ -90,31 +85,14 @@ public class FinalizeResultsQueryRunner implements QueryRunner } }; } else { - finalizerFn = toolChest.makePostComputeManipulatorFn( - query, - new MetricManipulationFn() - { - @Override - public Object manipulate(AggregatorFactory factory, Object object) - { - return factory.finalizeComputation(object); - } - } - ); + finalizerFn = toolChest.makePostComputeManipulatorFn(query, new FinalizeMetricManipulationFn()); } } else { // finalize is false here. queryToRun = query; finalizerFn = toolChest.makePostComputeManipulatorFn( query, - new MetricManipulationFn() - { - @Override - public Object manipulate(AggregatorFactory factory, Object object) - { - return object; - } - } + new IdentityMetricManipulationFn() ); } diff --git a/processing/src/main/java/io/druid/query/aggregation/FinalizeMetricManipulationFn.java b/processing/src/main/java/io/druid/query/aggregation/FinalizeMetricManipulationFn.java new file mode 100644 index 00000000000..e532421e572 --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/FinalizeMetricManipulationFn.java @@ -0,0 +1,31 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.aggregation; + +/** + */ +public class FinalizeMetricManipulationFn implements MetricManipulationFn +{ + @Override + public Object manipulate(AggregatorFactory factory, Object object) + { + return factory.finalizeComputation(object); + } +} diff --git a/processing/src/main/java/io/druid/query/aggregation/IdentityMetricManipulationFn.java b/processing/src/main/java/io/druid/query/aggregation/IdentityMetricManipulationFn.java new file mode 100644 index 00000000000..6b99838700a --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/IdentityMetricManipulationFn.java @@ -0,0 +1,31 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.aggregation; + +/** + */ +public class IdentityMetricManipulationFn implements MetricManipulationFn +{ + @Override + public Object manipulate(AggregatorFactory factory, Object object) + { + return object; + } +} diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index 661f83d3a4f..6fe55153a59 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java @@ -130,40 +130,6 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest, Result> makePreComputeManipulatorFn( - final TimeseriesQuery query, final MetricManipulationFn fn - ) - { - return makeManipulatorFn(query, fn); - } - - private Function, Result> makeManipulatorFn( - final TimeseriesQuery query, final MetricManipulationFn fn - ) - { - return new Function, Result>() - { - @Override - public Result apply(Result result) - { - final Map values = Maps.newHashMap(); - final TimeseriesResultValue holder = result.getValue(); - for (AggregatorFactory agg : query.getAggregatorSpecs()) { - values.put(agg.getName(), fn.manipulate(agg, holder.getMetric(agg.getName()))); - } - for (PostAggregator postAgg : query.getPostAggregatorSpecs()) { - values.put(postAgg.getName(), postAgg.compute(values)); - } - - return new Result( - result.getTimestamp(), - new TimeseriesResultValue(values) - ); - } - }; - } - @Override public TypeReference> getResultTypeReference() { @@ -271,13 +237,47 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest, Result> makePreComputeManipulatorFn( + final TimeseriesQuery query, final MetricManipulationFn fn + ) + { + return makeComputeManipulatorFn(query, fn, false); + } + @Override public Function, Result> makePostComputeManipulatorFn( TimeseriesQuery query, MetricManipulationFn fn ) { - return makeManipulatorFn(query, fn); + return makeComputeManipulatorFn(query, fn, true); } + private Function, Result> makeComputeManipulatorFn( + final TimeseriesQuery query, final MetricManipulationFn fn, final boolean calculatePostAggs + ) + { + return new Function, Result>() + { + @Override + public Result apply(Result result) + { + final Map values = Maps.newHashMap(); + final TimeseriesResultValue holder = result.getValue(); + if (calculatePostAggs) { + for (PostAggregator postAgg : query.getPostAggregatorSpecs()) { + values.put(postAgg.getName(), postAgg.compute(holder.getBaseObject())); + } + } + for (AggregatorFactory agg : query.getAggregatorSpecs()) { + values.put(agg.getName(), fn.manipulate(agg, holder.getMetric(agg.getName()))); + } + return new Result( + result.getTimestamp(), + new TimeseriesResultValue(values) + ); + } + }; + } } diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java index 376574733c8..f290fc29e8a 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java @@ -149,7 +149,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest apply(@Nullable Result result) + public Result apply(Result result) { List> serializedValues = Lists.newArrayList( Iterables.transform( @@ -157,7 +157,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest>() { @Override - public Map apply(@Nullable DimensionAndMetricValueExtractor input) + public Map apply(DimensionAndMetricValueExtractor input) { final Map values = Maps.newHashMap(); for (AggregatorFactory agg : query.getAggregatorSpecs()) { @@ -197,7 +197,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest apply(@Nullable Result result) + public Result apply(Result result) { List> serializedValues = Lists.newArrayList( Iterables.transform( @@ -205,7 +205,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest>() { @Override - public Map apply(@Nullable DimensionAndMetricValueExtractor input) + public Map apply(DimensionAndMetricValueExtractor input) { final Map values = Maps.newHashMap(); // compute all post aggs @@ -249,7 +249,6 @@ public class TopNQueryQueryToolChest extends QueryToolChest, Object, TopNQuery>() { private final List aggs = query.getAggregatorSpecs(); - private final List postAggs = query.getPostAggregatorSpecs(); @Override public byte[] computeCacheKey(TopNQuery query) @@ -289,7 +288,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest, Object>() { @Override - public Object apply(@Nullable final Result input) + public Object apply(final Result input) { List results = Lists.newArrayList(input.getValue()); final List retVal = Lists.newArrayListWithCapacity(results.size() + 1); @@ -317,7 +316,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest apply(@Nullable Object input) + public Result apply(Object input) { List results = (List) input; List> retVal = Lists.newArrayListWithCapacity(results.size()); @@ -418,7 +417,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest, Result>() { @Override - public Result apply(@Nullable Result input) + public Result apply(Result input) { return new Result( input.getTimestamp(), diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java index 0e21f5da793..1194acf8ce1 100644 --- a/server/src/main/java/io/druid/client/DirectDruidClient.java +++ b/server/src/main/java/io/druid/client/DirectDruidClient.java @@ -122,8 +122,7 @@ public class DirectDruidClient implements QueryRunner final JavaType typeRef; if (isBySegment) { typeRef = types.rhs; - } - else { + } else { typeRef = types.lhs; } @@ -219,14 +218,15 @@ public class DirectDruidClient implements QueryRunner retVal = Sequences.map( retVal, toolChest.makePreComputeManipulatorFn( - query, new MetricManipulationFn() - { - @Override - public Object manipulate(AggregatorFactory factory, Object object) - { - return factory.deserialize(object); - } - } + query, + new MetricManipulationFn() + { + @Override + public Object manipulate(AggregatorFactory factory, Object object) + { + return factory.deserialize(object); + } + } ) ); } @@ -313,7 +313,7 @@ public class DirectDruidClient implements QueryRunner @Override public void close() throws IOException { - if(jp != null) { + if (jp != null) { jp.close(); } }