From e8b993b765226864bab3df183c9cbd43b25503d8 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Mon, 21 Apr 2014 21:26:34 +0530 Subject: [PATCH] fix by segment results 1) fix class cast in by segment results 2) exclude old netty version coming from curator --- pom.xml | 6 ++ .../query/BySegmentResultValueClass.java | 36 ++++++- .../query/FinalizeResultsQueryRunner.java | 71 ++++++------- .../io/druid/query/QueryRunnerTestHelper.java | 5 +- .../druid/query/topn/TopNQueryRunnerTest.java | 99 +++++++++++++++++-- .../query/topn/TopNQueryRunnerTestHelper.java | 73 -------------- 6 files changed, 168 insertions(+), 122 deletions(-) delete mode 100644 processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTestHelper.java diff --git a/pom.xml b/pom.xml index 49d3ccf8475..685de340c4f 100644 --- a/pom.xml +++ b/pom.xml @@ -174,6 +174,12 @@ org.apache.curator curator-framework ${apache.curator.version} + + + org.jboss.netty + netty + + org.apache.curator diff --git a/processing/src/main/java/io/druid/query/BySegmentResultValueClass.java b/processing/src/main/java/io/druid/query/BySegmentResultValueClass.java index c26bfb35706..3a3544cf47a 100644 --- a/processing/src/main/java/io/druid/query/BySegmentResultValueClass.java +++ b/processing/src/main/java/io/druid/query/BySegmentResultValueClass.java @@ -64,10 +64,44 @@ public class BySegmentResultValueClass @Override public String toString() { - return "BySegmentTimeseriesResultValue{" + + return "BySegmentResultValue{" + "results=" + results + ", segmentId='" + segmentId + '\'' + ", interval='" + interval + '\'' + '}'; } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + BySegmentResultValueClass that = (BySegmentResultValueClass) o; + + if (interval != null ? !interval.equals(that.interval) : that.interval != null) { + return false; + } + if (results != null ? !results.equals(that.results) : that.results != null) { + return false; + } + if (segmentId != null ? !segmentId.equals(that.segmentId) : that.segmentId != null) { + return false; + } + + return true; + } + + @Override + public int hashCode() + { + int result = results != null ? results.hashCode() : 0; + result = 31 * result + (segmentId != null ? segmentId.hashCode() : 0); + result = 31 * result + (interval != null ? interval.hashCode() : 0); + return result; + } } diff --git a/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java b/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java index 0eb925f3327..5a5b2c7bf4f 100644 --- a/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java +++ b/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java @@ -51,54 +51,43 @@ public class FinalizeResultsQueryRunner implements QueryRunner final boolean isBySegment = query.getContextBySegment(false); final boolean shouldFinalize = query.getContextFinalize(true); Function finalizerFn; - if (shouldFinalize) { - if (isBySegment) { - finalizerFn = new Function() - { - final Function baseFinalizer = toolChest.makePostComputeManipulatorFn( - query, - new MetricManipulationFn() - { - @Override - public Object manipulate(AggregatorFactory factory, Object object) - { - return factory.finalizeComputation(factory.deserialize(object)); - } - } - ); - - @Override - @SuppressWarnings("unchecked") - public T apply(@Nullable T input) - { - Result> result = (Result>) input; - BySegmentResultValueClass resultsClass = result.getValue(); - - return (T) new Result( - result.getTimestamp(), - new BySegmentResultValueClass( - Lists.transform(resultsClass.getResults(), baseFinalizer), - resultsClass.getSegmentId(), - resultsClass.getInterval() - ) - ); - } - }; - } else { - finalizerFn = toolChest.makePostComputeManipulatorFn( + if (isBySegment) { + finalizerFn = new Function() + { + final Function baseFinalizer = toolChest.makePostComputeManipulatorFn( query, new MetricManipulationFn() { @Override public Object manipulate(AggregatorFactory factory, Object object) { - return factory.finalizeComputation(object); + if (shouldFinalize) { + return factory.finalizeComputation(factory.deserialize(object)); + } else { + return object; + } } } ); - } + + @Override + @SuppressWarnings("unchecked") + public T apply(@Nullable T input) + { + Result> result = (Result>) input; + BySegmentResultValueClass resultsClass = result.getValue(); + + return (T) new Result( + result.getTimestamp(), + new BySegmentResultValueClass( + Lists.transform(resultsClass.getResults(), baseFinalizer), + resultsClass.getSegmentId(), + resultsClass.getInterval() + ) + ); + } + }; } else { - // finalize is false here. finalizerFn = toolChest.makePostComputeManipulatorFn( query, new MetricManipulationFn() @@ -106,7 +95,11 @@ public class FinalizeResultsQueryRunner implements QueryRunner @Override public Object manipulate(AggregatorFactory factory, Object object) { - return object; + if (shouldFinalize) { + return factory.finalizeComputation(factory.deserialize(object)); + } else { + return object; + } } } ); diff --git a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java index 44799c8ea2f..1e2b6b4601d 100644 --- a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java +++ b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java @@ -171,7 +171,10 @@ public class QueryRunnerTestHelper ) { return new FinalizeResultsQueryRunner( - factory.createRunner(adapter), + new BySegmentQueryRunner( + segmentId, adapter.getDataInterval().getStart(), + factory.createRunner(adapter) + ), factory.getToolchest() ); } diff --git a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java index 540b5f2b5c5..c8628bfdca5 100644 --- a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java @@ -25,6 +25,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.metamx.common.guava.Sequences; import io.druid.collections.StupidPool; +import io.druid.query.BySegmentResultValueClass; import io.druid.query.Druids; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; @@ -64,7 +65,7 @@ public class TopNQueryRunnerTest { List retVal = Lists.newArrayList(); retVal.addAll( - TopNQueryRunnerTestHelper.makeQueryRunners( + QueryRunnerTestHelper.makeQueryRunners( new TopNQueryRunnerFactory( TestQueryRunners.getPool(), new TopNQueryQueryToolChest(new TopNQueryConfig()) @@ -72,7 +73,7 @@ public class TopNQueryRunnerTest ) ); retVal.addAll( - TopNQueryRunnerTestHelper.makeQueryRunners( + QueryRunnerTestHelper.makeQueryRunners( new TopNQueryRunnerFactory( new StupidPool( new Supplier() @@ -698,13 +699,13 @@ public class TopNQueryRunnerTest .fields( Lists.newArrayList( Druids.newSelectorDimFilterBuilder() - .dimension(providerDimension) - .value("billyblank") - .build(), + .dimension(providerDimension) + .value("billyblank") + .build(), Druids.newSelectorDimFilterBuilder() - .dimension(QueryRunnerTestHelper.qualityDimension) - .value("mezzanine") - .build() + .dimension(QueryRunnerTestHelper.qualityDimension) + .value("mezzanine") + .build() ) ).build(); TopNQuery query = new TopNQueryBuilder() @@ -1260,4 +1261,86 @@ public class TopNQueryRunnerTest TestHelper.assertExpectedResults(expectedResults, runner.run(query)); } + + @Test + public void testTopNBySegmentResults() + { + TopNQuery query = new TopNQueryBuilder() + .dataSource(QueryRunnerTestHelper.dataSource) + .granularity(QueryRunnerTestHelper.allGran) + .dimension(QueryRunnerTestHelper.providerDimension) + .metric(QueryRunnerTestHelper.dependentPostAggMetric) + .threshold(4) + .intervals(QueryRunnerTestHelper.fullOnInterval) + .aggregators( + Lists.newArrayList( + Iterables.concat( + QueryRunnerTestHelper.commonAggregators, + Lists.newArrayList( + new MaxAggregatorFactory("maxIndex", "index"), + new MinAggregatorFactory("minIndex", "index") + ) + ) + ) + ) + .postAggregators( + Arrays.asList( + QueryRunnerTestHelper.addRowsIndexConstant, + QueryRunnerTestHelper.dependentPostAgg + ) + ) + .context(ImmutableMap.of("finalize", true, "bySegment", true)) + .build(); + TopNResultValue topNResult = new TopNResultValue( + Arrays.>asList( + ImmutableMap.builder() + .put(QueryRunnerTestHelper.providerDimension, "total_market") + .put("rows", 186L) + .put("index", 215679.82879638672D) + .put("addRowsIndexConstant", 215866.82879638672D) + .put(QueryRunnerTestHelper.dependentPostAggMetric, 216053.82879638672D) + .put("uniques", QueryRunnerTestHelper.UNIQUES_2) + .put("maxIndex", 1743.9217529296875D) + .put("minIndex", 792.3260498046875D) + .build(), + ImmutableMap.builder() + .put(QueryRunnerTestHelper.providerDimension, "upfront") + .put("rows", 186L) + .put("index", 192046.1060180664D) + .put("addRowsIndexConstant", 192233.1060180664D) + .put(QueryRunnerTestHelper.dependentPostAggMetric, 192420.1060180664D) + .put("uniques", QueryRunnerTestHelper.UNIQUES_2) + .put("maxIndex", 1870.06103515625D) + .put("minIndex", 545.9906005859375D) + .build(), + ImmutableMap.builder() + .put(QueryRunnerTestHelper.providerDimension, "spot") + .put("rows", 837L) + .put("index", 95606.57232284546D) + .put("addRowsIndexConstant", 96444.57232284546D) + .put(QueryRunnerTestHelper.dependentPostAggMetric, 97282.57232284546D) + .put("uniques", QueryRunnerTestHelper.UNIQUES_9) + .put("maxIndex", 277.2735290527344D) + .put("minIndex", 59.02102279663086D) + .build() + ) + ); + + List> expectedResults = Arrays.asList( + new Result( + new DateTime("2011-01-12T00:00:00.000Z"), + new BySegmentResultValueClass( + Arrays.asList( + new Result( + new DateTime("2011-01-12T00:00:00.000Z"), + topNResult + ) + ), + QueryRunnerTestHelper.segmentId, + new Interval("1970-01-01T00:00:00.000Z/2020-01-01T00:00:00.000Z") + ) + ) + ); + TestHelper.assertExpectedResults(expectedResults, runner.run(query)); + } } diff --git a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTestHelper.java b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTestHelper.java deleted file mode 100644 index 97b837a4b48..00000000000 --- a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTestHelper.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012, 2013 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package io.druid.query.topn; - -import io.druid.query.FinalizeResultsQueryRunner; -import io.druid.query.Query; -import io.druid.query.QueryRunner; -import io.druid.query.QueryRunnerFactory; -import io.druid.segment.IncrementalIndexSegment; -import io.druid.segment.QueryableIndex; -import io.druid.segment.QueryableIndexSegment; -import io.druid.segment.Segment; -import io.druid.segment.TestIndex; -import io.druid.segment.incremental.IncrementalIndex; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; - -public class TopNQueryRunnerTestHelper -{ - @SuppressWarnings("unchecked") - public static Collection makeQueryRunners( - QueryRunnerFactory factory - ) - throws IOException - { - final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex(); - final QueryableIndex mMappedTestIndex = TestIndex.getMMappedTestIndex(); - final QueryableIndex mergedRealtimeIndex = TestIndex.mergedRealtimeIndex(); - return Arrays.asList( - new Object[][]{ - { - makeQueryRunner(factory, new IncrementalIndexSegment(rtIndex, null)) - }, - { - makeQueryRunner(factory, new QueryableIndexSegment(null, mMappedTestIndex)) - }, - { - makeQueryRunner(factory, new QueryableIndexSegment(null, mergedRealtimeIndex)) - } - } - ); - } - - public static QueryRunner makeQueryRunner( - QueryRunnerFactory> factory, - Segment adapter - ) - { - return new FinalizeResultsQueryRunner( - factory.createRunner(adapter), - factory.getToolchest() - ); - } -} \ No newline at end of file