diff --git a/pom.xml b/pom.xml
index 49d3ccf8475..685de340c4f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -174,6 +174,12 @@
org.apache.curator
curator-framework
${apache.curator.version}
+
+
+ org.jboss.netty
+ netty
+
+
org.apache.curator
diff --git a/processing/src/main/java/io/druid/query/BySegmentResultValueClass.java b/processing/src/main/java/io/druid/query/BySegmentResultValueClass.java
index c26bfb35706..3a3544cf47a 100644
--- a/processing/src/main/java/io/druid/query/BySegmentResultValueClass.java
+++ b/processing/src/main/java/io/druid/query/BySegmentResultValueClass.java
@@ -64,10 +64,44 @@ public class BySegmentResultValueClass
@Override
public String toString()
{
- return "BySegmentTimeseriesResultValue{" +
+ return "BySegmentResultValue{" +
"results=" + results +
", segmentId='" + segmentId + '\'' +
", interval='" + interval + '\'' +
'}';
}
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ BySegmentResultValueClass that = (BySegmentResultValueClass) o;
+
+ if (interval != null ? !interval.equals(that.interval) : that.interval != null) {
+ return false;
+ }
+ if (results != null ? !results.equals(that.results) : that.results != null) {
+ return false;
+ }
+ if (segmentId != null ? !segmentId.equals(that.segmentId) : that.segmentId != null) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = results != null ? results.hashCode() : 0;
+ result = 31 * result + (segmentId != null ? segmentId.hashCode() : 0);
+ result = 31 * result + (interval != null ? interval.hashCode() : 0);
+ return result;
+ }
}
diff --git a/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java b/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java
index 0eb925f3327..5a5b2c7bf4f 100644
--- a/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java
+++ b/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java
@@ -51,54 +51,43 @@ public class FinalizeResultsQueryRunner implements QueryRunner
final boolean isBySegment = query.getContextBySegment(false);
final boolean shouldFinalize = query.getContextFinalize(true);
Function finalizerFn;
- if (shouldFinalize) {
- if (isBySegment) {
- finalizerFn = new Function()
- {
- final Function baseFinalizer = toolChest.makePostComputeManipulatorFn(
- query,
- new MetricManipulationFn()
- {
- @Override
- public Object manipulate(AggregatorFactory factory, Object object)
- {
- return factory.finalizeComputation(factory.deserialize(object));
- }
- }
- );
-
- @Override
- @SuppressWarnings("unchecked")
- public T apply(@Nullable T input)
- {
- Result> result = (Result>) input;
- BySegmentResultValueClass resultsClass = result.getValue();
-
- return (T) new Result(
- result.getTimestamp(),
- new BySegmentResultValueClass(
- Lists.transform(resultsClass.getResults(), baseFinalizer),
- resultsClass.getSegmentId(),
- resultsClass.getInterval()
- )
- );
- }
- };
- } else {
- finalizerFn = toolChest.makePostComputeManipulatorFn(
+ if (isBySegment) {
+ finalizerFn = new Function()
+ {
+ final Function baseFinalizer = toolChest.makePostComputeManipulatorFn(
query,
new MetricManipulationFn()
{
@Override
public Object manipulate(AggregatorFactory factory, Object object)
{
- return factory.finalizeComputation(object);
+ if (shouldFinalize) {
+ return factory.finalizeComputation(factory.deserialize(object));
+ } else {
+ return object;
+ }
}
}
);
- }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public T apply(@Nullable T input)
+ {
+ Result> result = (Result>) input;
+ BySegmentResultValueClass resultsClass = result.getValue();
+
+ return (T) new Result(
+ result.getTimestamp(),
+ new BySegmentResultValueClass(
+ Lists.transform(resultsClass.getResults(), baseFinalizer),
+ resultsClass.getSegmentId(),
+ resultsClass.getInterval()
+ )
+ );
+ }
+ };
} else {
- // finalize is false here.
finalizerFn = toolChest.makePostComputeManipulatorFn(
query,
new MetricManipulationFn()
@@ -106,7 +95,11 @@ public class FinalizeResultsQueryRunner implements QueryRunner
@Override
public Object manipulate(AggregatorFactory factory, Object object)
{
- return object;
+ if (shouldFinalize) {
+ return factory.finalizeComputation(factory.deserialize(object));
+ } else {
+ return object;
+ }
}
}
);
diff --git a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java
index 44799c8ea2f..1e2b6b4601d 100644
--- a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java
+++ b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java
@@ -171,7 +171,10 @@ public class QueryRunnerTestHelper
)
{
return new FinalizeResultsQueryRunner(
- factory.createRunner(adapter),
+ new BySegmentQueryRunner(
+ segmentId, adapter.getDataInterval().getStart(),
+ factory.createRunner(adapter)
+ ),
factory.getToolchest()
);
}
diff --git a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java
index 540b5f2b5c5..c8628bfdca5 100644
--- a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java
+++ b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java
@@ -25,6 +25,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequences;
import io.druid.collections.StupidPool;
+import io.druid.query.BySegmentResultValueClass;
import io.druid.query.Druids;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
@@ -64,7 +65,7 @@ public class TopNQueryRunnerTest
{
List