From db35009acdc1b35ca411462c798e3b3230c7ddcc Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Mon, 7 Apr 2014 23:17:25 +0530 Subject: [PATCH 01/15] Compute postage only when required --- .../query/topn/InvertedTopNMetricSpec.java | 19 ++- .../topn/LexicographicTopNMetricSpec.java | 6 + .../query/topn/NumericTopNMetricSpec.java | 6 + .../io/druid/query/topn/TopNBinaryFn.java | 17 ++- .../topn/TopNLexicographicResultBuilder.java | 3 - .../io/druid/query/topn/TopNMetricSpec.java | 2 + .../query/topn/TopNNumericResultBuilder.java | 7 +- .../query/topn/TopNQueryQueryToolChest.java | 11 +- .../query/topn/TopNBinaryFnBenchmark.java | 135 ++++++++++++++++++ .../io/druid/query/topn/TopNBinaryFnTest.java | 19 +-- 10 files changed, 191 insertions(+), 34 deletions(-) create mode 100644 processing/src/test/java/io/druid/query/topn/TopNBinaryFnBenchmark.java diff --git a/processing/src/main/java/io/druid/query/topn/InvertedTopNMetricSpec.java b/processing/src/main/java/io/druid/query/topn/InvertedTopNMetricSpec.java index e5a7fdfedde..ab5caa60a81 100644 --- a/processing/src/main/java/io/druid/query/topn/InvertedTopNMetricSpec.java +++ b/processing/src/main/java/io/druid/query/topn/InvertedTopNMetricSpec.java @@ -36,7 +36,6 @@ import java.util.List; public class InvertedTopNMetricSpec implements TopNMetricSpec { private static final byte CACHE_TYPE_ID = 0x3; - private final TopNMetricSpec delegate; @JsonCreator @@ -102,15 +101,27 @@ public class InvertedTopNMetricSpec implements TopNMetricSpec delegate.initTopNAlgorithmSelector(selector); } + @Override + public String getMetricName(DimensionSpec dimSpec) + { + return delegate.getMetricName(dimSpec); + } + @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } InvertedTopNMetricSpec that = (InvertedTopNMetricSpec) o; - if (delegate != null ? 
!delegate.equals(that.delegate) : that.delegate != null) return false; + if (delegate != null ? !delegate.equals(that.delegate) : that.delegate != null) { + return false; + } return true; } diff --git a/processing/src/main/java/io/druid/query/topn/LexicographicTopNMetricSpec.java b/processing/src/main/java/io/druid/query/topn/LexicographicTopNMetricSpec.java index 0f2c8344b14..5c1b9ab41e8 100644 --- a/processing/src/main/java/io/druid/query/topn/LexicographicTopNMetricSpec.java +++ b/processing/src/main/java/io/druid/query/topn/LexicographicTopNMetricSpec.java @@ -111,6 +111,12 @@ public class LexicographicTopNMetricSpec implements TopNMetricSpec selector.setAggregateAllMetrics(true); } + @Override + public String getMetricName(DimensionSpec dimSpec) + { + return dimSpec.getOutputName(); + } + @Override public String toString() { diff --git a/processing/src/main/java/io/druid/query/topn/NumericTopNMetricSpec.java b/processing/src/main/java/io/druid/query/topn/NumericTopNMetricSpec.java index 76f1a9341ac..fe5bce241a6 100644 --- a/processing/src/main/java/io/druid/query/topn/NumericTopNMetricSpec.java +++ b/processing/src/main/java/io/druid/query/topn/NumericTopNMetricSpec.java @@ -150,6 +150,12 @@ public class NumericTopNMetricSpec implements TopNMetricSpec selector.setAggregateTopNMetricFirst(true); } + @Override + public String getMetricName(DimensionSpec dimSpec) + { + return metric; + } + @Override public String toString() { diff --git a/processing/src/main/java/io/druid/query/topn/TopNBinaryFn.java b/processing/src/main/java/io/druid/query/topn/TopNBinaryFn.java index 437c28f640f..f848f3c9e06 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNBinaryFn.java +++ b/processing/src/main/java/io/druid/query/topn/TopNBinaryFn.java @@ -40,7 +40,7 @@ public class TopNBinaryFn implements BinaryFn, Result aggregations; @@ -65,7 +65,7 @@ public class TopNBinaryFn implements BinaryFn, Result, Result retVals = new LinkedHashMap(); TopNResultValue arg1Vals = 
arg1.getValue(); TopNResultValue arg2Vals = arg2.getValue(); + Map retVals = new LinkedHashMap(); + String dimension = dimensionSpec.getOutputName(); + String topNMetricName = topNMetricSpec.getMetricName(dimensionSpec); for (DimensionAndMetricValueExtractor arg1Val : arg1Vals) { retVals.put(arg1Val.getStringDimensionValue(dimension), arg1Val); } @@ -92,16 +94,17 @@ public class TopNBinaryFn implements BinaryFn, Result retVal = new LinkedHashMap(); + Map retVal = new LinkedHashMap(aggregations.size() + 2); retVal.put(dimension, dimensionValue); for (AggregatorFactory factory : aggregations) { final String metricName = factory.getName(); retVal.put(metricName, factory.combine(arg1Val.getMetric(metricName), arg2Val.getMetric(metricName))); } - - for (PostAggregator pf : postAggregations) { - retVal.put(pf.getName(), pf.compute(retVal)); + for (PostAggregator postAgg : postAggregations) { + if (postAgg.getName().equals(topNMetricName)) { + retVal.put(postAgg.getName(), postAgg.compute(retVal)); + } } retVals.put(dimensionValue, new DimensionAndMetricValueExtractor(retVal)); diff --git a/processing/src/main/java/io/druid/query/topn/TopNLexicographicResultBuilder.java b/processing/src/main/java/io/druid/query/topn/TopNLexicographicResultBuilder.java index 37360dfb1cd..b12f4c85b51 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNLexicographicResultBuilder.java +++ b/processing/src/main/java/io/druid/query/topn/TopNLexicographicResultBuilder.java @@ -75,9 +75,6 @@ public class TopNLexicographicResultBuilder implements TopNResultBuilder for (Object metricVal : metricVals) { metricValues.put(aggsIter.next().getName(), metricVal); } - for (PostAggregator postAgg : postAggs) { - metricValues.put(postAgg.getName(), postAgg.compute(metricValues)); - } pQueue.add(new DimValHolder.Builder().withDirName(dimName).withMetricValues(metricValues).build()); } diff --git a/processing/src/main/java/io/druid/query/topn/TopNMetricSpec.java 
b/processing/src/main/java/io/druid/query/topn/TopNMetricSpec.java index c2baf13e3eb..44103f93708 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNMetricSpec.java +++ b/processing/src/main/java/io/druid/query/topn/TopNMetricSpec.java @@ -55,4 +55,6 @@ public interface TopNMetricSpec public TopNMetricSpecBuilder configureOptimizer(TopNMetricSpecBuilder builder); public void initTopNAlgorithmSelector(TopNAlgorithmSelector selector); + + public String getMetricName(DimensionSpec dimSpec); } diff --git a/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java b/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java index 9f6479baee4..a806c7966ec 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java +++ b/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java @@ -40,7 +40,6 @@ public class TopNNumericResultBuilder implements TopNResultBuilder private final DateTime timestamp; private final DimensionSpec dimSpec; private final String metricName; - private MinMaxPriorityQueue pQueue = null; public TopNNumericResultBuilder( @@ -75,8 +74,12 @@ public class TopNNumericResultBuilder implements TopNResultBuilder for (Object metricVal : metricVals) { metricValues.put(aggFactoryIter.next().getName(), metricVal); } + for (PostAggregator postAgg : postAggs) { - metricValues.put(postAgg.getName(), postAgg.compute(metricValues)); + if (postAgg.getName().equals(metricName)) { + metricValues.put(postAgg.getName(), postAgg.compute(metricValues)); + break; + } } Object topNMetricVal = metricValues.get(metricName); diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java index a7d77fde396..0c78f6305fe 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java @@ -161,7 
+161,12 @@ public class TopNQueryQueryToolChest extends QueryToolChest result1; + Result result2; + TopNBinaryFn fn; + + public static void main(String[] args) throws Exception + { + Runner.main(TopNBinaryFnBenchmark.class, args); + } + + @Override + protected void setUp() throws Exception + { + + final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L, null); + final FieldAccessPostAggregator rowsPostAgg = new FieldAccessPostAggregator("rows", "rows"); + final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index"); + + + final List aggregatorFactories = new ArrayList<>(); + aggregatorFactories.add(new CountAggregatorFactory("rows")); + aggregatorFactories.add(new LongSumAggregatorFactory("index", "index")); + for (int i = 1; i < aggCount; i++) { + aggregatorFactories.add(new CountAggregatorFactory("rows" + i)); + } + final List postAggregators = new ArrayList<>(); + for (int i = 0; i < postAggCount; i++) { + postAggregators.add( + new ArithmeticPostAggregator( + "addrowsindexconstant" + i, + "+", + Lists.newArrayList(constant, rowsPostAgg, indexPostAgg) + ) + ); + } + final DateTime currTime = new DateTime(); + List> list = new ArrayList<>(); + for (int i = 0; i < threshold; i++) { + Map res = new HashMap<>(); + res.put("testdim", "" + i); + res.put("rows", 1L); + for (int j = 0; j < aggCount; j++) { + res.put("rows" + j, 1L); + } + res.put("index", 1L); + list.add(res); + } + result1 = new Result<>( + currTime, + new TopNResultValue(list) + ); + + List> list2 = new ArrayList<>(); + for (int i = 0; i < threshold; i++) { + Map res = new HashMap<>(); + res.put("testdim", "" + i); + res.put("rows", 2L); + for (int j = 0; j < aggCount; j++) { + res.put("rows" + j, 2L); + } + res.put("index", 2L); + list2.add(res); + } + result2 = new Result<>( + currTime, + new TopNResultValue(list2) + ); + fn = new TopNBinaryFn( + TopNResultMerger.identity, + QueryGranularity.ALL, + new DefaultDimensionSpec("testdim", null), + 
new NumericTopNMetricSpec("index"), + 100, + aggregatorFactories, + postAggregators + ); + } + + public void timeMerge(int nReps) + { + for (int i = 0; i < nReps; i++) { + fn.apply(result1, result2); + } + } + +} diff --git a/processing/src/test/java/io/druid/query/topn/TopNBinaryFnTest.java b/processing/src/test/java/io/druid/query/topn/TopNBinaryFnTest.java index 0eba9778c4e..cb3089d6397 100644 --- a/processing/src/test/java/io/druid/query/topn/TopNBinaryFnTest.java +++ b/processing/src/test/java/io/druid/query/topn/TopNBinaryFnTest.java @@ -129,15 +129,13 @@ public class TopNBinaryFnTest ImmutableMap.of( "testdim", "1", "rows", 3L, - "index", 5L, - "addrowsindexconstant", 9.0 + "index", 5L ), ImmutableMap.of( "testdim", "2", "rows", 4L, - "index", 4L, - "addrowsindexconstant", 9.0 + "index", 4L ) ) ) @@ -214,14 +212,12 @@ public class TopNBinaryFnTest ImmutableMap.of( "testdim", "1", "rows", 3L, - "index", 5L, - "addrowsindexconstant", 9.0 + "index", 5L ), ImmutableMap.of( "testdim", "2", "rows", 4L, - "index", 4L, - "addrowsindexconstant", 9.0 + "index", 4L ) ) ) @@ -427,15 +423,12 @@ public class TopNBinaryFnTest ImmutableMap.of( "testdim", "1", "rows", 3L, - "index", 5L, - "addrowsindexconstant", 9.0 + "index", 5L ), ImmutableMap.of( "testdim", "2", "rows", 4L, - "index", 4L, - "addrowsindexconstant", 9.0 - ) + "index", 4L ) ) ) ); From 4bb36dd453c490db3ee51ede40c8301fb441dd22 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Tue, 8 Apr 2014 02:37:57 +0530 Subject: [PATCH 02/15] fix tests --- .../query/topn/TopNQueryQueryToolChest.java | 22 +- .../client/CachingClusteredClientTest.java | 217 ++++++++++-------- 2 files changed, 131 insertions(+), 108 deletions(-) diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java index 0c78f6305fe..8eb77874bf4 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java +++ 
b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java @@ -64,11 +64,13 @@ import java.util.Map; public class TopNQueryQueryToolChest extends QueryToolChest, TopNQuery> { private static final byte TOPN_QUERY = 0x1; - private static final Joiner COMMA_JOIN = Joiner.on(","); - private static final TypeReference> TYPE_REFERENCE = new TypeReference>(){}; - - private static final TypeReference OBJECT_TYPE_REFERENCE = new TypeReference(){}; + private static final TypeReference> TYPE_REFERENCE = new TypeReference>() + { + }; + private static final TypeReference OBJECT_TYPE_REFERENCE = new TypeReference() + { + }; private final TopNQueryConfig config; @Inject @@ -163,7 +165,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest> getOrdering() + { + return Ordering.natural(); + } + private static class ThresholdAdjustingQueryRunner implements QueryRunner> { private final QueryRunner> runner; @@ -398,9 +405,4 @@ public class TopNQueryQueryToolChest extends QueryToolChest> getOrdering() - { - return Ordering.natural(); - } } diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java index cfea29f9a8e..07bec13f16b 100644 --- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java @@ -47,6 +47,7 @@ import io.druid.jackson.DefaultObjectMapper; import io.druid.query.BySegmentResultValueClass; import io.druid.query.DataSource; import io.druid.query.Druids; +import io.druid.query.FinalizeResultsQueryRunner; import io.druid.query.MapQueryToolChestWarehouse; import io.druid.query.Query; import io.druid.query.QueryConfig; @@ -115,17 +116,21 @@ import java.util.concurrent.Executor; @RunWith(Parameterized.class) public class CachingClusteredClientTest { + public static final ImmutableMap CONTEXT = ImmutableMap.of(); + public static final MultipleIntervalSegmentSpec SEG_SPEC = 
new MultipleIntervalSegmentSpec(ImmutableList.of()); + public static final String DATA_SOURCE = "test"; + protected static final DefaultObjectMapper jsonMapper = new DefaultObjectMapper(new SmileFactory()); + + static { + jsonMapper.getFactory().setCodec(jsonMapper); + } + /** * We want a deterministic test, but we'd also like a bit of randomness for the distribution of segments * across servers. Thus, we loop multiple times and each time use a deterministically created Random instance. * Increase this value to increase exposure to random situations at the expense of test run time. */ private static final int RANDOMNESS = 10; - - public static final ImmutableMap CONTEXT = ImmutableMap.of(); - public static final MultipleIntervalSegmentSpec SEG_SPEC = new MultipleIntervalSegmentSpec(ImmutableList.of()); - public static final String DATA_SOURCE = "test"; - private static final List AGGS = Arrays.asList( new CountAggregatorFactory("rows"), new LongSumAggregatorFactory("imps", "imps"), @@ -152,6 +157,17 @@ public class CachingClusteredClientTest private static final DateTimeZone TIMEZONE = DateTimeZone.forID("America/Los_Angeles"); private static final QueryGranularity PT1H_TZ_GRANULARITY = new PeriodGranularity(new Period("PT1H"), null, TIMEZONE); private static final String TOP_DIM = "a_dim"; + private final Random random; + protected VersionedIntervalTimeline timeline; + protected TimelineServerView serverView; + protected Cache cache; + public CachingClusteredClient client; + DruidServer[] servers; + + public CachingClusteredClientTest(int randomSeed) + { + this.random = new Random(randomSeed); + } @Parameterized.Parameters public static Collection constructorFeeder() throws IOException @@ -169,28 +185,6 @@ public class CachingClusteredClientTest ); } - - protected static final DefaultObjectMapper jsonMapper = new DefaultObjectMapper(new SmileFactory()); - - static { - jsonMapper.getFactory().setCodec(jsonMapper); - } - - private final Random random; - - protected 
VersionedIntervalTimeline timeline; - protected TimelineServerView serverView; - protected Cache cache; - - CachingClusteredClient client; - - DruidServer[] servers; - - public CachingClusteredClientTest(int randomSeed) - { - this.random = new Random(randomSeed); - } - @Before public void setUp() throws Exception { @@ -214,15 +208,16 @@ public class CachingClusteredClientTest public void testTimeseriesCaching() throws Exception { final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder() - .dataSource(DATA_SOURCE) - .intervals(SEG_SPEC) - .filters(DIM_FILTER) - .granularity(GRANULARITY) - .aggregators(AGGS) - .postAggregators(POST_AGGS) - .context(CONTEXT); + .dataSource(DATA_SOURCE) + .intervals(SEG_SPEC) + .filters(DIM_FILTER) + .granularity(GRANULARITY) + .aggregators(AGGS) + .postAggregators(POST_AGGS) + .context(CONTEXT); testQueryCaching( + client, builder.build(), new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000), new Interval("2011-01-02/2011-01-03"), makeTimeResults(new DateTime("2011-01-02"), 30, 6000), @@ -265,9 +260,9 @@ public class CachingClusteredClientTest ), client.run( builder.intervals("2011-01-01/2011-01-10") - .aggregators(RENAMED_AGGS) - .postAggregators(RENAMED_POST_AGGS) - .build() + .aggregators(RENAMED_AGGS) + .postAggregators(RENAMED_POST_AGGS) + .build() ) ); } @@ -277,15 +272,16 @@ public class CachingClusteredClientTest public void testTimeseriesCachingTimeZone() throws Exception { final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder() - .dataSource(DATA_SOURCE) - .intervals(SEG_SPEC) - .filters(DIM_FILTER) - .granularity(PT1H_TZ_GRANULARITY) - .aggregators(AGGS) - .postAggregators(POST_AGGS) - .context(CONTEXT); + .dataSource(DATA_SOURCE) + .intervals(SEG_SPEC) + .filters(DIM_FILTER) + .granularity(PT1H_TZ_GRANULARITY) + .aggregators(AGGS) + .postAggregators(POST_AGGS) + .context(CONTEXT); testQueryCaching( + client, builder.build(), new 
Interval("2011-11-04/2011-11-08"), makeTimeResults( @@ -305,9 +301,9 @@ public class CachingClusteredClientTest ), client.run( builder.intervals("2011-11-04/2011-11-08") - .aggregators(RENAMED_AGGS) - .postAggregators(RENAMED_POST_AGGS) - .build() + .aggregators(RENAMED_AGGS) + .postAggregators(RENAMED_POST_AGGS) + .build() ) ); } @@ -316,18 +312,23 @@ public class CachingClusteredClientTest public void testDisableUseCache() throws Exception { final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder() - .dataSource(DATA_SOURCE) - .intervals(SEG_SPEC) - .filters(DIM_FILTER) - .granularity(GRANULARITY) - .aggregators(AGGS) - .postAggregators(POST_AGGS); + .dataSource(DATA_SOURCE) + .intervals(SEG_SPEC) + .filters(DIM_FILTER) + .granularity(GRANULARITY) + .aggregators(AGGS) + .postAggregators(POST_AGGS); testQueryCaching( + client, 1, true, - builder.context(ImmutableMap.of("useCache", "false", - "populateCache", "true")).build(), + builder.context( + ImmutableMap.of( + "useCache", "false", + "populateCache", "true" + ) + ).build(), new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000) ); @@ -338,10 +339,15 @@ public class CachingClusteredClientTest cache.close("0_0"); testQueryCaching( + client, 1, false, - builder.context(ImmutableMap.of("useCache", "false", - "populateCache", "false")).build(), + builder.context( + ImmutableMap.of( + "useCache", "false", + "populateCache", "false" + ) + ).build(), new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000) ); @@ -350,10 +356,15 @@ public class CachingClusteredClientTest Assert.assertEquals(0, cache.getStats().getNumMisses()); testQueryCaching( + client, 1, false, - builder.context(ImmutableMap.of("useCache", "true", - "populateCache", "false")).build(), + builder.context( + ImmutableMap.of( + "useCache", "true", + "populateCache", "false" + ) + ).build(), new Interval("2011-01-01/2011-01-02"), makeTimeResults(new 
DateTime("2011-01-01"), 50, 5000) ); @@ -378,7 +389,10 @@ public class CachingClusteredClientTest .postAggregators(POST_AGGS) .context(CONTEXT); + QueryRunner runner = new FinalizeResultsQueryRunner(client, new TopNQueryQueryToolChest(new TopNQueryConfig())); + testQueryCaching( + runner, builder.build(), new Interval("2011-01-01/2011-01-02"), makeTopNResults(new DateTime("2011-01-01"), "a", 50, 5000, "b", 50, 4999, "c", 50, 4998), @@ -420,12 +434,12 @@ public class CachingClusteredClientTest new DateTime("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983, new DateTime("2011-01-09T01"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983 ), - client.run( + runner.run( builder.intervals("2011-01-01/2011-01-10") - .metric("imps") - .aggregators(RENAMED_AGGS) - .postAggregators(RENAMED_POST_AGGS) - .build() + .metric("imps") + .aggregators(RENAMED_AGGS) + .postAggregators(RENAMED_POST_AGGS) + .build() ) ); } @@ -446,7 +460,10 @@ public class CachingClusteredClientTest .postAggregators(POST_AGGS) .context(CONTEXT); + QueryRunner runner = new FinalizeResultsQueryRunner(client, new TopNQueryQueryToolChest(new TopNQueryConfig())); + testQueryCaching( + runner, builder.build(), new Interval("2011-11-04/2011-11-08"), makeTopNResults( @@ -465,12 +482,12 @@ public class CachingClusteredClientTest new DateTime("2011-11-06", TIMEZONE), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, new DateTime("2011-11-07", TIMEZONE), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986 ), - client.run( + runner.run( builder.intervals("2011-11-04/2011-11-08") - .metric("imps") - .aggregators(RENAMED_AGGS) - .postAggregators(RENAMED_POST_AGGS) - .build() + .metric("imps") + .aggregators(RENAMED_AGGS) + .postAggregators(RENAMED_POST_AGGS) + .build() ) ); } @@ -491,7 +508,9 @@ public class CachingClusteredClientTest .postAggregators(POST_AGGS) .context(CONTEXT); + QueryRunner runner = new FinalizeResultsQueryRunner(client, new TopNQueryQueryToolChest(new TopNQueryConfig())); testQueryCaching( + runner, 
builder.build(), new Interval("2011-01-01/2011-01-02"), makeTopNResults(), @@ -518,6 +537,7 @@ public class CachingClusteredClientTest ) ); + TestHelper.assertExpectedResults( makeRenamedTopNResults( new DateTime("2011-01-05"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992, @@ -531,12 +551,12 @@ public class CachingClusteredClientTest new DateTime("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983, new DateTime("2011-01-09T01"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983 ), - client.run( + runner.run( builder.intervals("2011-01-01/2011-01-10") - .metric("imps") - .aggregators(RENAMED_AGGS) - .postAggregators(RENAMED_POST_AGGS) - .build() + .metric("imps") + .aggregators(RENAMED_AGGS) + .postAggregators(RENAMED_POST_AGGS) + .build() ) ); } @@ -545,6 +565,7 @@ public class CachingClusteredClientTest public void testSearchCaching() throws Exception { testQueryCaching( + client, new SearchQuery( new TableDataSource(DATA_SOURCE), DIM_FILTER, @@ -582,13 +603,14 @@ public class CachingClusteredClientTest ); } - public void testQueryCaching(final Query query, Object... args) + public void testQueryCaching(QueryRunner runner, final Query query, Object... args) { - testQueryCaching(3, true, query, args); + testQueryCaching(runner, 3, true, query, args); } @SuppressWarnings("unchecked") public void testQueryCaching( + final QueryRunner runner, final int numTimesToQuery, boolean expectBySegment, final Query query, Object... args // does this assume query intervals must be ordered? 
@@ -638,8 +660,8 @@ public class CachingClusteredClientTest EasyMock.expect(serverView.getQueryRunner(server)) - .andReturn(expectations.getQueryRunner()) - .once(); + .andReturn(expectations.getQueryRunner()) + .once(); final Capture capture = new Capture(); queryCaptures.add(capture); @@ -656,8 +678,8 @@ public class CachingClusteredClientTest } EasyMock.expect(queryable.run(EasyMock.capture(capture))) - .andReturn(toQueryableTimeseriesResults(expectBySegment, segmentIds, intervals, results)) - .once(); + .andReturn(toQueryableTimeseriesResults(expectBySegment, segmentIds, intervals, results)) + .once(); } else if (query instanceof TopNQuery) { List segmentIds = Lists.newArrayList(); @@ -669,8 +691,8 @@ public class CachingClusteredClientTest results.add(expectation.getResults()); } EasyMock.expect(queryable.run(EasyMock.capture(capture))) - .andReturn(toQueryableTopNResults(segmentIds, intervals, results)) - .once(); + .andReturn(toQueryableTopNResults(segmentIds, intervals, results)) + .once(); } else if (query instanceof SearchQuery) { List segmentIds = Lists.newArrayList(); List intervals = Lists.newArrayList(); @@ -681,8 +703,8 @@ public class CachingClusteredClientTest results.add(expectation.getResults()); } EasyMock.expect(queryable.run(EasyMock.capture(capture))) - .andReturn(toQueryableSearchResults(segmentIds, intervals, results)) - .once(); + .andReturn(toQueryableSearchResults(segmentIds, intervals, results)) + .once(); } else if (query instanceof TimeBoundaryQuery) { List segmentIds = Lists.newArrayList(); List intervals = Lists.newArrayList(); @@ -693,8 +715,8 @@ public class CachingClusteredClientTest results.add(expectation.getResults()); } EasyMock.expect(queryable.run(EasyMock.capture(capture))) - .andReturn(toQueryableTimeBoundaryResults(segmentIds, intervals, results)) - .once(); + .andReturn(toQueryableTimeBoundaryResults(segmentIds, intervals, results)) + .once(); } else { throw new ISE("Unknown query type[%s]", query.getClass()); } @@ 
-742,7 +764,7 @@ public class CachingClusteredClientTest } ) ), - client.run( + runner.run( query.withQuerySegmentSpec( new MultipleIntervalSegmentSpec( Arrays.asList( @@ -766,7 +788,7 @@ public class CachingClusteredClientTest } else { Assert.assertTrue( capturedQuery.getContextValue("bySegment") == null || - capturedQuery.getContextValue("bySegment").equals("false") + capturedQuery.getContextValue("bySegment").equals("false") ); } } @@ -1160,13 +1182,13 @@ public class CachingClusteredClientTest return new CachingClusteredClient( new MapQueryToolChestWarehouse( ImmutableMap., QueryToolChest>builder() - .put( - TimeseriesQuery.class, - new TimeseriesQueryQueryToolChest(new QueryConfig()) - ) - .put(TopNQuery.class, new TopNQueryQueryToolChest(new TopNQueryConfig())) - .put(SearchQuery.class, new SearchQueryQueryToolChest(new SearchQueryConfig())) - .build() + .put( + TimeseriesQuery.class, + new TimeseriesQueryQueryToolChest(new QueryConfig()) + ) + .put(TopNQuery.class, new TopNQueryQueryToolChest(new TopNQueryConfig())) + .put(SearchQuery.class, new SearchQueryQueryToolChest(new SearchQueryConfig())) + .build() ), new TimelineServerView() { @@ -1241,6 +1263,8 @@ public class CachingClusteredClientTest private class MyDataSegment extends DataSegment { + private final DataSegment baseSegment = segment; + private MyDataSegment() { super( @@ -1256,8 +1280,6 @@ public class CachingClusteredClientTest ); } - private final DataSegment baseSegment = segment; - @Override @JsonProperty public String getDataSource() @@ -1358,7 +1380,6 @@ public class CachingClusteredClientTest { private final DruidServer server; private final QueryRunner queryRunner; - private final List expectations = Lists.newArrayList(); public ServerExpectations( From c322f44863fd34322b244e10e0759d71c1cbcdee Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Tue, 8 Apr 2014 03:17:15 +0530 Subject: [PATCH 03/15] time series post Aggregation improved move post aggregation to finalise results --- 
.../query/timeseries/TimeseriesBinaryFn.java | 11 +----- .../timeseries/TimeseriesQueryEngine.java | 4 -- .../TimeseriesQueryQueryToolChest.java | 15 ++++---- .../timeseries/TimeseriesBinaryFnTest.java | 37 ++++--------------- .../client/CachingClusteredClientTest.java | 10 +++-- 5 files changed, 21 insertions(+), 56 deletions(-) diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesBinaryFn.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesBinaryFn.java index f8530ce334f..7cecb554a92 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesBinaryFn.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesBinaryFn.java @@ -24,7 +24,6 @@ import io.druid.granularity.AllGranularity; import io.druid.granularity.QueryGranularity; import io.druid.query.Result; import io.druid.query.aggregation.AggregatorFactory; -import io.druid.query.aggregation.PostAggregator; import java.util.LinkedHashMap; import java.util.List; @@ -37,17 +36,14 @@ public class TimeseriesBinaryFn { private final QueryGranularity gran; private final List aggregations; - private final List postAggregations; public TimeseriesBinaryFn( QueryGranularity granularity, - List aggregations, - List postAggregations + List aggregations ) { this.gran = granularity; this.aggregations = aggregations; - this.postAggregations = postAggregations; } @Override @@ -71,11 +67,6 @@ public class TimeseriesBinaryFn retVal.put(metricName, factory.combine(arg1Val.getMetric(metricName), arg2Val.getMetric(metricName))); } - for (PostAggregator pf : postAggregations) { - final String metricName = pf.getName(); - retVal.put(metricName, pf.compute(retVal)); - } - return (gran instanceof AllGranularity) ? 
new Result( arg1.getTimestamp(), diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryEngine.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryEngine.java index b57105aa654..3062b66e5a7 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryEngine.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryEngine.java @@ -74,10 +74,6 @@ public class TimeseriesQueryEngine bob.addMetric(aggregator); } - for (PostAggregator postAgg : postAggregatorSpecs) { - bob.addMetric(postAgg); - } - Result retVal = bob.build(); // cleanup diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index b4752266944..b692123a5e0 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java @@ -101,8 +101,7 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest( result.getTimestamp(), @@ -169,7 +173,6 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest, Object, TimeseriesQuery>() { private final List aggs = query.getAggregatorSpecs(); - private final List postAggs = query.getPostAggregatorSpecs(); @Override public byte[] computeCacheKey(TimeseriesQuery query) @@ -238,10 +241,6 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest( timestamp, new TimeseriesResultValue(retVal) diff --git a/processing/src/test/java/io/druid/query/timeseries/TimeseriesBinaryFnTest.java b/processing/src/test/java/io/druid/query/timeseries/TimeseriesBinaryFnTest.java index e574cbd4b44..ff3266f303c 100644 --- a/processing/src/test/java/io/druid/query/timeseries/TimeseriesBinaryFnTest.java +++ b/processing/src/test/java/io/druid/query/timeseries/TimeseriesBinaryFnTest.java @@ -20,16 +20,11 @@ package 
io.druid.query.timeseries; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; import io.druid.granularity.QueryGranularity; import io.druid.query.Result; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; -import io.druid.query.aggregation.PostAggregator; -import io.druid.query.aggregation.post.ArithmeticPostAggregator; -import io.druid.query.aggregation.post.ConstantPostAggregator; -import io.druid.query.aggregation.post.FieldAccessPostAggregator; import junit.framework.Assert; import org.joda.time.DateTime; import org.junit.Test; @@ -43,21 +38,10 @@ public class TimeseriesBinaryFnTest { final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows"); final LongSumAggregatorFactory indexLongSum = new LongSumAggregatorFactory("index", "index"); - final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L, null); - final FieldAccessPostAggregator rowsPostAgg = new FieldAccessPostAggregator("rows", "rows"); - final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index"); - final ArithmeticPostAggregator addRowsIndexConstant = new ArithmeticPostAggregator( - "addRowsIndexConstant", - "+", - Lists.newArrayList(constant, rowsPostAgg, indexPostAgg) - ); final List aggregatorFactories = Arrays.asList( rowsCount, indexLongSum ); - final List postAggregators = Arrays.asList( - addRowsIndexConstant - ); final DateTime currTime = new DateTime(); @Test @@ -87,16 +71,14 @@ public class TimeseriesBinaryFnTest new TimeseriesResultValue( ImmutableMap.of( "rows", 3L, - "index", 5L, - "addRowsIndexConstant", 9.0 + "index", 5L ) ) ); Result actual = new TimeseriesBinaryFn( QueryGranularity.ALL, - aggregatorFactories, - postAggregators + aggregatorFactories ).apply( result1, result2 @@ -131,16 +113,14 @@ public class TimeseriesBinaryFnTest new TimeseriesResultValue( 
ImmutableMap.of( "rows", 3L, - "index", 5L, - "addRowsIndexConstant", 9.0 + "index", 5L ) ) ); Result actual = new TimeseriesBinaryFn( QueryGranularity.DAY, - aggregatorFactories, - postAggregators + aggregatorFactories ).apply( result1, result2 @@ -166,8 +146,7 @@ public class TimeseriesBinaryFnTest Result actual = new TimeseriesBinaryFn( QueryGranularity.ALL, - aggregatorFactories, - postAggregators + aggregatorFactories ).apply( result1, result2 @@ -202,16 +181,14 @@ public class TimeseriesBinaryFnTest new TimeseriesResultValue( ImmutableMap.of( "rows", 3L, - "index", 5L, - "addRowsIndexConstant", 9.0 + "index", 5L ) ) ); Result actual = new TimeseriesBinaryFn( QueryGranularity.ALL, - aggregatorFactories, - postAggregators + aggregatorFactories ).apply( result1, result2 diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java index 07bec13f16b..710511c02a4 100644 --- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java @@ -216,8 +216,9 @@ public class CachingClusteredClientTest .postAggregators(POST_AGGS) .context(CONTEXT); + QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest(new QueryConfig())); testQueryCaching( - client, + runner, builder.build(), new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000), new Interval("2011-01-02/2011-01-03"), makeTimeResults(new DateTime("2011-01-02"), 30, 6000), @@ -258,7 +259,7 @@ public class CachingClusteredClientTest new DateTime("2011-01-09"), 18, 521, new DateTime("2011-01-09T01"), 181, 52 ), - client.run( + runner.run( builder.intervals("2011-01-01/2011-01-10") .aggregators(RENAMED_AGGS) .postAggregators(RENAMED_POST_AGGS) @@ -279,9 +280,10 @@ public class CachingClusteredClientTest .aggregators(AGGS) .postAggregators(POST_AGGS) .context(CONTEXT); + 
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest(new QueryConfig())); testQueryCaching( - client, + runner, builder.build(), new Interval("2011-11-04/2011-11-08"), makeTimeResults( @@ -299,7 +301,7 @@ public class CachingClusteredClientTest new DateTime("2011-11-06", TIMEZONE), 23, 85312, new DateTime("2011-11-07", TIMEZONE), 85, 102 ), - client.run( + runner.run( builder.intervals("2011-11-04/2011-11-08") .aggregators(RENAMED_AGGS) .postAggregators(RENAMED_POST_AGGS) From ecfa6bd1b16a9e2a5a80cf4dd3ff1146c48353c2 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Tue, 8 Apr 2014 17:26:51 +0530 Subject: [PATCH 04/15] fix casing and add comment --- processing/src/main/java/io/druid/query/topn/TopNBinaryFn.java | 3 ++- .../java/io/druid/query/topn/TopNNumericResultBuilder.java | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/processing/src/main/java/io/druid/query/topn/TopNBinaryFn.java b/processing/src/main/java/io/druid/query/topn/TopNBinaryFn.java index f848f3c9e06..8c91b8f7b53 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNBinaryFn.java +++ b/processing/src/main/java/io/druid/query/topn/TopNBinaryFn.java @@ -94,6 +94,7 @@ public class TopNBinaryFn implements BinaryFn, Result retVal = new LinkedHashMap(aggregations.size() + 2); retVal.put(dimension, dimensionValue); @@ -102,7 +103,7 @@ public class TopNBinaryFn implements BinaryFn, Result Date: Fri, 11 Apr 2014 12:18:11 +0530 Subject: [PATCH 05/15] persist-n-merge in separate pool persist-n-merge in separate pool, do not block intermediate persists --- .../realtime/plumber/RealtimePlumber.java | 88 +++++++++++-------- 1 file changed, 52 insertions(+), 36 deletions(-) diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index 80bd59235c5..e9b0ca388a2 100644 --- 
a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -68,7 +68,6 @@ import java.util.concurrent.ScheduledExecutorService; public class RealtimePlumber implements Plumber { private static final EmittingLogger log = new EmittingLogger(RealtimePlumber.class); - private final Period windowPeriod; private final File basePersistDirectory; private final IndexGranularity segmentGranularity; @@ -84,16 +83,15 @@ public class RealtimePlumber implements Plumber private final SegmentPublisher segmentPublisher; private final ServerView serverView; private final int maxPendingPersists; - private final Object handoffCondition = new Object(); private final Map sinks = Maps.newConcurrentMap(); private final VersionedIntervalTimeline sinkTimeline = new VersionedIntervalTimeline( String.CASE_INSENSITIVE_ORDER ); - private volatile boolean shuttingDown = false; private volatile boolean stopped = false; private volatile ExecutorService persistExecutor = null; + private volatile ExecutorService mergeExecutor = null; private volatile ScheduledExecutorService scheduledExecutor = null; public RealtimePlumber( @@ -306,7 +304,7 @@ public class RealtimePlumber implements Plumber final String threadName = String.format( "%s-%s-persist-n-merge", schema.getDataSource(), new DateTime(truncatedTime) ); - persistExecutor.execute( + mergeExecutor.execute( new ThreadRenamingRunnable(threadName) { @Override @@ -315,10 +313,10 @@ public class RealtimePlumber implements Plumber final Interval interval = sink.getInterval(); for (FireHydrant hydrant : sink) { - if (!hydrant.hasSwapped()) { - log.info("Hydrant[%s] hasn't swapped yet, swapping. Sink[%s]", hydrant, sink); - final int rowCount = persistHydrant(hydrant, schema, interval); - metrics.incrementRowOutputCount(rowCount); + if (!hydrant.hasSwapped()) { + log.info("Hydrant[%s] hasn't swapped yet, swapping. 
Sink[%s]", hydrant, sink); + final int rowCount = persistHydrant(hydrant, schema, interval); + metrics.incrementRowOutputCount(rowCount); } } @@ -431,6 +429,13 @@ public class RealtimePlumber implements Plumber "plumber_persist_%d", maxPendingPersists ); } + if (persistExecutor == null) { + // use a blocking single threaded executor to throttle the firehose when write to disk is slow + mergeExecutor = Execs.newBlockingSingleThreaded( + "plumber_persist_%d", maxPendingPersists + ); + } + if (scheduledExecutor == null) { scheduledExecutor = Executors.newScheduledThreadPool( 1, @@ -592,7 +597,11 @@ public class RealtimePlumber implements Plumber log.info("Adding entry[%s] for merge and push.", entry); sinksToPush.add(entry); } else { - log.warn("[%s] < [%s] Skipping persist and merge.", new DateTime(intervalStart), minTimestampAsDate); + log.warn( + "[%s] < [%s] Skipping persist and merge.", + new DateTime(intervalStart), + minTimestampAsDate + ); } } @@ -660,39 +669,46 @@ public class RealtimePlumber implements Plumber */ protected int persistHydrant(FireHydrant indexToPersist, Schema schema, Interval interval) { - if (indexToPersist.hasSwapped()) { + synchronized (indexToPersist) { + if (indexToPersist.hasSwapped()) { + log.info( + "DataSource[%s], Interval[%s], Hydrant[%s] already swapped. Ignoring request to persist.", + schema.getDataSource(), interval, indexToPersist + ); + return 0; + } + log.info( - "DataSource[%s], Interval[%s], Hydrant[%s] already swapped. 
Ignoring request to persist.", - schema.getDataSource(), interval, indexToPersist + "DataSource[%s], Interval[%s], persisting Hydrant[%s]", + schema.getDataSource(), + interval, + indexToPersist ); - return 0; - } + try { + int numRows = indexToPersist.getIndex().size(); - log.info("DataSource[%s], Interval[%s], persisting Hydrant[%s]", schema.getDataSource(), interval, indexToPersist); - try { - int numRows = indexToPersist.getIndex().size(); + File persistedFile = IndexMerger.persist( + indexToPersist.getIndex(), + new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount())) + ); - File persistedFile = IndexMerger.persist( - indexToPersist.getIndex(), - new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount())) - ); + indexToPersist.swapSegment( + new QueryableIndexSegment( + indexToPersist.getSegment().getIdentifier(), + IndexIO.loadIndex(persistedFile) + ) + ); - indexToPersist.swapSegment( - new QueryableIndexSegment( - indexToPersist.getSegment().getIdentifier(), - IndexIO.loadIndex(persistedFile) - ) - ); + return numRows; + } + catch (IOException e) { + log.makeAlert("dataSource[%s] -- incremental persist failed", schema.getDataSource()) + .addData("interval", interval) + .addData("count", indexToPersist.getCount()) + .emit(); - return numRows; - } - catch (IOException e) { - log.makeAlert("dataSource[%s] -- incremental persist failed", schema.getDataSource()) - .addData("interval", interval) - .addData("count", indexToPersist.getCount()) - .emit(); - - throw Throwables.propagate(e); + throw Throwables.propagate(e); + } } } From 595ad7d21d2c320b6a69d886a256723ac1d95cd4 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Fri, 11 Apr 2014 12:34:17 +0530 Subject: [PATCH 06/15] rename thread --- .../segment/realtime/plumber/RealtimePlumber.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java 
b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index e9b0ca388a2..265015a990c 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -313,10 +313,10 @@ public class RealtimePlumber implements Plumber final Interval interval = sink.getInterval(); for (FireHydrant hydrant : sink) { - if (!hydrant.hasSwapped()) { - log.info("Hydrant[%s] hasn't swapped yet, swapping. Sink[%s]", hydrant, sink); - final int rowCount = persistHydrant(hydrant, schema, interval); - metrics.incrementRowOutputCount(rowCount); + if (!hydrant.hasSwapped()) { + log.info("Hydrant[%s] hasn't swapped yet, swapping. Sink[%s]", hydrant, sink); + final int rowCount = persistHydrant(hydrant, schema, interval); + metrics.incrementRowOutputCount(rowCount); } } @@ -429,10 +429,10 @@ public class RealtimePlumber implements Plumber "plumber_persist_%d", maxPendingPersists ); } - if (persistExecutor == null) { + if (mergeExecutor == null) { // use a blocking single threaded executor to throttle the firehose when write to disk is slow mergeExecutor = Execs.newBlockingSingleThreaded( - "plumber_persist_%d", maxPendingPersists + "plumber_merge_%d", 1 ); } From 077dd7c589dd9933bc9ef83eeb41389d683d59f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Mon, 14 Apr 2014 16:11:12 -0700 Subject: [PATCH 07/15] enable cache monitor on computes by default --- services/src/main/java/io/druid/cli/CliHistorical.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/services/src/main/java/io/druid/cli/CliHistorical.java b/services/src/main/java/io/druid/cli/CliHistorical.java index 8f0c8769397..9692375e264 100644 --- a/services/src/main/java/io/druid/cli/CliHistorical.java +++ b/services/src/main/java/io/druid/cli/CliHistorical.java @@ -26,6 +26,7 @@ import com.metamx.common.logger.Logger; import io.airlift.command.Command; import 
io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; +import io.druid.client.cache.CacheMonitor; import io.druid.client.cache.CacheProvider; import io.druid.guice.Jerseys; import io.druid.guice.JsonConfigProvider; @@ -38,6 +39,7 @@ import io.druid.server.QueryResource; import io.druid.server.coordination.ServerManager; import io.druid.server.coordination.ZkCoordinator; import io.druid.server.initialization.JettyServerInitializer; +import io.druid.server.metrics.MetricsModule; import org.eclipse.jetty.server.Server; import java.util.List; @@ -77,10 +79,11 @@ public class CliHistorical extends ServerRunnable LifecycleModule.register(binder, ZkCoordinator.class); LifecycleModule.register(binder, Server.class); + binder.bind(Cache.class).toProvider(CacheProvider.class).in(ManageLifecycle.class); JsonConfigProvider.bind(binder, "druid.historical.cache", CacheProvider.class); JsonConfigProvider.bind(binder, "druid.historical.cache", CacheConfig.class); - + MetricsModule.register(binder, CacheMonitor.class); } } ); From 16bffe05884b05833c7906d5adf4dc8ac4f85ed3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Mon, 14 Apr 2014 16:23:54 -0700 Subject: [PATCH 08/15] default cache size to zero --- .../src/main/java/io/druid/client/cache/LocalCacheProvider.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/io/druid/client/cache/LocalCacheProvider.java b/server/src/main/java/io/druid/client/cache/LocalCacheProvider.java index 716f5abab93..62cd3d9caaa 100644 --- a/server/src/main/java/io/druid/client/cache/LocalCacheProvider.java +++ b/server/src/main/java/io/druid/client/cache/LocalCacheProvider.java @@ -29,7 +29,7 @@ public class LocalCacheProvider implements CacheProvider { @JsonProperty @Min(0) - private long sizeInBytes = 10485760; + private long sizeInBytes = 0; @JsonProperty @Min(0) From ce250222bf3de0c35ab46bdc00061421d4c2d115 Mon Sep 17 00:00:00 2001 From: 
=?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Mon, 14 Apr 2014 16:33:08 -0700 Subject: [PATCH 09/15] update docs --- docs/content/Broker-Config.md | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/docs/content/Broker-Config.md b/docs/content/Broker-Config.md index 552320ac583..44ece979580 100644 --- a/docs/content/Broker-Config.md +++ b/docs/content/Broker-Config.md @@ -81,7 +81,7 @@ druid.server.http.numThreads=50 druid.request.logging.type=emitter druid.request.logging.feed=druid_requests -druid.monitoring.monitors=["com.metamx.metrics.SysMonitor","com.metamx.metrics.JvmMonitor", "io.druid.client.cache.CacheMonitor"] +druid.monitoring.monitors=["com.metamx.metrics.SysMonitor","com.metamx.metrics.JvmMonitor"] # Emit metrics over http druid.emitter=http @@ -106,16 +106,16 @@ The broker module uses several of the default modules in [Configuration](Configu |Property|Description|Default| |--------|-----------|-------| -|`druid.broker.cache.sizeInBytes`|Maximum size of the cache. If this is zero, cache is disabled.|10485760 (10MB)| -|`druid.broker.cache.initialSize`|The initial size of the cache in bytes.|500000| -|`druid.broker.cache.logEvictionCount`|If this is non-zero, there will be an eviction of entries.|0| +|`druid.broker.cache.sizeInBytes`|Maximum cache size in bytes. 
Zero disables caching.|0| +|`druid.broker.cache.initialSize`|Initial size of the hashtable backing the cache.|500000| +|`druid.broker.cache.logEvictionCount`|If non-zero, log cache eviction every `logEvictionCount` items.|0| #### Memcache |Property|Description|Default| |--------|-----------|-------| -|`druid.broker.cache.expiration`|Memcache [expiration time ](https://code.google.com/p/memcached/wiki/NewCommands#Standard_Protocol).|2592000 (30 days)| -|`druid.broker.cache.timeout`|Maximum time in milliseconds to wait for a response from Memcache.|500| -|`druid.broker.cache.hosts`|Memcache hosts.|none| -|`druid.broker.cache.maxObjectSize`|Maximum object size in bytes for a Memcache object.|52428800 (50 MB)| -|`druid.broker.cache.memcachedPrefix`|Key prefix for all keys in Memcache.|druid| +|`druid.broker.cache.expiration`|Memcached [expiration time](https://code.google.com/p/memcached/wiki/NewCommands#Standard_Protocol).|2592000 (30 days)| +|`druid.broker.cache.timeout`|Maximum time in milliseconds to wait for a response from Memcached.|500| +|`druid.broker.cache.hosts`|Comma separated list of Memcached hosts ``.|none| +|`druid.broker.cache.maxObjectSize`|Maximum object size in bytes for a Memcached object.|52428800 (50 MB)| +|`druid.broker.cache.memcachedPrefix`|Key prefix for all keys in Memcached.|druid| From 3172383e56c75adc128ecd1a52ceef2494d72b58 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Tue, 15 Apr 2014 12:55:40 +0530 Subject: [PATCH 10/15] modify comment --- .../java/io/druid/segment/realtime/plumber/RealtimePlumber.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index 265015a990c..0210219a3d3 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -298,7
+298,7 @@ public class RealtimePlumber implements Plumber ); } - // Submits persist-n-merge task for a Sink to the persistExecutor + // Submits persist-n-merge task for a Sink to the mergeExecutor private void persistAndMerge(final long truncatedTime, final Sink sink) { final String threadName = String.format( From d66ae5ac1079ad5104e506d4138faf4e29220e67 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Tue, 15 Apr 2014 13:37:31 +0530 Subject: [PATCH 11/15] fix dependent PostAggregators --- .../query/aggregation/AggregatorUtil.java | 53 ++++++++++++++ .../AggregateTopNMetricFirstAlgorithm.java | 8 ++- .../topn/DimExtractionTopNAlgorithm.java | 11 +-- .../query/topn/InvertedTopNMetricSpec.java | 6 +- .../topn/LexicographicTopNMetricSpec.java | 6 +- .../query/topn/NumericTopNMetricSpec.java | 6 +- .../druid/query/topn/PooledTopNAlgorithm.java | 26 ++++--- .../io/druid/query/topn/TopNBinaryFn.java | 24 +++++-- .../topn/TopNLexicographicResultBuilder.java | 10 +-- .../io/druid/query/topn/TopNMetricSpec.java | 4 +- .../query/topn/TopNNumericResultBuilder.java | 18 ++--- .../druid/query/topn/TopNResultBuilder.java | 4 +- .../io/druid/query/QueryRunnerTestHelper.java | 14 +++- .../query/aggregation/AggregatorUtilTest.java | 69 ++++++++++++++++++ .../druid/query/topn/TopNQueryRunnerTest.java | 71 +++++++++++++++++++ 15 files changed, 282 insertions(+), 48 deletions(-) create mode 100644 processing/src/main/java/io/druid/query/aggregation/AggregatorUtil.java create mode 100644 processing/src/test/java/io/druid/query/aggregation/AggregatorUtilTest.java diff --git a/processing/src/main/java/io/druid/query/aggregation/AggregatorUtil.java b/processing/src/main/java/io/druid/query/aggregation/AggregatorUtil.java new file mode 100644 index 00000000000..6c4b49fd067 --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/AggregatorUtil.java @@ -0,0 +1,53 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc. 
+ * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.aggregation; + +import com.google.common.collect.Lists; + +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +public class AggregatorUtil +{ + /** returns the list of dependent postAggregators that should be calculated in order to calculate given postAgg + * @param postAggregatorList List of postAggregator, there is a restriction that the postAgg should be in order that it can + * @param postAggName name of the postAgg on which dependency is to be calculated + */ + public static List pruneDependentPostAgg(List postAggregatorList, String postAggName) + { + LinkedList rv = Lists.newLinkedList(); + Set deps = new HashSet<>(); + deps.add(postAggName); + // Iterate backwards to calculate deps + for (int i = postAggregatorList.size() - 1; i >= 0; i--) { + PostAggregator agg = postAggregatorList.get(i); + if (deps.contains(agg.getName())) { + rv.addFirst(agg); // add to the beginning of List + deps.remove(agg.getName()); + deps.addAll(agg.getDependentFields()); + } + } + + return rv; + } + +} diff --git a/processing/src/main/java/io/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java 
b/processing/src/main/java/io/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java index 84be056fea1..cd1a60e72fc 100644 --- a/processing/src/main/java/io/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java +++ b/processing/src/main/java/io/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java @@ -56,7 +56,6 @@ public class AggregateTopNMetricFirstAlgorithm implements TopNAlgorithm aggFactories, + List postAggs ) { - return delegate.getResultBuilder(timestamp, dimSpec, threshold, comparator); + return delegate.getResultBuilder(timestamp, dimSpec, threshold, comparator, aggFactories, postAggs); } @Override diff --git a/processing/src/main/java/io/druid/query/topn/LexicographicTopNMetricSpec.java b/processing/src/main/java/io/druid/query/topn/LexicographicTopNMetricSpec.java index 5c1b9ab41e8..b7c7c6a2565 100644 --- a/processing/src/main/java/io/druid/query/topn/LexicographicTopNMetricSpec.java +++ b/processing/src/main/java/io/druid/query/topn/LexicographicTopNMetricSpec.java @@ -80,10 +80,12 @@ public class LexicographicTopNMetricSpec implements TopNMetricSpec DateTime timestamp, DimensionSpec dimSpec, int threshold, - Comparator comparator + Comparator comparator, + List aggFactories, + List postAggs ) { - return new TopNLexicographicResultBuilder(timestamp, dimSpec, threshold, previousStop, comparator); + return new TopNLexicographicResultBuilder(timestamp, dimSpec, threshold, previousStop, comparator, aggFactories); } @Override diff --git a/processing/src/main/java/io/druid/query/topn/NumericTopNMetricSpec.java b/processing/src/main/java/io/druid/query/topn/NumericTopNMetricSpec.java index fe5bce241a6..9ad97e239cd 100644 --- a/processing/src/main/java/io/druid/query/topn/NumericTopNMetricSpec.java +++ b/processing/src/main/java/io/druid/query/topn/NumericTopNMetricSpec.java @@ -121,10 +121,12 @@ public class NumericTopNMetricSpec implements TopNMetricSpec DateTime timestamp, DimensionSpec dimSpec, int threshold, - Comparator comparator + Comparator 
comparator, + List aggFactories, + List postAggs ) { - return new TopNNumericResultBuilder(timestamp, dimSpec, metric, threshold, comparator); + return new TopNNumericResultBuilder(timestamp, dimSpec, metric, threshold, comparator, aggFactories, postAggs); } @Override diff --git a/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java b/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java index d87631c7b57..6dd44d0a4a8 100644 --- a/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java +++ b/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java @@ -35,7 +35,8 @@ import java.util.Comparator; /** */ -public class PooledTopNAlgorithm extends BaseTopNAlgorithm +public class PooledTopNAlgorithm + extends BaseTopNAlgorithm { private final Capabilities capabilities; private final TopNQuery query; @@ -117,7 +118,12 @@ public class PooledTopNAlgorithm extends BaseTopNAlgorithm resultsBufHolder; private final ByteBuffer resultsBuf; private final int[] aggregatorSizes; @@ -278,6 +277,11 @@ public class PooledTopNAlgorithm extends BaseTopNAlgorithm getResultsBufHolder() { return resultsBufHolder; diff --git a/processing/src/main/java/io/druid/query/topn/TopNBinaryFn.java b/processing/src/main/java/io/druid/query/topn/TopNBinaryFn.java index 8c91b8f7b53..c6ab513c726 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNBinaryFn.java +++ b/processing/src/main/java/io/druid/query/topn/TopNBinaryFn.java @@ -24,6 +24,7 @@ import io.druid.granularity.AllGranularity; import io.druid.granularity.QueryGranularity; import io.druid.query.Result; import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.AggregatorUtil; import io.druid.query.aggregation.PostAggregator; import io.druid.query.dimension.DimensionSpec; import org.joda.time.DateTime; @@ -63,9 +64,13 @@ public class TopNBinaryFn implements BinaryFn, Result, Result retVal = new LinkedHashMap(aggregations.size() + 2); 
retVal.put(dimension, dimensionValue); @@ -103,9 +108,7 @@ public class TopNBinaryFn implements BinaryFn, Result, Result aggFactories; private MinMaxPriorityQueue pQueue = null; public TopNLexicographicResultBuilder( @@ -48,12 +48,14 @@ public class TopNLexicographicResultBuilder implements TopNResultBuilder DimensionSpec dimSpec, int threshold, String previousStop, - final Comparator comparator + final Comparator comparator, + List aggFactories ) { this.timestamp = timestamp; this.dimSpec = dimSpec; this.previousStop = previousStop; + this.aggFactories = aggFactories; instantiatePQueue(threshold, comparator); } @@ -62,9 +64,7 @@ public class TopNLexicographicResultBuilder implements TopNResultBuilder public TopNResultBuilder addEntry( String dimName, Object dimValIndex, - Object[] metricVals, - List aggFactories, - List postAggs + Object[] metricVals ) { Map metricValues = Maps.newLinkedHashMap(); diff --git a/processing/src/main/java/io/druid/query/topn/TopNMetricSpec.java b/processing/src/main/java/io/druid/query/topn/TopNMetricSpec.java index 44103f93708..267f2f278dd 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNMetricSpec.java +++ b/processing/src/main/java/io/druid/query/topn/TopNMetricSpec.java @@ -47,7 +47,9 @@ public interface TopNMetricSpec DateTime timestamp, DimensionSpec dimSpec, int threshold, - Comparator comparator + Comparator comparator, + List aggFactories, + List postAggs ); public byte[] getCacheKey(); diff --git a/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java b/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java index bfe142efbb5..4a40f4bb2d5 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java +++ b/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java @@ -23,6 +23,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.MinMaxPriorityQueue; import io.druid.query.Result; import 
io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.AggregatorUtil; import io.druid.query.aggregation.PostAggregator; import io.druid.query.dimension.DimensionSpec; import org.joda.time.DateTime; @@ -40,6 +41,8 @@ public class TopNNumericResultBuilder implements TopNResultBuilder private final DateTime timestamp; private final DimensionSpec dimSpec; private final String metricName; + private final List aggFactories; + private final List postAggs; private MinMaxPriorityQueue pQueue = null; public TopNNumericResultBuilder( @@ -47,12 +50,16 @@ public class TopNNumericResultBuilder implements TopNResultBuilder DimensionSpec dimSpec, String metricName, int threshold, - final Comparator comparator + final Comparator comparator, + List aggFactories, + List postAggs ) { this.timestamp = timestamp; this.dimSpec = dimSpec; this.metricName = metricName; + this.aggFactories = aggFactories; + this.postAggs = AggregatorUtil.pruneDependentPostAgg(postAggs, this.metricName); instantiatePQueue(threshold, comparator); } @@ -61,9 +68,7 @@ public class TopNNumericResultBuilder implements TopNResultBuilder public TopNResultBuilder addEntry( String dimName, Object dimValIndex, - Object[] metricVals, - List aggFactories, - List postAggs + Object[] metricVals ) { Map metricValues = Maps.newLinkedHashMap(); @@ -76,10 +81,7 @@ public class TopNNumericResultBuilder implements TopNResultBuilder } for (PostAggregator postAgg : postAggs) { - if (postAgg.getName().equalsIgnoreCase(metricName)) { - metricValues.put(postAgg.getName(), postAgg.compute(metricValues)); - break; - } + metricValues.put(postAgg.getName(), postAgg.compute(metricValues)); } Object topNMetricVal = metricValues.get(metricName); diff --git a/processing/src/main/java/io/druid/query/topn/TopNResultBuilder.java b/processing/src/main/java/io/druid/query/topn/TopNResultBuilder.java index 5823ee3eece..97b20175380 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNResultBuilder.java +++ 
b/processing/src/main/java/io/druid/query/topn/TopNResultBuilder.java @@ -33,9 +33,7 @@ public interface TopNResultBuilder public TopNResultBuilder addEntry( String dimName, Object dimValIndex, - Object[] metricVals, - List aggFactories, - List postAggs + Object[] metricVals ); public TopNResultBuilder addEntry( diff --git a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java index 7ca66b53d7e..b7e7fe0f6de 100644 --- a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java +++ b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java @@ -60,6 +60,7 @@ public class QueryRunnerTestHelper public static final String indexMetric = "index"; public static final String uniqueMetric = "uniques"; public static final String addRowsIndexConstantMetric = "addRowsIndexConstant"; + public static String dependentPostAggMetric = "dependentPostAgg"; public static final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows"); public static final LongSumAggregatorFactory indexLongSum = new LongSumAggregatorFactory("index", "index"); public static final DoubleSumAggregatorFactory indexDoubleSum = new DoubleSumAggregatorFactory("index", "index"); @@ -72,8 +73,19 @@ public class QueryRunnerTestHelper public static final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index"); public static final ArithmeticPostAggregator addRowsIndexConstant = new ArithmeticPostAggregator( - "addRowsIndexConstant", "+", Lists.newArrayList(constant, rowsPostAgg, indexPostAgg) + addRowsIndexConstantMetric, "+", Lists.newArrayList(constant, rowsPostAgg, indexPostAgg) ); + // dependent on AddRowsIndexContact postAgg + public static final ArithmeticPostAggregator dependentPostAgg = new ArithmeticPostAggregator( + dependentPostAggMetric, + "+", + Lists.newArrayList( + constant, + new FieldAccessPostAggregator(addRowsIndexConstantMetric, 
addRowsIndexConstantMetric) + ) + ); + + public static final List commonAggregators = Arrays.asList( rowsCount, indexDoubleSum, diff --git a/processing/src/test/java/io/druid/query/aggregation/AggregatorUtilTest.java b/processing/src/test/java/io/druid/query/aggregation/AggregatorUtilTest.java new file mode 100644 index 00000000000..7fd91bf74ff --- /dev/null +++ b/processing/src/test/java/io/druid/query/aggregation/AggregatorUtilTest.java @@ -0,0 +1,69 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
+ */ + +package io.druid.query.aggregation; + +import com.google.common.collect.Lists; +import io.druid.query.aggregation.post.ArithmeticPostAggregator; +import io.druid.query.aggregation.post.ConstantPostAggregator; +import io.druid.query.aggregation.post.FieldAccessPostAggregator; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; + +public class AggregatorUtilTest +{ + + @Test + public void testPruneDependentPostAgg() + { + PostAggregator agg1 = new ArithmeticPostAggregator( + "abc", "+", Lists.newArrayList( + new ConstantPostAggregator("1", 1L, 1L), new ConstantPostAggregator("2", 2L, 2L) + ) + ); + PostAggregator dependency1 = new ArithmeticPostAggregator( + "dep1", "+", Lists.newArrayList( + new ConstantPostAggregator("1", 1L, 1L), new ConstantPostAggregator("4", 4L, 4L) + ) + ); + PostAggregator agg2 = new FieldAccessPostAggregator("def", "def"); + PostAggregator dependency2 = new FieldAccessPostAggregator("dep2", "dep2"); + PostAggregator aggregator = new ArithmeticPostAggregator( + "finalAgg", + "+", + Lists.newArrayList( + new FieldAccessPostAggregator("dep1", "dep1"), + new FieldAccessPostAggregator("dep2", "dep2") + ) + ); + List prunedAgg = AggregatorUtil.pruneDependentPostAgg( + Lists.newArrayList( + agg1, + dependency1, + agg2, + dependency2, + aggregator + ), aggregator.getName() + ); + Assert.assertEquals(Lists.newArrayList(dependency1, dependency2, aggregator), prunedAgg); + } + +} diff --git a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java index 291eab8171a..c1908c290bf 100644 --- a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java @@ -1176,4 +1176,75 @@ public class TopNQueryRunnerTest TestHelper.assertExpectedResults(expectedResults, runner.run(query)); } + + @Test + public void testTopNDependentPostAgg() { + TopNQuery query = new 
TopNQueryBuilder() + .dataSource(QueryRunnerTestHelper.dataSource) + .granularity(QueryRunnerTestHelper.allGran) + .dimension(providerDimension) + .metric(QueryRunnerTestHelper.dependentPostAggMetric) + .threshold(4) + .intervals(QueryRunnerTestHelper.fullOnInterval) + .aggregators( + Lists.newArrayList( + Iterables.concat( + QueryRunnerTestHelper.commonAggregators, + Lists.newArrayList( + new MaxAggregatorFactory("maxIndex", "index"), + new MinAggregatorFactory("minIndex", "index") + ) + ) + ) + ) + .postAggregators( + Arrays.asList( + QueryRunnerTestHelper.addRowsIndexConstant, + QueryRunnerTestHelper.dependentPostAgg + ) + ) + .build(); + + List> expectedResults = Arrays.asList( + new Result( + new DateTime("2011-01-12T00:00:00.000Z"), + new TopNResultValue( + Arrays.>asList( + ImmutableMap.builder() + .put(providerDimension, "total_market") + .put("rows", 186L) + .put("index", 215679.82879638672D) + .put("addRowsIndexConstant", 215866.82879638672D) + .put(QueryRunnerTestHelper.dependentPostAggMetric, 215867.82879638672D) + .put("uniques", QueryRunnerTestHelper.UNIQUES_2) + .put("maxIndex", 1743.9217529296875D) + .put("minIndex", 792.3260498046875D) + .build(), + ImmutableMap.builder() + .put(providerDimension, "upfront") + .put("rows", 186L) + .put("index", 192046.1060180664D) + .put("addRowsIndexConstant", 192233.1060180664D) + .put(QueryRunnerTestHelper.dependentPostAggMetric, 192234.1060180664D) + .put("uniques", QueryRunnerTestHelper.UNIQUES_2) + .put("maxIndex", 1870.06103515625D) + .put("minIndex", 545.9906005859375D) + .build(), + ImmutableMap.builder() + .put(providerDimension, "spot") + .put("rows", 837L) + .put("index", 95606.57232284546D) + .put("addRowsIndexConstant", 96444.57232284546D) + .put(QueryRunnerTestHelper.dependentPostAggMetric, 96445.57232284546D) + .put("uniques", QueryRunnerTestHelper.UNIQUES_9) + .put("maxIndex", 277.2735290527344D) + .put("minIndex", 59.02102279663086D) + .build() + ) + ) + ) + ); + + 
TestHelper.assertExpectedResults(expectedResults, runner.run(query)); + } } From 3eb8160b51fc77359d4c30cc13ece10298fb9e96 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Tue, 15 Apr 2014 21:06:30 +0530 Subject: [PATCH 12/15] review comments --- .../query/timeseries/TimeseriesQueryQueryToolChest.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index b692123a5e0..72db385e8dc 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java @@ -146,12 +146,7 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest( result.getTimestamp(), From 6217c670271843d3010c317da8902cd2504464fb Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Tue, 15 Apr 2014 22:55:40 +0530 Subject: [PATCH 13/15] complete javadoc, add comment, use Lists.reverse --- .../io/druid/query/aggregation/AggregatorUtil.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/processing/src/main/java/io/druid/query/aggregation/AggregatorUtil.java b/processing/src/main/java/io/druid/query/aggregation/AggregatorUtil.java index 6c4b49fd067..e0e3fc2d1ed 100644 --- a/processing/src/main/java/io/druid/query/aggregation/AggregatorUtil.java +++ b/processing/src/main/java/io/druid/query/aggregation/AggregatorUtil.java @@ -28,8 +28,11 @@ import java.util.Set; public class AggregatorUtil { - /** returns the list of dependent postAggregators that should be calculated in order to calculate given postAgg - * @param postAggregatorList List of postAggregator, there is a restriction that the postAgg should be in order that it can + /** + * returns the list of dependent postAggregators that should be calculated in order to calculate given postAgg + * + * @param 
postAggregatorList List of postAggregator, there is a restriction that the list should be in an order + * such that all the dependencies of any given aggregator should occur before that aggregator * @param postAggName name of the postAgg on which dependency is to be calculated */ public static List pruneDependentPostAgg(List postAggregatorList, String postAggName) @@ -37,9 +40,8 @@ public class AggregatorUtil LinkedList rv = Lists.newLinkedList(); Set deps = new HashSet<>(); deps.add(postAggName); - // Iterate backwards to calculate deps - for (int i = postAggregatorList.size() - 1; i >= 0; i--) { - PostAggregator agg = postAggregatorList.get(i); + // Iterate backwards to find the last calculated aggregate and add dependent aggregator as we find dependencies in reverse order + for (PostAggregator agg : Lists.reverse(postAggregatorList)) { if (deps.contains(agg.getName())) { rv.addFirst(agg); // add to the beginning of List deps.remove(agg.getName()); From 36f6f8ab5a44d74480d8856895d206b25a9d66b1 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Wed, 16 Apr 2014 00:49:56 +0530 Subject: [PATCH 14/15] Add testOutOfOrderPruneDependentPostAgg --- .../query/aggregation/AggregatorUtil.java | 3 +- .../query/aggregation/AggregatorUtilTest.java | 35 +++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/processing/src/main/java/io/druid/query/aggregation/AggregatorUtil.java b/processing/src/main/java/io/druid/query/aggregation/AggregatorUtil.java index e0e3fc2d1ed..6b508c22b3b 100644 --- a/processing/src/main/java/io/druid/query/aggregation/AggregatorUtil.java +++ b/processing/src/main/java/io/druid/query/aggregation/AggregatorUtil.java @@ -32,7 +32,8 @@ public class AggregatorUtil * returns the list of dependent postAggregators that should be calculated in order to calculate given postAgg * * @param postAggregatorList List of postAggregator, there is a restriction that the list should be in an order - * such that all the dependencies of any given 
aggregator should occur before that aggregator + * such that all the dependencies of any given aggregator should occur before that aggregator. + * See AggregatorUtilTest.testOutOfOrderPruneDependentPostAgg for example. * @param postAggName name of the postAgg on which dependency is to be calculated */ public static List pruneDependentPostAgg(List postAggregatorList, String postAggName) diff --git a/processing/src/test/java/io/druid/query/aggregation/AggregatorUtilTest.java b/processing/src/test/java/io/druid/query/aggregation/AggregatorUtilTest.java index 7fd91bf74ff..767b2fce1a3 100644 --- a/processing/src/test/java/io/druid/query/aggregation/AggregatorUtilTest.java +++ b/processing/src/test/java/io/druid/query/aggregation/AggregatorUtilTest.java @@ -66,4 +66,39 @@ public class AggregatorUtilTest Assert.assertEquals(Lists.newArrayList(dependency1, dependency2, aggregator), prunedAgg); } + @Test + public void testOutOfOrderPruneDependentPostAgg() + { + PostAggregator agg1 = new ArithmeticPostAggregator( + "abc", "+", Lists.newArrayList( + new ConstantPostAggregator("1", 1L, 1L), new ConstantPostAggregator("2", 2L, 2L) + ) + ); + PostAggregator dependency1 = new ArithmeticPostAggregator( + "dep1", "+", Lists.newArrayList( + new ConstantPostAggregator("1", 1L, 1L), new ConstantPostAggregator("4", 4L, 4L) + ) + ); + PostAggregator agg2 = new FieldAccessPostAggregator("def", "def"); + PostAggregator dependency2 = new FieldAccessPostAggregator("dep2", "dep2"); + PostAggregator aggregator = new ArithmeticPostAggregator( + "finalAgg", + "+", + Lists.newArrayList( + new FieldAccessPostAggregator("dep1", "dep1"), + new FieldAccessPostAggregator("dep2", "dep2") + ) + ); + List prunedAgg = AggregatorUtil.pruneDependentPostAgg( + Lists.newArrayList( + agg1, + dependency1, + aggregator, // dependency is added later than the aggregator + agg2, + dependency2 + ), aggregator.getName() + ); + Assert.assertEquals(Lists.newArrayList(dependency1, aggregator), prunedAgg); + } + } From 
3117f439e0a26e951a1bcae8fc14cc8cc4b2a741 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Tue, 15 Apr 2014 14:02:18 -0700 Subject: [PATCH 15/15] update to jetty 9.1.4 to fix bug https://bugs.eclipse.org/bugs/show_bug.cgi?id=430654 --- pom.xml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index faadaa5b96f..c48ee0e30d1 100644 --- a/pom.xml +++ b/pom.xml @@ -313,17 +313,17 @@ org.eclipse.jetty jetty-server - 9.1.3.v20140225 + 9.1.4.v20140401 org.eclipse.jetty jetty-servlet - 9.1.3.v20140225 + 9.1.4.v20140401 org.eclipse.jetty jetty-servlets - 9.1.3.v20140225 + 9.1.4.v20140401 joda-time